aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Tolstopyatov <qwwdfsad@gmail.com>2021-04-13 16:47:55 +0300
committerGitHub <noreply@github.com>2021-04-13 16:47:55 +0300
commit3c83c0cfea619f68f1eb323773d1f057f294023f (patch)
tree992f7071599c7cbfaaea31969ddfe0dc94ebdae9
parent7b1f3b379f1863521a26a7d3086b943d7fe3b58e (diff)
downloadplatform_external_kotlinx.coroutines-3c83c0cfea619f68f1eb323773d1f057f294023f.tar.gz
platform_external_kotlinx.coroutines-3c83c0cfea619f68f1eb323773d1f057f294023f.tar.bz2
platform_external_kotlinx.coroutines-3c83c0cfea619f68f1eb323773d1f057f294023f.zip
Deprecate SendChannel.offer and ReceiveChannel.poll, replace their usages along the codebase (#2644)
* Deprecate SendChannel.offer and replace its usages along the codebase * Deprecate ReceiveChannel.poll and replace its usages along the codebase Co-authored-by: Roman Elizarov <elizarov@gmail.com> Addresses #974
-rw-r--r--kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt1
-rw-r--r--kotlinx-coroutines-core/common/src/channels/Channel.kt191
-rw-r--r--kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt2
-rw-r--r--kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt4
-rw-r--r--kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt2
-rw-r--r--kotlinx-coroutines-core/common/src/flow/SharedFlow.kt2
-rw-r--r--kotlinx-coroutines-core/common/src/flow/StateFlow.kt2
-rw-r--r--kotlinx-coroutines-core/common/src/flow/internal/Combine.kt2
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt20
-rw-r--r--kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt45
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt26
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt2
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt14
-rw-r--r--kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt4
-rw-r--r--kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt12
-rw-r--r--kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt12
-rw-r--r--kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt22
-rw-r--r--kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt4
-rw-r--r--kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/src/channels/Channels.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt8
-rw-r--r--kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt10
-rw-r--r--kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt30
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt12
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt8
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt19
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt30
-rw-r--r--reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt2
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/Channel.kt2
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/Publish.kt2
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt2
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt2
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxChannel.kt4
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxObservable.kt2
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt2
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxChannel.kt4
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxObservable.kt2
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt2
-rw-r--r--ui/coroutines-guide-ui.md18
-rw-r--r--ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt15
-rw-r--r--ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt16
-rw-r--r--ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-02.kt2
-rw-r--r--ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-03.kt4
-rw-r--r--ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-01.kt2
-rw-r--r--ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-02.kt2
-rw-r--r--ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-03.kt2
52 files changed, 289 insertions, 310 deletions
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index 52e47227..bcf19215 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -137,6 +137,7 @@ internal abstract class AbstractSendChannel<E>(
}
override fun offer(element: E): Boolean {
+ // Temporary migration for offer users who rely on onUndeliveredElement
try {
return super.offer(element)
} catch (e: Throwable) {
diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt
index 7b6cc0ad..f006efc7 100644
--- a/kotlinx-coroutines-core/common/src/channels/Channel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt
@@ -24,7 +24,7 @@ import kotlin.jvm.*
public interface SendChannel<in E> {
/**
* Returns `true` if this channel was closed by an invocation of [close]. This means that
- * calling [send] or [offer] will result in an exception.
+ * calling [send] will result in an exception.
*
* **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
*/
@@ -51,7 +51,7 @@ public interface SendChannel<in E> {
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocations with the [onSend] clause.
- * Use [offer] to try sending to this channel without waiting.
+ * Use [trySend] to try sending to this channel without waiting.
*/
public suspend fun send(element: E)
@@ -64,23 +64,6 @@ public interface SendChannel<in E> {
*/
public val onSend: SelectClause2<E, SendChannel<E>>
- /**
- * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
- * and returns `true`. Otherwise, just returns `false`. This is a synchronous variant of [send] which backs off
- * in situations when `send` suspends.
- *
- * Throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
- *
- * When `offer` call returns `false` it guarantees that the element was not delivered to the consumer and it
- * it does not call `onUndeliveredElement` that was installed for this channel. If the channel was closed,
- * then it calls `onUndeliveredElement` before throwing an exception.
- * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
- */
- public fun offer(element: E): Boolean {
- val result = trySend(element)
- if (result.isSuccess) return true
- throw recoverStackTrace(result.exceptionOrNull() ?: return false)
- }
/**
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
@@ -103,7 +86,7 @@ public interface SendChannel<in E> {
* on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
* are received.
*
- * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] or [offer]
+ * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send]
* and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive].
* A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
* receive on a failed channel throw the specified [cause] exception.
@@ -122,10 +105,11 @@ public interface SendChannel<in E> {
* * the cause of `close` or `cancel` otherwise.
*
* Example of usage (exception handling is omitted):
+ *
* ```
* val events = Channel(UNLIMITED)
* callbackBasedApi.registerCallback { event ->
- * events.offer(event)
+ * events.trySend(event)
* }
*
* val uiUpdater = launch(Dispatchers.Main, parent = UILifecycle) {
@@ -134,7 +118,6 @@ public interface SendChannel<in E> {
* }
*
* events.invokeOnClose { callbackBasedApi.stop() }
- *
* ```
*
* **Note: This is an experimental api.** This function may change its semantics, parameters or return type in the future.
@@ -146,6 +129,33 @@ public interface SendChannel<in E> {
*/
@ExperimentalCoroutinesApi
public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
+
+ /**
+ * **Deprecated** offer method.
+ *
+ * This method was deprecated in the favour of [trySend].
+ * It has proven itself as the most error-prone method in Channel API:
+ *
+ * * `Boolean` return type creates the false sense of security, implying that `false`
+ * is returned instead of throwing an exception.
+ * * It was used mostly from non-suspending APIs where CancellationException triggered
+ * internal failures in the application (the most common source of bugs).
+ * * Due to signature and explicit `if (ch.offer(...))` checks it was easy to
+ * oversee such error during code review.
+ * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
+ *
+ * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
+ */
+ @Deprecated(
+ level = DeprecationLevel.WARNING,
+ message = "Deprecated in the favour of 'trySend' method",
+ replaceWith = ReplaceWith("trySend(element).isSuccess")
+ ) // Since 1.5.0
+ public fun offer(element: E): Boolean {
+ val result = trySend(element)
+ if (result.isSuccess) return true
+ throw recoverStackTrace(result.exceptionOrNull() ?: return false)
+ }
}
/**
@@ -188,7 +198,7 @@ public interface ReceiveChannel<out E> {
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocations with the [onReceive] clause.
- * Use [poll] to try receiving from this channel without waiting.
+ * Use [tryReceive] to try receiving from this channel without waiting.
*/
public suspend fun receive(): E
@@ -201,51 +211,6 @@ public interface ReceiveChannel<out E> {
public val onReceive: SelectClause1<E>
/**
- * This function was deprecated since 1.3.0 and is no longer recommended to use
- * or to implement in subclasses.
- *
- * It had the following pitfalls:
- * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value"
- * - Was throwing if the channel has failed even though its signature may suggest it returns 'null'
- * - It didn't really belong to core channel API and can be exposed as an extension instead.
- *
- * @suppress doc
- */
- @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
- @LowPriorityInOverloadResolution
- @Deprecated(
- message = "Deprecated in favor of receiveCatching",
- level = DeprecationLevel.ERROR,
- replaceWith = ReplaceWith("receiveCatching().getOrNull()")
- ) // Warning since 1.3.0, error in 1.5.0, will be hidden in 1.6.0
- public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull()
-
- /**
- * This function was deprecated since 1.3.0 and is no longer recommended to use
- * or to implement in subclasses.
- * See [receiveOrNull] documentation.
- *
- * @suppress **Deprecated**: in favor of onReceiveCatching extension.
- */
- @Deprecated(
- message = "Deprecated in favor of onReceiveCatching extension",
- level = DeprecationLevel.ERROR,
- replaceWith = ReplaceWith("onReceiveCatching")
- ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.6.0
- public val onReceiveOrNull: SelectClause1<E?>
- get() {
- return object : SelectClause1<E?> {
- @InternalCoroutinesApi
- override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
- onReceiveCatching.registerSelectClause1(select) {
- it.exceptionOrNull()?.let { throw it }
- block(it.getOrNull())
- }
- }
- }
- }
-
- /**
* Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty.
* This method returns [ChannelResult] with the value of an element successfully retrieved from the channel
* or the close cause if the channel was closed. Closed cause may be `null` if the channel was closed normally.
@@ -262,7 +227,7 @@ public interface ReceiveChannel<out E> {
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocations with the [onReceiveCatching] clause.
- * Use [poll] to try receiving from this channel without waiting.
+ * Use [tryReceive] to try receiving from this channel without waiting.
*/
public suspend fun receiveCatching(): ChannelResult<E>
@@ -274,17 +239,6 @@ public interface ReceiveChannel<out E> {
public val onReceiveCatching: SelectClause1<ChannelResult<E>>
/**
- * Retrieves and removes an element from this channel if it's not empty or returns `null` if the channel is empty
- * or is [is closed for `receive`][isClosedForReceive] without a cause.
- * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
- */
- public fun poll(): E? {
- val result = tryReceive()
- if (result.isSuccess) return result.getOrThrow()
- throw recoverStackTrace(result.exceptionOrNull() ?: return null)
- }
-
- /**
* Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success]
* result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed]
* result if the channel is closed.
@@ -325,6 +279,75 @@ public interface ReceiveChannel<out E> {
*/
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
public fun cancel(cause: Throwable? = null): Boolean
+
+ /**
+ * **Deprecated** poll method.
+ *
+ * This method was deprecated in the favour of [tryReceive].
+ * It has proven itself as error-prone method in Channel API:
+ *
+ * * Nullable return type creates the false sense of security, implying that `null`
+ * is returned instead of throwing an exception.
+ * * It was used mostly from non-suspending APIs where CancellationException triggered
+ * internal failures in the application (the most common source of bugs).
+ * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
+ *
+ * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
+ */
+ @Deprecated(level = DeprecationLevel.WARNING,
+ message = "Deprecated in the favour of 'tryReceive'",
+ replaceWith = ReplaceWith("tryReceive().getOrNull()")
+ ) // Since 1.5.0
+ public fun poll(): E? {
+ val result = tryReceive()
+ if (result.isSuccess) return result.getOrThrow()
+ throw recoverStackTrace(result.exceptionOrNull() ?: return null)
+ }
+
+ /**
+ * This function was deprecated since 1.3.0 and is no longer recommended to use
+ * or to implement in subclasses.
+ *
+ * It had the following pitfalls:
+ * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value"
+ * - Was throwing if the channel has failed even though its signature may suggest it returns 'null'
+ * - It didn't really belong to core channel API and can be exposed as an extension instead.
+ *
+ * @suppress doc
+ */
+ @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+ @LowPriorityInOverloadResolution
+ @Deprecated(
+ message = "Deprecated in favor of receiveCatching",
+ level = DeprecationLevel.ERROR,
+ replaceWith = ReplaceWith("receiveCatching().getOrNull()")
+ ) // Warning since 1.3.0, error in 1.5.0, will be hidden in 1.6.0
+ public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull()
+
+ /**
+ * This function was deprecated since 1.3.0 and is no longer recommended to use
+ * or to implement in subclasses.
+ * See [receiveOrNull] documentation.
+ *
+ * @suppress **Deprecated**: in favor of onReceiveCatching extension.
+ */
+ @Deprecated(
+ message = "Deprecated in favor of onReceiveCatching extension",
+ level = DeprecationLevel.ERROR,
+ replaceWith = ReplaceWith("onReceiveCatching")
+ ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.6.0
+ public val onReceiveOrNull: SelectClause1<E?>
+ get() {
+ return object : SelectClause1<E?> {
+ @InternalCoroutinesApi
+ override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
+ onReceiveCatching.registerSelectClause1(select) {
+ it.exceptionOrNull()?.let { throw it }
+ block(it.getOrNull())
+ }
+ }
+ }
+ }
}
/**
@@ -544,14 +567,14 @@ public interface ChannelIterator<out E> {
*
* * When `capacity` is [Channel.UNLIMITED] &mdash; it creates a channel with effectively unlimited buffer.
* This channel has a linked-list buffer of unlimited capacity (limited only by available memory).
- * [Sending][send] to this channel never suspends, and [offer] always returns `true`.
+ * [Sending][send] to this channel never suspends, and [trySend] always succeeds.
*
* * When `capacity` is [Channel.CONFLATED] &mdash; it creates a _conflated_ channel
- * This channel buffers at most one element and conflates all subsequent `send` and `offer` invocations,
+ * This channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
* so that the receiver always gets the last element sent.
* Back-to-back sent elements are conflated &mdash; only the last sent element is received,
* while previously sent elements **are lost**.
- * [Sending][send] to this channel never suspends, and [offer] always returns `true`.
+ * [Sending][send] to this channel never suspends, and [trySend] always succeeds.
*
* * When `capacity` is positive but less than [UNLIMITED] &mdash; it creates an array-based channel with the specified capacity.
* This channel has an array buffer of a fixed `capacity`.
@@ -598,8 +621,6 @@ public interface ChannelIterator<out E> {
*
* * When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually
* send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
- * * When [offer][SendChannel.offer] operation throws an exception when
- * the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
* * When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext]
* operation throws an exception when it had retrieved the element from the
* channel but was cancelled before the code following the receive call resumed.
diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
index c84afb2f..8283bcb1 100644
--- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
@@ -17,7 +17,7 @@ import kotlin.jvm.*
* Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received,
* while previously sent elements **are lost**.
* Every subscriber immediately receives the most recently sent element.
- * Sender to this broadcast channel never suspends and [offer] always returns `true`.
+ * Sender to this broadcast channel never suspends and [trySend] always succeeds.
*
* A secondary constructor can be used to create an instance of this class that already holds a value.
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
index 0e686447..e1e2b140 100644
--- a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
@@ -9,11 +9,11 @@ import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
/**
- * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
+ * Channel that buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
* so that the receiver always gets the most recently sent element.
* Back-to-send sent elements are _conflated_ -- only the most recently sent element is received,
* while previously sent elements **are lost**.
- * Sender to this channel never suspends and [offer] always returns `true`.
+ * Sender to this channel never suspends and [trySend] always succeeds.
*
* This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
*/
diff --git a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt
index 83175273..b5f607b2 100644
--- a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt
@@ -9,7 +9,7 @@ import kotlinx.coroutines.selects.*
/**
* Channel with linked-list buffer of a unlimited capacity (limited only by available memory).
- * Sender to this channel never suspends and [offer] always returns `true`.
+ * Sender to this channel never suspends and [trySend] always succeeds.
*
* This channel is created by `Channel(Channel.UNLIMITED)` factory function invocation.
*
diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
index ac9e66b0..490b221b 100644
--- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
@@ -92,7 +92,7 @@ import kotlin.native.concurrent.*
*
* To migrate [BroadcastChannel] usage to [SharedFlow], start by replacing usages of the `BroadcastChannel(capacity)`
* constructor with `MutableSharedFlow(0, extraBufferCapacity=capacity)` (broadcast channel does not replay
- * values to new subscribers). Replace [send][BroadcastChannel.send] and [offer][BroadcastChannel.offer] calls
+ * values to new subscribers). Replace [send][BroadcastChannel.send] and [trySend][BroadcastChannel.trySend] calls
* with [emit][MutableStateFlow.emit] and [tryEmit][MutableStateFlow.tryEmit], and convert subscribers' code to flow operators.
*
* ### Concurrency
diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
index fc8aa02f..74d33140 100644
--- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
@@ -107,7 +107,7 @@ import kotlin.native.concurrent.*
*
* To migrate [ConflatedBroadcastChannel] usage to [StateFlow], start by replacing usages of the `ConflatedBroadcastChannel()`
* constructor with `MutableStateFlow(initialValue)`, using `null` as an initial value if you don't have one.
- * Replace [send][ConflatedBroadcastChannel.send] and [offer][ConflatedBroadcastChannel.offer] calls
+ * Replace [send][ConflatedBroadcastChannel.send] and [trySend][ConflatedBroadcastChannel.trySend] calls
* with updates to the state flow's [MutableStateFlow.value], and convert subscribers' code to flow operators.
* You can use the [filterNotNull] operator to mimic behavior of a `ConflatedBroadcastChannel` without initial value.
*
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
index 6e5f3f11..c924c090 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -65,7 +65,7 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(
// Received the second value from the same flow in the same epoch -- bail out
if (lastReceivedEpoch[index] == currentEpoch) break
lastReceivedEpoch[index] = currentEpoch
- element = resultChannel.poll() ?: break
+ element = resultChannel.tryReceive().getOrNull() ?: break
}
// Process batch result if there is enough data
diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
index 3900c2db..632fd292 100644
--- a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
@@ -86,31 +86,31 @@ class ArrayChannelTest : TestBase() {
}
@Test
- fun testOfferAndPoll() = runTest {
+ fun testTryOp() = runTest {
val q = Channel<Int>(1)
- assertTrue(q.offer(1))
+ assertTrue(q.trySend(1).isSuccess)
expect(1)
launch {
expect(3)
- assertEquals(1, q.poll())
+ assertEquals(1, q.tryReceive().getOrNull())
expect(4)
- assertNull(q.poll())
+ assertNull(q.tryReceive().getOrNull())
expect(5)
assertEquals(2, q.receive()) // suspends
expect(9)
- assertEquals(3, q.poll())
+ assertEquals(3, q.tryReceive().getOrNull())
expect(10)
- assertNull(q.poll())
+ assertNull(q.tryReceive().getOrNull())
expect(11)
}
expect(2)
yield()
expect(6)
- assertTrue(q.offer(2))
+ assertTrue(q.trySend(2).isSuccess)
expect(7)
- assertTrue(q.offer(3))
+ assertTrue(q.trySend(3).isSuccess)
expect(8)
- assertFalse(q.offer(4))
+ assertFalse(q.trySend(4).isSuccess)
yield()
finish(12)
}
@@ -157,7 +157,7 @@ class ArrayChannelTest : TestBase() {
val capacity = 42
val channel = Channel<Int>(capacity)
repeat(4) {
- channel.offer(-1)
+ channel.trySend(-1)
}
repeat(4) {
channel.receiveCatching().getOrNull()
diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
index a64284ae..8962acc3 100644
--- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
@@ -15,8 +15,8 @@ class BasicOperationsTest : TestBase() {
}
@Test
- fun testOfferAfterClose() = runTest {
- TestChannelKind.values().forEach { kind -> testOffer(kind) }
+ fun testTrySendAfterClose() = runTest {
+ TestChannelKind.values().forEach { kind -> testTrySend(kind) }
}
@Test
@@ -39,7 +39,7 @@ class BasicOperationsTest : TestBase() {
}
}
expect(1)
- channel.offer(42)
+ channel.trySend(42)
expect(2)
channel.close(AssertionError())
finish(4)
@@ -80,28 +80,6 @@ class BasicOperationsTest : TestBase() {
}
}
- private suspend fun testReceiveCatchingException(kind: TestChannelKind) = coroutineScope {
- val channel = kind.create<Int>()
- val d = async(NonCancellable) {
- channel.receive()
- }
-
- yield()
- channel.close(TestException())
- assertTrue(channel.isClosedForReceive)
-
- assertFailsWith<TestException> { channel.poll() }
- try {
- channel.receiveCatching().getOrThrow()
- expectUnreached()
- } catch (e: TestException) {
- // Expected
- }
-
- d.join()
- assertTrue(d.getCancellationException().cause is TestException)
- }
-
@Suppress("ReplaceAssertBooleanWithAssertEquality")
private suspend fun testReceiveCatching(kind: TestChannelKind) = coroutineScope {
reset()
@@ -131,22 +109,21 @@ class BasicOperationsTest : TestBase() {
finish(6)
}
- private suspend fun testOffer(kind: TestChannelKind) = coroutineScope {
+ private suspend fun testTrySend(kind: TestChannelKind) = coroutineScope {
val channel = kind.create<Int>()
val d = async { channel.send(42) }
yield()
channel.close()
assertTrue(channel.isClosedForSend)
- try {
- channel.offer(2)
- fail()
- } catch (e: ClosedSendChannelException) {
- if (!kind.isConflated) {
- assertEquals(42, channel.receive())
+ channel.trySend(2)
+ .onSuccess { expectUnreached() }
+ .onFailure {
+ assertTrue { it is ClosedSendChannelException}
+ if (!kind.isConflated) {
+ assertEquals(42, channel.receive())
+ }
}
- }
-
d.await()
}
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt
index 41f60479..0b9a0fdb 100644
--- a/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt
@@ -11,30 +11,30 @@ class ChannelBufferOverflowTest : TestBase() {
@Test
fun testDropLatest() = runTest {
val c = Channel<Int>(2, BufferOverflow.DROP_LATEST)
- assertTrue(c.offer(1))
- assertTrue(c.offer(2))
- assertTrue(c.offer(3)) // overflows, dropped
+ assertTrue(c.trySend(1).isSuccess)
+ assertTrue(c.trySend(2).isSuccess)
+ assertTrue(c.trySend(3).isSuccess) // overflows, dropped
c.send(4) // overflows dropped
assertEquals(1, c.receive())
- assertTrue(c.offer(5))
- assertTrue(c.offer(6)) // overflows, dropped
+ assertTrue(c.trySend(5).isSuccess)
+ assertTrue(c.trySend(6).isSuccess) // overflows, dropped
assertEquals(2, c.receive())
assertEquals(5, c.receive())
- assertEquals(null, c.poll())
+ assertEquals(null, c.tryReceive().getOrNull())
}
@Test
fun testDropOldest() = runTest {
val c = Channel<Int>(2, BufferOverflow.DROP_OLDEST)
- assertTrue(c.offer(1))
- assertTrue(c.offer(2))
- assertTrue(c.offer(3)) // overflows, keeps 2, 3
+ assertTrue(c.trySend(1).isSuccess)
+ assertTrue(c.trySend(2).isSuccess)
+ assertTrue(c.trySend(3).isSuccess) // overflows, keeps 2, 3
c.send(4) // overflows, keeps 3, 4
assertEquals(3, c.receive())
- assertTrue(c.offer(5))
- assertTrue(c.offer(6)) // overflows, keeps 5, 6
+ assertTrue(c.trySend(5).isSuccess)
+ assertTrue(c.trySend(6).isSuccess) // overflows, keeps 5, 6
assertEquals(5, c.receive())
assertEquals(6, c.receive())
- assertEquals(null, c.poll())
+ assertEquals(null, c.tryReceive().getOrNull())
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt
index 856a66fb..a8c2a29c 100644
--- a/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt
@@ -42,7 +42,7 @@ class ConflatedBroadcastChannelTest : TestBase() {
launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
val sub = broadcast.openSubscription()
- assertNull(sub.poll())
+ assertNull(sub.tryReceive().getOrNull())
expect(3)
assertEquals("one", sub.receive()) // suspends
expect(6)
diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt
index 87194f72..370fd5b9 100644
--- a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt
@@ -12,14 +12,14 @@ open class ConflatedChannelTest : TestBase() {
Channel<T>(Channel.CONFLATED)
@Test
- fun testBasicConflationOfferPoll() {
+ fun testBasicConflationOfferTryReceive() {
val q = createConflatedChannel<Int>()
- assertNull(q.poll())
- assertTrue(q.offer(1))
- assertTrue(q.offer(2))
- assertTrue(q.offer(3))
- assertEquals(3, q.poll())
- assertNull(q.poll())
+ assertNull(q.tryReceive().getOrNull())
+ assertTrue(q.trySend(1).isSuccess)
+ assertTrue(q.trySend(2).isSuccess)
+ assertTrue(q.trySend(3).isSuccess)
+ assertEquals(3, q.tryReceive().getOrNull())
+ assertNull(q.tryReceive().getOrNull())
}
@Test
diff --git a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt
index cdec8a77..501affb4 100644
--- a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt
@@ -12,12 +12,12 @@ class LinkedListChannelTest : TestBase() {
fun testBasic() = runTest {
val c = Channel<Int>(Channel.UNLIMITED)
c.send(1)
- check(c.offer(2))
+ assertTrue(c.trySend(2).isSuccess)
c.send(3)
check(c.close())
check(!c.close())
assertEquals(1, c.receive())
- assertEquals(2, c.poll())
+ assertEquals(2, c.tryReceive().getOrNull())
assertEquals(3, c.receiveCatching().getOrNull())
assertNull(c.receiveCatching().getOrNull())
}
diff --git a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt
index ab0292a8..c83813e4 100644
--- a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt
@@ -80,26 +80,26 @@ class RendezvousChannelTest : TestBase() {
}
@Test
- fun testOfferAndPool() = runTest {
+ fun testTrySendTryReceive() = runTest {
val q = Channel<Int>(Channel.RENDEZVOUS)
- assertFalse(q.offer(1))
+ assertFalse(q.trySend(1).isSuccess)
expect(1)
launch {
expect(3)
- assertNull(q.poll())
+ assertNull(q.tryReceive().getOrNull())
expect(4)
assertEquals(2, q.receive())
expect(7)
- assertNull(q.poll())
+ assertNull(q.tryReceive().getOrNull())
yield()
expect(9)
- assertEquals(3, q.poll())
+ assertEquals(3, q.tryReceive().getOrNull())
expect(10)
}
expect(2)
yield()
expect(5)
- assertTrue(q.offer(2))
+ assertTrue(q.trySend(2).isSuccess)
expect(6)
yield()
expect(8)
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
index f93d0399..4763d13b 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
@@ -166,15 +166,15 @@ class ChannelBuildersFlowTest : TestBase() {
var expected = 0
launch {
- assertTrue(channel.offer(1)) // Handed to the coroutine
- assertTrue(channel.offer(2)) // Buffered
- assertFalse(channel.offer(3)) // Failed to offer
+ assertTrue(channel.trySend(1).isSuccess) // Handed to the coroutine
+ assertTrue(channel.trySend(2).isSuccess) // Buffered
+ assertFalse(channel.trySend(3).isSuccess) // Failed to offer
channel.send(3)
yield()
assertEquals(1, expected)
- assertTrue(channel.offer(4)) // Handed to the coroutine
- assertTrue(channel.offer(5)) // Buffered
- assertFalse(channel.offer(6)) // Failed to offer
+ assertTrue(channel.trySend(4).isSuccess) // Handed to the coroutine
+ assertTrue(channel.trySend(5).isSuccess) // Buffered
+ assertFalse(channel.trySend(6).isSuccess) // Failed to offer
channel.send(6)
assertEquals(2, expected)
}
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
index 31a929b2..f197a214 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
@@ -12,9 +12,9 @@ class ChannelFlowTest : TestBase() {
@Test
fun testRegular() = runTest {
val flow = channelFlow {
- assertTrue(offer(1))
- assertTrue(offer(2))
- assertTrue(offer(3))
+ assertTrue(trySend(1).isSuccess)
+ assertTrue(trySend(2).isSuccess)
+ assertTrue(trySend(3).isSuccess)
}
assertEquals(listOf(1, 2, 3), flow.toList())
}
@@ -22,9 +22,9 @@ class ChannelFlowTest : TestBase() {
@Test
fun testBuffer() = runTest {
val flow = channelFlow {
- assertTrue(offer(1))
- assertTrue(offer(2))
- assertFalse(offer(3))
+ assertTrue(trySend(1).isSuccess)
+ assertTrue(trySend(2).isSuccess)
+ assertFalse(trySend(3).isSuccess)
}.buffer(1)
assertEquals(listOf(1, 2), flow.toList())
}
@@ -32,10 +32,10 @@ class ChannelFlowTest : TestBase() {
@Test
fun testConflated() = runTest {
val flow = channelFlow {
- assertTrue(offer(1))
- assertTrue(offer(2))
- assertTrue(offer(3))
- assertTrue(offer(4))
+ assertTrue(trySend(1).isSuccess)
+ assertTrue(trySend(2).isSuccess)
+ assertTrue(trySend(3).isSuccess)
+ assertTrue(trySend(4).isSuccess)
}.buffer(Channel.CONFLATED)
assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated
}
@@ -43,7 +43,7 @@ class ChannelFlowTest : TestBase() {
@Test
fun testFailureCancelsChannel() = runTest {
val flow = channelFlow {
- offer(1)
+ trySend(1)
invokeOnClose {
expect(2)
}
diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt
index 371d0147..85a17ba0 100644
--- a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt
@@ -42,7 +42,7 @@ class ShareInFusionTest : TestBase() {
val flow = channelFlow {
// send a batch of 10 elements using [offer]
for (i in 1..10) {
- assertTrue(offer(i)) // offer must succeed, because buffer
+ assertTrue(trySend(i).isSuccess) // offer must succeed, because buffer
}
send(0) // done
}.buffer(10) // request a buffer of 10
@@ -53,4 +53,4 @@ class ShareInFusionTest : TestBase() {
.collect { i -> expect(i + 1) }
finish(12)
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt
index 42cdb1e1..db69e2bc 100644
--- a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt
@@ -167,11 +167,11 @@ class ShareInTest : TestBase() {
subs += shared
.onEach { value -> // only the first threshold subscribers get the value
when (i) {
- in 1..threshold -> log.offer("sub$i: $value")
+ in 1..threshold -> log.trySend("sub$i: $value")
else -> expectUnreached()
}
}
- .onCompletion { log.offer("sub$i: completion") }
+ .onCompletion { log.trySend("sub$i: completion") }
.launchIn(this)
checkStartTransition(i)
}
@@ -210,4 +210,4 @@ class ShareInTest : TestBase() {
stop()
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt
index ea0b639e..0df8278b 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt
@@ -50,7 +50,7 @@ import kotlinx.coroutines.*
)
public fun <E> SendChannel<E>.sendBlocking(element: E) {
// fast path
- if (offer(element))
+ if (trySend(element).isSuccess)
return
// slow path
runBlocking {
diff --git a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt
index 099e70b3..6c23982e 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt
@@ -21,13 +21,13 @@ public enum class TickerMode {
* ```
* val channel = ticker(delay = 100)
* delay(350) // 250 ms late
- * println(channel.poll()) // prints Unit
- * println(channel.poll()) // prints null
+ * println(channel.tryReceive().getOrNull()) // prints Unit
+ * println(channel.tryReceive().getOrNull()) // prints null
*
* delay(50)
- * println(channel.poll()) // prints Unit, delay was adjusted
+ * println(channel.tryReceive().getOrNull()) // prints Unit, delay was adjusted
* delay(50)
- * println(channel.poll()) // prints null, we'are not late relatively to previous element
+ * println(channel.tryReceive().getOrNull()) // prints null, we're not late relatively to previous element
* ```
*/
FIXED_PERIOD,
diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt
deleted file mode 100644
index 010dab38..00000000
--- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt
+++ /dev/null
@@ -1,10 +0,0 @@
-kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@2a06d350
- at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt)
- at kotlinx.coroutines.channels.AbstractSendChannel.offer(AbstractChannel.kt:170)
- at kotlinx.coroutines.channels.ChannelCoroutine.offer(ChannelCoroutine.kt)
- at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testCancelledOffer$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:153)
-Caused by: kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@2a06d350
- at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:599)
- at kotlinx.coroutines.Job$DefaultImpls.cancel$default(Job.kt:164)
- at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testCancelledOffer$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:151)
- at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
index 56f1e283..06839f4a 100644
--- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
@@ -39,7 +39,7 @@ class ReusableCancellableContinuationTest : TestBase() {
repeat(iterations) {
suspender {
- assertTrue(channel.offer(it))
+ assertTrue(channel.trySend(it).isSuccess)
}
}
channel.close()
diff --git a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt
index e755b17d..49c93c7f 100644
--- a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt
@@ -41,7 +41,7 @@ class RunInterruptibleTest : TestBase() {
val job = launch {
runInterruptible(Dispatchers.IO) {
expect(2)
- latch.offer(Unit)
+ latch.trySend(Unit)
try {
Thread.sleep(10_000L)
expectUnreached()
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
index a6a53403..86adfee0 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
@@ -15,7 +15,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
// total counters
private var sendCnt = 0
- private var offerFailedCnt = 0
+ private var trySendFailedCnt = 0
private var receivedCnt = 0
private var undeliveredCnt = 0
@@ -23,7 +23,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
private var lastReceived = 0
private var dSendCnt = 0
private var dSendExceptionCnt = 0
- private var dOfferFailedCnt = 0
+ private var dTrySendFailedCnt = 0
private var dReceivedCnt = 0
private val dUndeliveredCnt = AtomicInteger()
@@ -43,30 +43,30 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
joinAll(j1, j2)
// All elements must be either received or undelivered (IN every run)
- if (dSendCnt - dOfferFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) {
+ if (dSendCnt - dTrySendFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) {
println(" Send: $dSendCnt")
- println("Send Exception: $dSendExceptionCnt")
- println(" Offer failed: $dOfferFailedCnt")
+ println("Send exception: $dSendExceptionCnt")
+ println("trySend failed: $dTrySendFailedCnt")
println(" Received: $dReceivedCnt")
println(" Undelivered: ${dUndeliveredCnt.get()}")
error("Failed")
}
- offerFailedCnt += dOfferFailedCnt
+ trySendFailedCnt += dTrySendFailedCnt
receivedCnt += dReceivedCnt
undeliveredCnt += dUndeliveredCnt.get()
// clear for next run
dSendCnt = 0
dSendExceptionCnt = 0
- dOfferFailedCnt = 0
+ dTrySendFailedCnt = 0
dReceivedCnt = 0
dUndeliveredCnt.set(0)
}
// Stats
- println(" Send: $sendCnt")
- println(" Offer failed: $offerFailedCnt")
- println(" Received: $receivedCnt")
- println(" Undelivered: $undeliveredCnt")
- assertEquals(sendCnt - offerFailedCnt, receivedCnt + undeliveredCnt)
+ println(" Send: $sendCnt")
+ println("trySend failed: $trySendFailedCnt")
+ println(" Received: $receivedCnt")
+ println(" Undelivered: $undeliveredCnt")
+ assertEquals(sendCnt - trySendFailedCnt, receivedCnt + undeliveredCnt)
}
private suspend fun sendOne(channel: Channel<Int>) {
@@ -75,11 +75,11 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
try {
when (Random.nextInt(2)) {
0 -> channel.send(i)
- 1 -> if (!channel.offer(i)) {
- dOfferFailedCnt++
+ 1 -> if (!channel.trySend(i).isSuccess) {
+ dTrySendFailedCnt++
}
}
- } catch(e: Throwable) {
+ } catch (e: Throwable) {
assertTrue(e is CancellationException) // the only exception possible in this test
dSendExceptionCnt++
throw e
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
index 8f5224db..12334326 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
@@ -68,7 +68,7 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T
try {
block()
} finally {
- if (!done.offer(true))
+ if (!done.trySend(true).isSuccess)
error(IllegalStateException("failed to offer to done channel"))
}
}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt
index eb7be575..2b3c05bc 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt
@@ -29,7 +29,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
launch(Dispatchers.Default + CoroutineName("Sender$senderId")) {
repeat(nEvents) { i ->
if (i % nSenders == senderId) {
- broadcast.offer(i)
+ broadcast.trySend(i)
sentTotal.incrementAndGet()
yield()
}
@@ -63,7 +63,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
try {
withTimeout(timeLimit) {
senders.forEach { it.join() }
- broadcast.offer(nEvents) // last event to signal receivers termination
+ broadcast.trySend(nEvents) // last event to signal receivers termination
receivers.forEach { it.join() }
}
} catch (e: CancellationException) {
@@ -86,4 +86,4 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
cancel()
value
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt
index fd26144f..793d7e44 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt
@@ -5,11 +5,8 @@
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
-import org.junit.After
-import org.junit.Test
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.atomic.AtomicReference
-import kotlin.coroutines.*
+import org.junit.*
+import java.util.concurrent.atomic.*
class ConflatedChannelCloseStressTest : TestBase() {
@@ -37,12 +34,9 @@ class ConflatedChannelCloseStressTest : TestBase() {
var x = senderId
try {
while (isActive) {
- try {
- curChannel.get().offer(x)
+ curChannel.get().trySend(x).onSuccess {
x += nSenders
sent.incrementAndGet()
- } catch (e: ClosedSendChannelException) {
- // ignore
}
}
} finally {
diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
index 6ce2f20d..fbc28a18 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
@@ -48,7 +48,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
delayChannel.cancel()
delay(5100)
- assertFailsWith<CancellationException> { delayChannel.poll() }
+ assertFailsWith<CancellationException> { delayChannel.tryReceive().getOrThrow() }
}
}
@@ -159,9 +159,9 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
}
}
-fun ReceiveChannel<Unit>.checkEmpty() = assertNull(poll())
+fun ReceiveChannel<Unit>.checkEmpty() = assertNull(tryReceive().getOrNull())
fun ReceiveChannel<Unit>.checkNotEmpty() {
- assertNotNull(poll())
- assertNull(poll())
+ assertNotNull(tryReceive().getOrNull())
+ assertNull(tryReceive().getOrNull())
}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt
index ae2554ad..2d8c0ebc 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt
@@ -137,25 +137,6 @@ class StackTraceRecoveryChannelsTest : TestBase() {
deferred.await()
}
- // See https://github.com/Kotlin/kotlinx.coroutines/issues/950
- @Test
- fun testCancelledOffer() = runTest {
- expect(1)
- val job = Job()
- val actor = actor<Int>(job, Channel.UNLIMITED) {
- consumeEach {
- expectUnreached() // is cancelled before offer
- }
- }
- job.cancel()
- try {
- actor.offer(1)
- } catch (e: Exception) {
- verifyStackTrace("channels/${name.methodName}", e)
- finish(2)
- }
- }
-
private suspend fun Channel<Int>.sendWithContext(ctx: CoroutineContext) = withContext(ctx) {
sendInChannel()
yield() // TCE
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
index 1a381547..9afcab53 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
@@ -261,7 +261,7 @@ class StackTraceRecoveryTest : TestBase() {
private suspend fun awaitCallback(channel: Channel<Callback>) {
suspendCancellableCoroutine<Unit> { cont ->
- channel.offer(Callback(cont))
+ channel.trySend(Callback(cont))
}
yield() // nop to make sure it is not a tail call
}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
index e3db2626..f1be284c 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
@@ -36,7 +36,7 @@ class CallbackFlowTest : TestBase() {
fun testThrowingConsumer() = runTest {
var i = 0
val api = CallbackApi {
- runCatching { it.offer(++i) }
+ it.trySend(++i)
}
val flow = callbackFlow<Int> {
@@ -77,13 +77,13 @@ class CallbackFlowTest : TestBase() {
var i = 0
val api = CallbackApi {
if (i < 5) {
- it.offer(++i)
+ it.trySend(++i)
} else {
it.close(RuntimeException())
}
}
- val flow = callbackFlow<Int>() {
+ val flow = callbackFlow<Int> {
api.start(channel)
awaitClose {
api.stop()
diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt
index fbd5c0d8..74cc1783 100644
--- a/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt
@@ -63,11 +63,12 @@ abstract class ChannelLincheckTestBase(
}
@Operation
- fun offer(@Param(name = "value") value: Int): Any = try {
- c.offer(value)
- } catch (e: NumberedCancellationException) {
- e.testResult
- }
+ fun trySend(@Param(name = "value") value: Int): Any = c.trySend(value)
+ .onSuccess { return true }
+ .onFailure {
+ return if (it is NumberedCancellationException) it.testResult
+ else false
+ }
// TODO: this operation should be (and can be!) linearizable, but is not
// @Operation
@@ -85,11 +86,10 @@ abstract class ChannelLincheckTestBase(
}
@Operation
- fun poll(): Any? = try {
- c.poll()
- } catch (e: NumberedCancellationException) {
- e.testResult
- }
+ fun tryReceive(): Any? =
+ c.tryReceive()
+ .onSuccess { return it }
+ .onFailure { return if (it is NumberedCancellationException) it.testResult else null }
// TODO: this operation should be (and can be!) linearizable, but is not
// @Operation
@@ -131,7 +131,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta
private val buffer = ArrayList<Int>()
private var closedMessage: String? = null
- suspend fun send(x: Int): Any = when (val offerRes = offer(x)) {
+ suspend fun send(x: Int): Any = when (val offerRes = trySend(x)) {
true -> Unit
false -> suspendCancellableCoroutine { cont ->
senders.add(cont to x)
@@ -139,7 +139,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta
else -> offerRes
}
- fun offer(element: Int): Any {
+ fun trySend(element: Int): Any {
if (closedMessage !== null) return closedMessage!!
if (capacity == CONFLATED) {
if (resumeFirstReceiver(element)) return true
@@ -163,11 +163,11 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta
return false
}
- suspend fun receive(): Any = poll() ?: suspendCancellableCoroutine { cont ->
+ suspend fun receive(): Any = tryReceive() ?: suspendCancellableCoroutine { cont ->
receivers.add(cont)
}
- fun poll(): Any? {
+ fun tryReceive(): Any? {
if (buffer.isNotEmpty()) {
val el = buffer.removeAt(0)
resumeFirstSender().also {
@@ -221,4 +221,4 @@ private fun <T> CancellableContinuation<T>.resume(res: T): Boolean {
val token = tryResume(res) ?: return false
completeResume(token)
return true
-} \ No newline at end of file
+}
diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt
index 97f106b3..b5b2a0a2 100644
--- a/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt
+++ b/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt
@@ -70,7 +70,7 @@ class PublisherAsFlowTest : TestBase() {
send(it + 1)
expect(it + 1)
}
- assertFalse { offer(-1) }
+ assertFalse { trySend(-1).isSuccess }
}
publisher.asFlow().collect {
diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt
index 854a8829..d9c9b91a 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt
@@ -109,7 +109,7 @@ private class SubscriptionChannel<T>(
override fun onNext(t: T) {
_requested.decrementAndGet()
- offer(t)
+ trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}
override fun onComplete() {
diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
index d2cdbab8..383a17d8 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
@@ -89,7 +89,7 @@ public class PublisherCoroutine<in T>(
public override suspend fun send(element: T) {
// fast-path -- try send without suspension
- if (offer(element)) return
+ if (trySend(element).isSuccess) return
// slow-path does suspend
return sendSuspend(element)
}
diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
index cb8de7a6..f0245388 100644
--- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
@@ -132,7 +132,7 @@ private class ReactiveSubscriber<T : Any>(
override fun onNext(value: T) {
// Controlled by requestSize
- require(channel.offer(value)) { "Element $value was not added to channel because it was full, $channel" }
+ require(channel.trySend(value).isSuccess) { "Element $value was not added to channel because it was full, $channel" }
}
override fun onComplete() {
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
index e2c86c97..7a0e0fac 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
@@ -71,7 +71,7 @@ class PublisherAsFlowTest : TestBase() {
send(it + 1)
expect(it + 1)
}
- assertFalse { offer(-1) }
+ assertFalse { trySend(-1).isSuccess }
}
publisher.asFlow().collect {
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
index 35f3391e..3bc50c8d 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
@@ -84,11 +84,11 @@ private class SubscriptionChannel<T> :
}
override fun onSuccess(t: T) {
- offer(t)
+ trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}
override fun onNext(t: T) {
- offer(t)
+ trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}
override fun onComplete() {
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
index 56ce30c3..09c5dc1d 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
@@ -84,7 +84,7 @@ private class RxObservableCoroutine<T : Any>(
public override suspend fun send(element: T) {
// fast-path -- try send without suspension
- if (offer(element)) return
+ if (trySend(element).isSuccess) return
// slow-path does suspend
return sendSuspend(element)
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt
index 159f3729..0253fced 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt
@@ -26,7 +26,7 @@ class ObservableSourceAsFlowStressTest : TestBase() {
val latch = Channel<Unit>(1)
var i = 0
val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
- .doOnNext { if (++i > 100) latch.offer(Unit) }
+ .doOnNext { if (++i > 100) latch.trySend(Unit) }
val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
latch.receive()
job.cancelAndJoin()
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
index 76333f2e..ad780f75 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
@@ -69,11 +69,11 @@ private class SubscriptionChannel<T> :
}
override fun onSuccess(t: T) {
- offer(t)
+ trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}
override fun onNext(t: T) {
- offer(t)
+ trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}
override fun onComplete() {
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
index a17e32dd..55794f9a 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
@@ -78,7 +78,7 @@ private class RxObservableCoroutine<T: Any>(
public override suspend fun send(element: T) {
// fast-path -- try send without suspension
- if (offer(element)) return
+ if (trySend(element).isSuccess) return
// slow-path does suspend
return sendSuspend(element)
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
index 431a7a78..01d6a20e 100644
--- a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
@@ -27,7 +27,7 @@ class ObservableSourceAsFlowStressTest : TestBase() {
val latch = Channel<Unit>(1)
var i = 0
val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
- .doOnNext { if (++i > 100) latch.offer(Unit) }
+ .doOnNext { if (++i > 100) latch.trySend(Unit) }
val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
latch.receive()
job.cancelAndJoin()
diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md
index 7673c8f2..d5bd2320 100644
--- a/ui/coroutines-guide-ui.md
+++ b/ui/coroutines-guide-ui.md
@@ -268,7 +268,7 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
}
// install a listener to offer events to this actor
onMouseClicked = EventHandler { event ->
- eventActor.offer(event)
+ eventActor.trySend(event)
}
}
```
@@ -276,12 +276,12 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
> You can get the full code [here](kotlinx-coroutines-javafx/test/guide/example-ui-actor-02.kt).
The key idea that underlies an integration of an actor coroutine and a regular event handler is that
-there is an [offer][SendChannel.offer] function on [SendChannel] that does not wait. It sends an element to the actor immediately,
-if it is possible, or discards an element otherwise. An `offer` actually returns a `Boolean` result which we ignore here.
+there is an [trySend][SendChannel.trySend] function on [SendChannel] that does not wait. It sends an element to the actor immediately,
+if it is possible, or discards an element otherwise. A `trySend` actually returns a `ChanneResult` instance which we ignore here.
Try clicking repeatedly on a circle in this version of the code. The clicks are just ignored while the countdown
animation is running. This happens because the actor is busy with an animation and does not receive from its channel.
-By default, an actor's mailbox is backed by `RendezvousChannel`, whose `offer` operation succeeds only when
+By default, an actor's mailbox is backed by `RendezvousChannel`, whose `trySend` operation succeeds only when
the `receive` is active.
> On Android, there is `View` sent in OnClickListener, so we send the `View` to the actor as a signal.
@@ -295,7 +295,7 @@ fun View.onClick(action: suspend (View) -> Unit) {
}
// install a listener to activate this actor
setOnClickListener {
- eventActor.offer(it)
+ eventActor.trySend(it)
}
}
```
@@ -319,9 +319,9 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
val eventActor = GlobalScope.actor<MouseEvent>(Dispatchers.Main, capacity = Channel.CONFLATED) { // <--- Changed here
for (event in channel) action(event) // pass event to action
}
- // install a listener to offer events to this actor
+ // install a listener to send events to this actor
onMouseClicked = EventHandler { event ->
- eventActor.offer(event)
+ eventActor.trySend(event)
}
}
```
@@ -359,7 +359,7 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
for (event in channel) action(event) // pass event to action
}
onMouseClicked = EventHandler { event ->
- eventActor.offer(event)
+ eventActor.trySend(event)
}
}
-->
@@ -623,7 +623,7 @@ After delay
<!--- INDEX kotlinx.coroutines.channels -->
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html
-[SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html
+[SendChannel.trySend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/try-send.html
[SendChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/index.html
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[Channel.CONFLATED]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/-c-o-n-f-l-a-t-e-d.html
diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
index 1cbf9b6f..ebeaa3b8 100644
--- a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
+++ b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
@@ -4,10 +4,9 @@
package kotlinx.coroutines.javafx
-import javafx.beans.value.ChangeListener
-import javafx.beans.value.ObservableValue
+import javafx.beans.value.*
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.awaitClose
+import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
/**
@@ -27,11 +26,11 @@ import kotlinx.coroutines.flow.*
@ExperimentalCoroutinesApi // Since 1.3.x
public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
val listener = ChangeListener<T> { _, _, newValue ->
- try {
- offer(newValue)
- } catch (e: CancellationException) {
- // In case the event fires after the channel is closed
- }
+ /*
+ * Do not propagate the exception to the ObservableValue, it
+ * already should've been handled by the downstream
+ */
+ trySend(newValue)
}
addListener(listener)
send(value)
diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt
index 69640501..bc40b0fd 100644
--- a/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt
+++ b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt
@@ -83,4 +83,20 @@ class JavaFxObservableAsFlowTest : TestBase() {
}
}
+ @Test
+ fun testIntermediateCrash() = runTest {
+ if (!initPlatform()) {
+ println("Skipping JavaFxTest in headless environment")
+ return@runTest // ignore test in headless environments
+ }
+
+ val property = SimpleIntegerProperty(0)
+
+ assertFailsWith<TestException> {
+ property.asFlow().onEach {
+ yield()
+ throw TestException()
+ }.collect()
+ }
+ }
}
diff --git a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-02.kt b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-02.kt
index 51e94779..ec8a09f9 100644
--- a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-02.kt
+++ b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-02.kt
@@ -67,6 +67,6 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
}
// install a listener to offer events to this actor
onMouseClicked = EventHandler { event ->
- eventActor.offer(event)
+ eventActor.trySend(event)
}
}
diff --git a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-03.kt b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-03.kt
index 81371678..aa152b79 100644
--- a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-03.kt
+++ b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-actor-03.kt
@@ -65,8 +65,8 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
val eventActor = GlobalScope.actor<MouseEvent>(Dispatchers.Main, capacity = Channel.CONFLATED) { // <--- Changed here
for (event in channel) action(event) // pass event to action
}
- // install a listener to offer events to this actor
+ // install a listener to send events to this actor
onMouseClicked = EventHandler { event ->
- eventActor.offer(event)
+ eventActor.trySend(event)
}
}
diff --git a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-01.kt b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-01.kt
index ea5ac90a..0c89ea70 100644
--- a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-01.kt
+++ b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-01.kt
@@ -55,7 +55,7 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
for (event in channel) action(event) // pass event to action
}
onMouseClicked = EventHandler { event ->
- eventActor.offer(event)
+ eventActor.trySend(event)
}
}
diff --git a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-02.kt b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-02.kt
index 504f2ee6..6e8b984a 100644
--- a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-02.kt
+++ b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-02.kt
@@ -55,7 +55,7 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
for (event in channel) action(event) // pass event to action
}
onMouseClicked = EventHandler { event ->
- eventActor.offer(event)
+ eventActor.trySend(event)
}
}
diff --git a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-03.kt b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-03.kt
index 0e115367..3ff5d7d5 100644
--- a/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-03.kt
+++ b/ui/kotlinx-coroutines-javafx/test/guide/example-ui-blocking-03.kt
@@ -55,7 +55,7 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
for (event in channel) action(event) // pass event to action
}
onMouseClicked = EventHandler { event ->
- eventActor.offer(event)
+ eventActor.trySend(event)
}
}