aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com>2021-04-20 11:21:32 +0300
committerGitHub <noreply@github.com>2021-04-20 11:21:32 +0300
commit8bb5210c2825c7d248eb172d06975c5628aa6671 (patch)
treef2e3d15e9865ef9b77a44c799d8d36e1b4d45b93
parent95ad4449e56fec0c3b800d6b0a44218a6822c1e6 (diff)
downloadplatform_external_kotlinx.coroutines-8bb5210c2825c7d248eb172d06975c5628aa6671.tar.gz
platform_external_kotlinx.coroutines-8bb5210c2825c7d248eb172d06975c5628aa6671.tar.bz2
platform_external_kotlinx.coroutines-8bb5210c2825c7d248eb172d06975c5628aa6671.zip
Consistently handle exceptions in reactive streams (#2646)
* Fixed `PublisherCoroutine`, `rxObservable`, and `Flow.toPublisher` ignoring cancellations. * Fatal exceptions are not treated in a special manner by us anymore. Instead, we follow the requirement in the reactive streams specification that, in case some method of `Subscriber` throws, that subscriber MUST be considered canceled, and the exception MUST be reported in someplace other than `onError`. * Fixed `trySend` sometimes throwing in `PublisherCoroutine` and `rxObservable`. * When an exception happens inside a cancellation handler, we now consistently throw the original exception passed to the handler, with the new exception added as suppressed. * Fixed `PublisherCoroutine` and `rxObservable` claiming that the channel is not closed for send for some time after `close()` has finished. * Fixed publishers sometimes signalling `onComplete()` after cancellation even though their streams are not finite. Fixes https://github.com/Kotlin/kotlinx.coroutines/issues/2173
-rw-r--r--reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt21
-rw-r--r--reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt20
-rw-r--r--reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt158
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/Publish.kt155
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt31
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt24
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt51
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublishTest.kt158
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt14
-rw-r--r--reactive/kotlinx-coroutines-reactor/src/Flux.kt9
-rw-r--r--reactive/kotlinx-coroutines-reactor/src/Mono.kt5
-rw-r--r--reactive/kotlinx-coroutines-reactor/test/FluxTest.kt37
-rw-r--r--reactive/kotlinx-coroutines-rx2/README.md4
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt1
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt7
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt7
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxObservable.kt117
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxSingle.kt7
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt16
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt16
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt37
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt1
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt7
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt7
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxObservable.kt112
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxSingle.kt7
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt16
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt15
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt37
29 files changed, 740 insertions, 357 deletions
diff --git a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
index 488695de..b860e162 100644
--- a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
+++ b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
@@ -15,7 +15,7 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testErrorOnCancellationIsReported() {
expect(1)
- flow<Int> {
+ flow {
try {
emit(2)
} finally {
@@ -50,13 +50,13 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testCancellationIsNotReported() {
expect(1)
- flow<Int> {
+ flow {
emit(2)
}.asPublisher().subscribe(object : JFlow.Subscriber<Int> {
private lateinit var subscription: JFlow.Subscription
override fun onComplete() {
- expect(3)
+ expectUnreached()
}
override fun onSubscribe(s: JFlow.Subscription?) {
@@ -73,6 +73,21 @@ class FlowAsPublisherTest : TestBase() {
expectUnreached()
}
})
+ finish(3)
+ }
+
+ @Test
+ fun testFlowWithTimeout() = runTest {
+ val publisher = flow<Int> {
+ expect(2)
+ withTimeout(1) { delay(Long.MAX_VALUE) }
+ }.asPublisher()
+ try {
+ expect(1)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
+ }
finish(4)
}
}
diff --git a/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt
index 5bfddfee..5b3542ad 100644
--- a/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt
@@ -5,10 +5,12 @@
package kotlinx.coroutines.jdk9
import kotlinx.coroutines.*
+import kotlinx.coroutines.exceptions.*
import org.junit.Test
import kotlinx.coroutines.flow.flowOn
import org.junit.runner.*
import org.junit.runners.*
+import kotlin.contracts.*
import java.util.concurrent.Flow as JFlow
import kotlin.coroutines.*
import kotlin.test.*
@@ -129,4 +131,20 @@ class IntegrationTest(
assertEquals(n, last)
}
-} \ No newline at end of file
+}
+
+@OptIn(ExperimentalContracts::class)
+internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
+ crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E {
+ contract {
+ callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
+ }
+ val handler = CapturingHandler()
+ return withContext(handler) {
+ operation(handler)
+ handler.getException().let {
+ assertTrue(it is E, it.toString())
+ it
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt
index 1a36a389..3682d5e3 100644
--- a/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt
+++ b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines.jdk9
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import org.junit.Test
import java.util.concurrent.Flow as JFlow
import kotlin.test.*
@@ -121,44 +122,110 @@ class PublishTest : TestBase() {
finish(7)
}
+ /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
@Test
- fun testOnNextError() = runTest {
+ fun testChannelClosing() = runTest {
expect(1)
- val publisher = flowPublish(currentDispatcher()) {
+ val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
+ expect(3)
+ close()
+ assert(isClosedForSend)
expect(4)
- try {
- send("OK")
- } catch(e: Throwable) {
- expect(6)
- assert(e is TestException)
- }
}
- expect(2)
+ try {
+ expect(2)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(5)
+ }
+ finish(6)
+ }
+
+ @Test
+ fun testOnNextError() = runTest {
val latch = CompletableDeferred<Unit>()
- publisher.subscribe(object : JFlow.Subscriber<String> {
- override fun onComplete() {
- expectUnreached()
+ expect(1)
+ assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
+ val publisher = flowPublish(currentDispatcher() + exceptionHandler) {
+ expect(4)
+ try {
+ send("OK")
+ } catch (e: Throwable) {
+ expect(6)
+ assert(e is TestException)
+ assert(isClosedForSend)
+ latch.complete(Unit)
+ }
}
+ expect(2)
+ publisher.subscribe(object : JFlow.Subscriber<String> {
+ override fun onComplete() {
+ expectUnreached()
+ }
- override fun onSubscribe(s: JFlow.Subscription) {
- expect(3)
- s.request(1)
- }
+ override fun onSubscribe(s: JFlow.Subscription) {
+ expect(3)
+ s.request(1)
+ }
- override fun onNext(t: String) {
- expect(5)
- assertEquals("OK", t)
- throw TestException()
- }
+ override fun onNext(t: String) {
+ expect(5)
+ assertEquals("OK", t)
+ throw TestException()
+ }
- override fun onError(t: Throwable) {
- expect(7)
- assert(t is TestException)
- latch.complete(Unit)
+ override fun onError(t: Throwable) {
+ expectUnreached()
+ }
+ })
+ latch.await()
+ }
+ finish(7)
+ }
+
+ /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
+ @Test
+ fun testOnNextErrorAfterCancellation() = runTest {
+ assertCallsExceptionHandlerWith<TestException> { handler ->
+ var producerScope: ProducerScope<Int>? = null
+ CompletableDeferred<Unit>()
+ expect(1)
+ var job: Job? = null
+ val publisher = flowPublish<Int>(handler + Dispatchers.Unconfined) {
+ producerScope = this
+ expect(4)
+ job = launch {
+ delay(Long.MAX_VALUE)
+ }
}
- })
- latch.await()
- finish(8)
+ expect(2)
+ publisher.subscribe(object: JFlow.Subscriber<Int> {
+ override fun onSubscribe(s: JFlow.Subscription) {
+ expect(3)
+ s.request(Long.MAX_VALUE)
+ }
+ override fun onNext(t: Int) {
+ expect(6)
+ assertEquals(1, t)
+ job!!.cancel()
+ throw TestException()
+ }
+ override fun onError(t: Throwable?) {
+ /* Correct changes to the implementation could lead to us entering or not entering this method, but
+ it only matters that if we do, it is the "correct" exception that was validly used to cancel the
+ coroutine that gets passed here and not `TestException`. */
+ assertTrue(t is CancellationException)
+ }
+ override fun onComplete() { expectUnreached() }
+ })
+ expect(5)
+ val result: ChannelResult<Unit> = producerScope!!.trySend(1)
+ val e = result.exceptionOrNull()!!
+ assertTrue(e is CancellationException, "The actual error: $e")
+ assertTrue(producerScope!!.isClosedForSend)
+ assertTrue(result.isFailure)
+ }
+ finish(7)
}
@Test
@@ -182,4 +249,39 @@ class PublishTest : TestBase() {
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { flowPublish<Int>(Job()) { } }
}
+
+ /** Tests that `trySend` doesn't throw in `flowPublish`. */
+ @Test
+ fun testTrySendNotThrowing() = runTest {
+ var producerScope: ProducerScope<Int>? = null
+ expect(1)
+ val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
+ producerScope = this
+ expect(3)
+ delay(Long.MAX_VALUE)
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ publisher.awaitFirstOrNull()
+ expectUnreached()
+ }
+ job.cancel()
+ expect(4)
+ val result = producerScope!!.trySend(1)
+ assertTrue(result.isFailure)
+ finish(5)
+ }
+
+ /** Tests that all methods on `flowPublish` fail without closing the channel when attempting to emit `null`. */
+ @Test
+ fun testEmittingNull() = runTest {
+ val publisher = flowPublish {
+ assertFailsWith<NullPointerException> { send(null) }
+ assertFailsWith<NullPointerException> { trySend(null) }
+ @Suppress("DEPRECATION")
+ assertFailsWith<NullPointerException> { offer(null) }
+ send("OK")
+ }
+ assertEquals("OK", publisher.awaitFirstOrNull())
+ }
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
index 383a17d8..37113849 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
@@ -10,7 +10,6 @@ import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import org.reactivestreams.*
import kotlin.coroutines.*
-import kotlin.internal.*
/**
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
@@ -74,29 +73,26 @@ public class PublisherCoroutine<in T>(
private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)
@Volatile
- private var cancelled = false // true when Subscription.cancel() is invoked
+ private var cancelled = false // true after Subscription.cancel() is invoked
- override val isClosedForSend: Boolean get() = isCompleted
+ override val isClosedForSend: Boolean get() = !isActive
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing =
throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")
- override fun trySend(element: T): ChannelResult<Unit> {
- if (!mutex.tryLock()) return ChannelResult.failure()
- doLockedNext(element)
- return ChannelResult.success(Unit)
- }
+ override fun trySend(element: T): ChannelResult<Unit> =
+ if (!mutex.tryLock()) {
+ ChannelResult.failure()
+ } else {
+ when (val throwable = doLockedNext(element)) {
+ null -> ChannelResult.success(Unit)
+ else -> ChannelResult.closed(throwable)
+ }
+ }
public override suspend fun send(element: T) {
- // fast-path -- try send without suspension
- if (trySend(element).isSuccess) return
- // slow-path does suspend
- return sendSuspend(element)
- }
-
- private suspend fun sendSuspend(element: T) {
mutex.lock()
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
}
override val onSend: SelectClause2<T, SendChannel<T>>
@@ -106,13 +102,13 @@ public class PublisherCoroutine<in T>(
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
mutex.onLock.registerSelectClause2(select, null) {
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
block(this)
}
}
/*
- * This code is not trivial because of the two properties:
+ * This code is not trivial because of the following properties:
* 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not
* be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple
* coroutines are invoking `send` function.
@@ -121,27 +117,61 @@ public class PublisherCoroutine<in T>(
* globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may
* lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for
* `onComplete/onError` is also done under the same mutex.
+ * 3. The reactive specification forbids emitting more elements than requested, so `onNext` is forbidden until the
+ * subscriber actually requests some elements. This is implemented by the mutex being locked when emitting
+ * elements is not permitted (`_nRequested.value == 0`).
*/
- // assert: mutex.isLocked()
- private fun doLockedNext(elem: T) {
- // check if already closed for send, note that isActive becomes false as soon as cancel() is invoked,
- // because the job is cancelled, so this check also ensure conformance to the reactive specification's
- // requirement that after cancellation requested we don't call onXXX
+ /**
+ * Attempts to emit a value to the subscriber and, if back-pressure permits this, unlock the mutex.
+ *
+ * Requires that the caller has locked the mutex before this invocation.
+ *
+ * If the channel is closed, returns the corresponding [Throwable]; otherwise, returns `null` to denote success.
+ *
+ * @throws NullPointerException if the passed element is `null`
+ */
+ private fun doLockedNext(elem: T): Throwable? {
+ if (elem == null) {
+ unlockAndCheckCompleted()
+ throw NullPointerException("Attempted to emit `null` inside a reactive publisher")
+ }
+ /** This guards against the case when the caller of this function managed to lock the mutex not because some
+ * elements were requested--and thus it is permitted to call `onNext`--but because the channel was closed.
+ *
+ * It may look like there is a race condition here between `isActive` and a concurrent cancellation, but it's
+ * okay for a cancellation to happen during `onNext`, as the reactive spec only requires that we *eventually*
+ * stop signalling the subscriber. */
if (!isActive) {
unlockAndCheckCompleted()
- throw getCancellationException()
+ return getCancellationException()
}
- // notify subscriber
+ // notify the subscriber
try {
subscriber.onNext(elem)
- } catch (e: Throwable) {
- // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
- // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
- // this failure is essentially equivalent to a failure of a child coroutine.
- cancelCoroutine(e)
+ } catch (cause: Throwable) {
+ /** The reactive streams spec forbids the subscribers from throwing from [Subscriber.onNext] unless the
+ * element is `null`, which we check not to be the case. Therefore, we report this exception to the handler
+ * for uncaught exceptions and consider the subscription cancelled, as mandated by
+ * https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13.
+ *
+ * Some reactive implementations, like RxJava or Reactor, are known to throw from [Subscriber.onNext] if the
+ * execution encounters an exception they consider to be "fatal", like [VirtualMachineError] or
+ * [ThreadDeath]. Us using the handler for the undeliverable exceptions to signal "fatal" exceptions is
+ * inconsistent with RxJava and Reactor, which attempt to bubble the exception up the call chain as soon as
+ * possible. However, we can't do much better here, as simply throwing from all methods indiscriminately
+ * would violate the contracts we place on them. */
+ cancelled = true
+ val causeDelivered = close(cause)
unlockAndCheckCompleted()
- throw e
+ return if (causeDelivered) {
+ // `cause` is the reason this channel is closed
+ cause
+ } else {
+ // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
+ exceptionOnCancelHandler(cause, context)
+ getCancellationException()
+ }
}
// now update nRequested
while (true) { // lock-free loop on nRequested
@@ -152,12 +182,13 @@ public class PublisherCoroutine<in T>(
if (_nRequested.compareAndSet(current, updated)) {
if (updated == 0L) {
// return to keep locked due to back-pressure
- return
+ return null
}
break // unlock if updated > 0
}
}
unlockAndCheckCompleted()
+ return null
}
private fun unlockAndCheckCompleted() {
@@ -177,38 +208,31 @@ public class PublisherCoroutine<in T>(
// assert: mutex.isLocked() & isCompleted
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
try {
- if (_nRequested.value >= CLOSED) {
- _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
- // Specification requires that after cancellation requested we don't call onXXX
- if (cancelled) {
- // If the parent had failed to handle our exception, then we must not lose this exception
- if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
- return
- }
-
+ if (_nRequested.value == SIGNALLED)
+ return
+ _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (the final state, so no CAS needed)
+ // Specification requires that after the cancellation is requested we eventually stop calling onXXX
+ if (cancelled) {
+ // If the parent failed to handle this exception, then we must not lose the exception
+ if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
+ return
+ }
+ if (cause == null) {
try {
- if (cause != null && cause !is CancellationException) {
- /*
- * Reactive frameworks have two types of exceptions: regular and fatal.
- * Regular are passed to onError.
- * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
- * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
- * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
- * thrown by subscriber or upstream).
- * To make behaviour consistent and least surprising, we always handle fatal exceptions
- * by coroutines machinery, anyway, they should not be present in regular program flow,
- * thus our goal here is just to expose it as soon as possible.
- */
- subscriber.onError(cause)
- if (!handled && cause.isFatal()) {
- exceptionOnCancelHandler(cause, context)
- }
- } else {
- subscriber.onComplete()
- }
+ subscriber.onComplete()
} catch (e: Throwable) {
handleCoroutineException(context, e)
}
+ } else {
+ try {
+ // This can't be the cancellation exception from `cancel`, as then `cancelled` would be `true`.
+ subscriber.onError(cause)
+ } catch (e: Throwable) {
+ if (e !== cause) {
+ cause.addSuppressed(e)
+ }
+ handleCoroutineException(context, cause)
+ }
}
} finally {
mutex.unlock()
@@ -217,13 +241,13 @@ public class PublisherCoroutine<in T>(
override fun request(n: Long) {
if (n <= 0) {
- // Specification requires IAE for n <= 0
+ // Specification requires to call onError with IAE for n <= 0
cancelCoroutine(IllegalArgumentException("non-positive subscription request $n"))
return
}
while (true) { // lock-free loop for nRequested
val cur = _nRequested.value
- if (cur < 0) return // already closed for send, ignore requests
+ if (cur < 0) return // already closed for send, ignore requests, as mandated by the reactive streams spec
var upd = cur + n
if (upd < 0 || n == Long.MAX_VALUE)
upd = Long.MAX_VALUE
@@ -231,6 +255,11 @@ public class PublisherCoroutine<in T>(
if (_nRequested.compareAndSet(cur, upd)) {
// unlock the mutex when we don't have back-pressure anymore
if (cur == 0L) {
+ /** In a sense, after a successful CAS, it is this invocation, not the coroutine itself, that owns
+ * the lock, given that `upd` is necessarily strictly positive. Thus, no other operation has the
+ * right to lower the value on [_nRequested], it can only grow or become [CLOSED]. Therefore, it is
+ * impossible for any other operations to assume that they own the lock without actually acquiring
+ * it. */
unlockAndCheckCompleted()
}
return
@@ -271,8 +300,6 @@ public class PublisherCoroutine<in T>(
cancelled = true
super.cancel(null)
}
-
- private fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError
}
@Deprecated(
diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
index f0245388..1f197f94 100644
--- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
@@ -13,6 +13,7 @@ import kotlinx.coroutines.intrinsics.*
import org.reactivestreams.*
import java.util.*
import kotlin.coroutines.*
+import kotlinx.coroutines.internal.*
/**
* Transforms the given reactive [Publisher] into [Flow].
@@ -195,6 +196,8 @@ public class FlowSubscription<T>(
*/
private val requested = atomic(0L)
private val producer = atomic<Continuation<Unit>?>(createInitialContinuation())
+ @Volatile
+ private var cancellationRequested = false
// This code wraps startCoroutineCancellable into continuation
private fun createInitialContinuation(): Continuation<Unit> = Continuation(coroutineContext) {
@@ -204,18 +207,25 @@ public class FlowSubscription<T>(
private suspend fun flowProcessing() {
try {
consumeFlow()
- subscriber.onComplete()
- } catch (e: Throwable) {
- try {
- if (e is CancellationException) {
- subscriber.onComplete()
- } else {
- subscriber.onError(e)
+ } catch (cause: Throwable) {
+ @Suppress("INVISIBLE_MEMBER")
+ val unwrappedCause = unwrap(cause)
+ if (!cancellationRequested || isActive || unwrappedCause !== getCancellationException()) {
+ try {
+ subscriber.onError(cause)
+ } catch (e: Throwable) {
+ // Last ditch report
+ cause.addSuppressed(e)
+ handleCoroutineException(coroutineContext, cause)
}
- } catch (e: Throwable) {
- // Last ditch report
- handleCoroutineException(coroutineContext, e)
}
+ return
+ }
+ // We only call this if `consumeFlow()` finished successfully
+ try {
+ subscriber.onComplete()
+ } catch (e: Throwable) {
+ handleCoroutineException(coroutineContext, e)
}
}
@@ -239,6 +249,7 @@ public class FlowSubscription<T>(
}
override fun cancel() {
+ cancellationRequested = true
cancel(null)
}
diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
index e7b8cb17..02c9e242 100644
--- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import org.junit.Test
import org.reactivestreams.*
@@ -15,7 +16,7 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testErrorOnCancellationIsReported() {
expect(1)
- flow<Int> {
+ flow {
try {
emit(2)
} finally {
@@ -50,13 +51,13 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testCancellationIsNotReported() {
expect(1)
- flow<Int> {
+ flow {
emit(2)
}.asPublisher().subscribe(object : Subscriber<Int> {
private lateinit var subscription: Subscription
override fun onComplete() {
- expect(3)
+ expectUnreached()
}
override fun onSubscribe(s: Subscription?) {
@@ -73,7 +74,7 @@ class FlowAsPublisherTest : TestBase() {
expectUnreached()
}
})
- finish(4)
+ finish(3)
}
@Test
@@ -149,4 +150,19 @@ class FlowAsPublisherTest : TestBase() {
}
finish(5)
}
+
+ @Test
+ fun testFlowWithTimeout() = runTest {
+ val publisher = flow<Int> {
+ expect(2)
+ withTimeout(1) { delay(Long.MAX_VALUE) }
+ }.asPublisher()
+ try {
+ expect(1)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
+ }
+ finish(4)
+ }
}
diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
index 72479b5c..efe7ec7e 100644
--- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
@@ -5,12 +5,14 @@
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
+import kotlinx.coroutines.exceptions.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
import org.reactivestreams.*
import java.lang.IllegalStateException
import java.lang.RuntimeException
+import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.test.*
@@ -218,32 +220,35 @@ class IntegrationTest(
}.let { assertTrue(it.message?.contains("onSubscribe") ?: false) }
}
- private suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
- crossinline operation: suspend () -> Unit): E
- {
- val caughtExceptions = mutableListOf<Throwable>()
- val exceptionHandler = object: AbstractCoroutineContextElement(CoroutineExceptionHandler),
- CoroutineExceptionHandler
- {
- override fun handleException(context: CoroutineContext, exception: Throwable) {
- caughtExceptions += exception
- }
+ @Test
+ fun testPublishWithTimeout() = runTest {
+ val publisher = publish<Int> {
+ expect(2)
+ withTimeout(1) { delay(100) }
}
- return withContext(exceptionHandler) {
- operation()
- caughtExceptions.single().let {
- assertTrue(it is E)
- it
- }
+ try {
+ expect(1)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
}
+ finish(4)
}
- private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {
- var last = 0
- pub.collect {
- assertEquals(++last, it)
+}
+
+@OptIn(ExperimentalContracts::class)
+internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
+ crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E {
+ contract {
+ callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
+ }
+ val handler = CapturingHandler()
+ return withContext(handler) {
+ operation(handler)
+ handler.getException().let {
+ assertTrue(it is E, it.toString())
+ it
}
- assertEquals(n, last)
}
-
-}
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
index 9e3c07b6..095b724d 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import org.junit.Test
import org.reactivestreams.*
import kotlin.test.*
@@ -121,44 +122,110 @@ class PublishTest : TestBase() {
finish(7)
}
+ /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
@Test
- fun testOnNextError() = runTest {
+ fun testChannelClosing() = runTest {
expect(1)
- val publisher = publish(currentDispatcher()) {
+ val publisher = publish<Int>(Dispatchers.Unconfined) {
+ expect(3)
+ close()
+ assert(isClosedForSend)
expect(4)
- try {
- send("OK")
- } catch(e: Throwable) {
- expect(6)
- assert(e is TestException)
- }
}
- expect(2)
+ try {
+ expect(2)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(5)
+ }
+ finish(6)
+ }
+
+ @Test
+ fun testOnNextError() = runTest {
val latch = CompletableDeferred<Unit>()
- publisher.subscribe(object : Subscriber<String> {
- override fun onComplete() {
- expectUnreached()
+ expect(1)
+ assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
+ val publisher = publish(currentDispatcher() + exceptionHandler) {
+ expect(4)
+ try {
+ send("OK")
+ } catch (e: Throwable) {
+ expect(6)
+ assert(e is TestException)
+ assert(isClosedForSend)
+ latch.complete(Unit)
+ }
}
+ expect(2)
+ publisher.subscribe(object : Subscriber<String> {
+ override fun onComplete() {
+ expectUnreached()
+ }
- override fun onSubscribe(s: Subscription) {
- expect(3)
- s.request(1)
- }
+ override fun onSubscribe(s: Subscription) {
+ expect(3)
+ s.request(1)
+ }
- override fun onNext(t: String) {
- expect(5)
- assertEquals("OK", t)
- throw TestException()
- }
+ override fun onNext(t: String) {
+ expect(5)
+ assertEquals("OK", t)
+ throw TestException()
+ }
- override fun onError(t: Throwable) {
- expect(7)
- assert(t is TestException)
- latch.complete(Unit)
+ override fun onError(t: Throwable) {
+ expectUnreached()
+ }
+ })
+ latch.await()
+ }
+ finish(7)
+ }
+
+ /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
+ @Test
+ fun testOnNextErrorAfterCancellation() = runTest {
+ assertCallsExceptionHandlerWith<TestException> { handler ->
+ var producerScope: ProducerScope<Int>? = null
+ CompletableDeferred<Unit>()
+ expect(1)
+ var job: Job? = null
+ val publisher = publish<Int>(handler + Dispatchers.Unconfined) {
+ producerScope = this
+ expect(4)
+ job = launch {
+ delay(Long.MAX_VALUE)
+ }
}
- })
- latch.await()
- finish(8)
+ expect(2)
+ publisher.subscribe(object: Subscriber<Int> {
+ override fun onSubscribe(s: Subscription) {
+ expect(3)
+ s.request(Long.MAX_VALUE)
+ }
+ override fun onNext(t: Int) {
+ expect(6)
+ assertEquals(1, t)
+ job!!.cancel()
+ throw TestException()
+ }
+ override fun onError(t: Throwable?) {
+ /* Correct changes to the implementation could lead to us entering or not entering this method, but
+ it only matters that if we do, it is the "correct" exception that was validly used to cancel the
+ coroutine that gets passed here and not `TestException`. */
+ assertTrue(t is CancellationException)
+ }
+ override fun onComplete() { expectUnreached() }
+ })
+ expect(5)
+ val result: ChannelResult<Unit> = producerScope!!.trySend(1)
+ val e = result.exceptionOrNull()!!
+ assertTrue(e is CancellationException, "The actual error: $e")
+ assertTrue(producerScope!!.isClosedForSend)
+ assertTrue(result.isFailure)
+ }
+ finish(7)
}
@Test
@@ -182,4 +249,39 @@ class PublishTest : TestBase() {
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { publish<Int>(Job()) { } }
}
+
+ /** Tests that `trySend` doesn't throw in `publish`. */
+ @Test
+ fun testTrySendNotThrowing() = runTest {
+ var producerScope: ProducerScope<Int>? = null
+ expect(1)
+ val publisher = publish<Int>(Dispatchers.Unconfined) {
+ producerScope = this
+ expect(3)
+ delay(Long.MAX_VALUE)
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ publisher.awaitFirstOrNull()
+ expectUnreached()
+ }
+ job.cancel()
+ expect(4)
+ val result = producerScope!!.trySend(1)
+ assertTrue(result.isFailure)
+ finish(5)
+ }
+
+ /** Tests that all methods on `publish` fail without closing the channel when attempting to emit `null`. */
+ @Test
+ fun testEmittingNull() = runTest {
+ val publisher = publish {
+ assertFailsWith<NullPointerException> { send(null) }
+ assertFailsWith<NullPointerException> { trySend(null) }
+ @Suppress("DEPRECATION")
+ assertFailsWith<NullPointerException> { offer(null) }
+ send("OK")
+ }
+ assertEquals("OK", publisher.awaitFirstOrNull())
+ }
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt
index 736a6640..9b069dca 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt
@@ -29,13 +29,14 @@ import kotlin.random.*
*/
@Suppress("ReactiveStreamsSubscriberImplementation")
class PublisherRequestStressTest : TestBase() {
+
private val testDurationSec = 3 * stressTestMultiplier
// Original code in Amazon SDK uses 4 and 16 as low/high watermarks.
- // There constants were chosen so that problem reproduces asap with particular this code.
+ // These constants were chosen so that problem reproduces asap with particular this code.
private val minDemand = 8L
private val maxDemand = 16L
-
+
private val nEmitThreads = 4
private val emitThreadNo = AtomicInteger()
@@ -47,7 +48,7 @@ class PublisherRequestStressTest : TestBase() {
private val reqPool = Executors.newSingleThreadExecutor { r ->
Thread(r, "PublisherRequestStressTest-req")
}
-
+
private val nextValue = AtomicLong(0)
@After
@@ -64,7 +65,6 @@ class PublisherRequestStressTest : TestBase() {
fun testRequestStress() {
val expectedValue = AtomicLong(0)
val requestedTill = AtomicLong(0)
- val completionLatch = CountDownLatch(1)
val callingOnNext = AtomicInteger()
val publisher = mtFlow().asPublisher()
@@ -74,7 +74,7 @@ class PublisherRequestStressTest : TestBase() {
private var demand = 0L // only updated from reqPool
override fun onComplete() {
- completionLatch.countDown()
+ expectUnreached()
}
override fun onSubscribe(sub: Subscription) {
@@ -123,7 +123,9 @@ class PublisherRequestStressTest : TestBase() {
}
if (!error) {
subscription.cancel()
- completionLatch.await()
+ runBlocking {
+ (subscription as AbstractCoroutine<*>).join()
+ }
}
}
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
index b7143288..806f5bd5 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -55,12 +55,13 @@ private fun <T> reactorPublish(
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
-private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { e, ctx ->
- if (e !is CancellationException) {
+private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ctx ->
+ if (cause !is CancellationException) {
try {
- Operators.onOperatorError(e, ctx[ReactorContext]?.context ?: Context.empty())
+ Operators.onOperatorError(cause, ctx[ReactorContext]?.context ?: Context.empty())
} catch (e: Throwable) {
- handleCoroutineException(ctx, e)
+ cause.addSuppressed(e)
+ handleCoroutineException(ctx, cause)
}
}
}
diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
index 307ec227..6e7b95ba 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
@@ -13,6 +13,7 @@ import reactor.core.*
import reactor.core.publisher.*
import kotlin.coroutines.*
import kotlin.internal.*
+import kotlinx.coroutines.internal.*
/**
* Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result.
@@ -59,13 +60,15 @@ private class MonoCoroutine<in T>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
/** Cancellation exceptions that were caused by [dispose], that is, came from downstream, are not errors. */
- if (getCancellationException() !== cause || !disposed) {
+ val unwrappedCause = unwrap(cause)
+ if (getCancellationException() !== unwrappedCause || !disposed) {
try {
/** If [sink] turns out to already be in a terminal state, this exception will be passed through the
* [Hooks.onOperatorError] hook, which is the way to signal undeliverable exceptions in Reactor. */
sink.error(cause)
} catch (e: Throwable) {
// In case of improper error implementation or fatal exceptions
+ cause.addSuppressed(e)
handleCoroutineException(context, cause)
}
}
diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
index 31f5f5d9..d059eb66 100644
--- a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
@@ -5,9 +5,9 @@
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
-import org.junit.*
import org.junit.Test
import kotlin.test.*
@@ -141,4 +141,39 @@ class FluxTest : TestBase() {
.collect { }
}
}
+
+ /** Tests that `trySend` doesn't throw in `flux`. */
+ @Test
+ fun testTrySendNotThrowing() = runTest {
+ var producerScope: ProducerScope<Int>? = null
+ expect(1)
+ val flux = flux<Int>(Dispatchers.Unconfined) {
+ producerScope = this
+ expect(3)
+ delay(Long.MAX_VALUE)
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ flux.awaitFirstOrNull()
+ expectUnreached()
+ }
+ job.cancel()
+ expect(4)
+ val result = producerScope!!.trySend(1)
+ assertTrue(result.isFailure)
+ finish(5)
+ }
+
+ /** Tests that all methods on `flux` fail without closing the channel when attempting to emit `null`. */
+ @Test
+ fun testEmittingNull() = runTest {
+ val flux = flux {
+ assertFailsWith<NullPointerException> { send(null) }
+ assertFailsWith<NullPointerException> { trySend(null) }
+ @Suppress("DEPRECATION")
+ assertFailsWith<NullPointerException> { offer(null) }
+ send("OK")
+ }
+ assertEquals("OK", flux.awaitFirstOrNull())
+ }
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md
index 40fe122f..1ae2c8a0 100644
--- a/reactive/kotlinx-coroutines-rx2/README.md
+++ b/reactive/kotlinx-coroutines-rx2/README.md
@@ -35,7 +35,6 @@ Suspending extension functions and suspending iteration:
| [ObservableSource.awaitFirstOrNull][io.reactivex.ObservableSource.awaitFirstOrNull] | Awaits for the first value from the given observable or null
| [ObservableSource.awaitLast][io.reactivex.ObservableSource.awaitFirst] | Awaits for the last value from the given observable
| [ObservableSource.awaitSingle][io.reactivex.ObservableSource.awaitSingle] | Awaits for the single value from the given observable
-| [ObservableSource.openSubscription][io.reactivex.ObservableSource.openSubscription] | Subscribes to observable and returns [ReceiveChannel]
Note that `Flowable` is a subclass of [Reactive Streams](https://www.reactive-streams.org)
`Publisher` and extensions for it are covered by
@@ -47,7 +46,6 @@ Conversion functions:
| -------- | ---------------
| [Job.asCompletable][kotlinx.coroutines.Job.asCompletable] | Converts job to hot completable
| [Deferred.asSingle][kotlinx.coroutines.Deferred.asSingle] | Converts deferred value to hot single
-| [ReceiveChannel.asObservable][kotlinx.coroutines.channels.ReceiveChannel.asObservable] | Converts streaming channel to hot observable
| [Scheduler.asCoroutineDispatcher][io.reactivex.Scheduler.asCoroutineDispatcher] | Converts scheduler to [CoroutineDispatcher]
<!--- MODULE kotlinx-coroutines-core -->
@@ -86,10 +84,8 @@ Conversion functions:
[io.reactivex.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first-or-else.html
[io.reactivex.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first-or-null.html
[io.reactivex.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-single.html
-[io.reactivex.ObservableSource.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/open-subscription.html
[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-job/as-completable.html
[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-deferred/as-single.html
-[kotlinx.coroutines.channels.ReceiveChannel.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.channels.-receive-channel/as-observable.html
[io.reactivex.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-scheduler/as-coroutine-dispatcher.html
<!--- END -->
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt
index 0fe43f1c..3e39033e 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt
@@ -20,6 +20,7 @@ internal fun handleUndeliverableException(cause: Throwable, context: CoroutineCo
try {
RxJavaPlugins.onError(cause)
} catch (e: Throwable) {
+ cause.addSuppressed(e)
handleCoroutineException(context, cause)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
index eee84b1b..3f6c27a6 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
@@ -50,12 +50,13 @@ private class RxCompletableCoroutine(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
index 9682373a..aa531c6e 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
@@ -51,12 +51,13 @@ private class RxMaybeCoroutine<T>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
index 09c5dc1d..c096c0d2 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
@@ -9,6 +9,7 @@ import io.reactivex.exceptions.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
@@ -60,38 +61,29 @@ private class RxObservableCoroutine<T : Any>(
) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
- // Mutex is locked when while subscriber.onXXX is being invoked
+ // Mutex is locked while subscriber.onXXX is being invoked
private val mutex = Mutex()
private val _signal = atomic(OPEN)
- override val isClosedForSend: Boolean get() = isCompleted
+ override val isClosedForSend: Boolean get() = !isActive
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
override fun invokeOnClose(handler: (Throwable?) -> Unit) =
throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
- override fun offer(element: T): Boolean {
- if (!mutex.tryLock()) return false
- doLockedNext(element)
- return true
- }
-
- override fun trySend(element: T): ChannelResult<Unit> {
- if (!mutex.tryLock()) return ChannelResult.failure()
- doLockedNext(element)
- return ChannelResult.success(Unit)
- }
+ override fun trySend(element: T): ChannelResult<Unit> =
+ if (!mutex.tryLock()) {
+ ChannelResult.failure()
+ } else {
+ when (val throwable = doLockedNext(element)) {
+ null -> ChannelResult.success(Unit)
+ else -> ChannelResult.closed(throwable)
+ }
+ }
public override suspend fun send(element: T) {
- // fast-path -- try send without suspension
- if (trySend(element).isSuccess) return
- // slow-path does suspend
- return sendSuspend(element)
- }
-
- private suspend fun sendSuspend(element: T) {
mutex.lock()
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
}
override val onSend: SelectClause2<T, SendChannel<T>>
@@ -99,30 +91,39 @@ private class RxObservableCoroutine<T : Any>(
// registerSelectSend
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
- override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+ override fun <R> registerSelectClause2(
+ select: SelectInstance<R>,
+ element: T,
+ block: suspend (SendChannel<T>) -> R
+ ) {
mutex.onLock.registerSelectClause2(select, null) {
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
block(this)
}
}
// assert: mutex.isLocked()
- private fun doLockedNext(elem: T) {
+ private fun doLockedNext(elem: T): Throwable? {
// check if already closed for send
if (!isActive) {
doLockedSignalCompleted(completionCause, completionCauseHandled)
- throw getCancellationException()
+ return getCancellationException()
}
// notify subscriber
try {
subscriber.onNext(elem)
} catch (e: Throwable) {
- // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
- // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
- // this failure is essentially equivalent to a failure of a child coroutine.
- cancelCoroutine(e)
- mutex.unlock()
- throw e
+ val cause = UndeliverableException(e)
+ val causeDelivered = close(cause)
+ unlockAndCheckCompleted()
+ return if (causeDelivered) {
+ // `cause` is the reason this channel is closed
+ cause
+ } else {
+ // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
+ handleUndeliverableException(cause, context)
+ getCancellationException()
+ }
}
/*
* There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
@@ -131,6 +132,7 @@ private class RxObservableCoroutine<T : Any>(
* We have to recheck `isCompleted` after `unlock` anyway.
*/
unlockAndCheckCompleted()
+ return null
}
private fun unlockAndCheckCompleted() {
@@ -144,33 +146,31 @@ private class RxObservableCoroutine<T : Any>(
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
// cancellation failures
try {
- if (_signal.value >= CLOSED) {
- _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ if (_signal.value == SIGNALLED)
+ return
+ _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ @Suppress("INVISIBLE_MEMBER")
+ val unwrappedCause = cause?.let { unwrap(it) }
+ if (unwrappedCause == null) {
try {
- if (cause != null && cause !is CancellationException) {
- /*
- * Reactive frameworks have two types of exceptions: regular and fatal.
- * Regular are passed to onError.
- * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
- * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
- * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
- * thrown by subscriber or upstream).
- * To make behaviour consistent and least surprising, we always handle fatal exceptions
- * by coroutines machinery, anyway, they should not be present in regular program flow,
- * thus our goal here is just to expose it as soon as possible.
- */
- subscriber.tryOnError(cause)
- if (!handled && cause.isFatal()) {
- handleUndeliverableException(cause, context)
- }
- }
- else {
- subscriber.onComplete()
- }
- } catch (e: Throwable) {
- // Unhandled exception (cannot handle in other way, since we are already complete)
+ subscriber.onComplete()
+ } catch (e: Exception) {
handleUndeliverableException(e, context)
}
+ } else if (unwrappedCause is UndeliverableException && !handled) {
+ /** Such exceptions are not reported to `onError`, as, according to the reactive specifications,
+ * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already
+ * cancelled. */
+ handleUndeliverableException(cause, context)
+ } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) {
+ try {
+ /** If the subscriber is already in a terminal state, the error will be signalled to
+ * `RxJavaPlugins.onError`. */
+ subscriber.onError(cause)
+ } catch (e: Exception) {
+ cause.addSuppressed(e)
+ handleUndeliverableException(cause, context)
+ }
}
} finally {
mutex.unlock()
@@ -192,13 +192,6 @@ private class RxObservableCoroutine<T : Any>(
}
}
-internal fun Throwable.isFatal() = try {
- Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode
- false
-} catch (e: Throwable) {
- true
-}
-
@Deprecated(
message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable",
level = DeprecationLevel.HIDDEN,
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
index 49a9bb8d..c7ad606e 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
@@ -50,12 +50,13 @@ private class RxSingleCoroutine<T: Any>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt
index 05b7ee92..31643929 100644
--- a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt
@@ -38,16 +38,16 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
}.subscribe({
expectUnreached()
}, {
- expect(2) // Fatal exception is reported to both onError and CEH
+ expect(2) // Fatal exceptions are not treated as special
})
- finish(4)
+ finish(3)
}
@Test
@@ -66,7 +66,7 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
@@ -77,19 +77,19 @@ class FlowableExceptionHandlingTest : TestBase() {
}, {
expect(2)
})
- finish(4)
+ finish(3)
}
@Test
- fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
rxFlowable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
}.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) }) // Fatal exception is reported to both onError and CEH
- finish(5)
+ }, { expectUnreached() }) // Fatal exception is rethrown from `onNext` => the subscription is thought to be cancelled
+ finish(4)
}
@Test
diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
index 4bf3e453..8a6362ad 100644
--- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
@@ -7,6 +7,7 @@ package kotlinx.coroutines.rx2
import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
@@ -124,6 +125,21 @@ class IntegrationTest(
finish(3)
}
+ @Test
+ fun testObservableWithTimeout() = runTest {
+ val observable = rxObservable<Int> {
+ expect(2)
+ withTimeout(1) { delay(100) }
+ }
+ try {
+ expect(1)
+ observable.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
+ }
+ finish(4)
+ }
+
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
var last = 0
observable.collect {
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
index d6cdd3ca..fb3d0f69 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
@@ -8,6 +8,7 @@ import io.reactivex.exceptions.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
+import java.util.concurrent.*
import kotlin.test.*
class ObservableExceptionHandlingTest : TestBase() {
@@ -18,7 +19,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
- assertTrue(t is UndeliverableException && t.cause is T)
+ assertTrue(t is UndeliverableException && t.cause is T, "$t")
expect(expect)
}
@@ -38,8 +39,8 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
- rxObservable<Int>(Dispatchers.Unconfined) {
+ fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
+ rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
expect(1)
throw LinkageError()
}.subscribe({
@@ -47,7 +48,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}, {
expect(2)
})
- finish(4)
+ finish(3)
}
@Test
@@ -66,7 +67,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxObservable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
@@ -75,20 +76,28 @@ class ObservableExceptionHandlingTest : TestBase() {
.subscribe({
expectUnreached()
}, {
- expect(2) // Fatal exception is not reported in onError
+ expect(2) // Fatal exceptions are not treated in a special manner
})
- finish(4)
+ finish(3)
}
@Test
- fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
+ val latch = CountDownLatch(1)
rxObservable(Dispatchers.Unconfined) {
expect(1)
- send(Unit)
+ val result = trySend(Unit)
+ val exception = result.exceptionOrNull()
+ assertTrue(exception is UndeliverableException)
+ assertTrue(exception.cause is LinkageError)
+ assertTrue(isClosedForSend)
+ expect(4)
+ latch.countDown()
}.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) }) // Unreached because fatal errors are rethrown
+ }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
+ latch.await()
finish(5)
}
@@ -100,7 +109,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}.subscribe({
expect(2)
throw TestException()
- }, { expect(3) }) // not reported to onError because came from the subscribe itself
+ }, { expect(3) })
finish(4)
}
@@ -119,7 +128,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
rxObservable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
@@ -128,7 +137,7 @@ class ObservableExceptionHandlingTest : TestBase() {
.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) })
- finish(5)
+ }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
+ finish(4)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
index 29951598..1017b112 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
@@ -20,6 +20,7 @@ internal fun handleUndeliverableException(cause: Throwable, context: CoroutineCo
try {
RxJavaPlugins.onError(cause)
} catch (e: Throwable) {
+ cause.addSuppressed(e)
handleCoroutineException(context, cause)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
index 88137675..47cc6ad3 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
@@ -50,11 +50,12 @@ private class RxCompletableCoroutine(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
index 1c102664..12d0197b 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
@@ -51,11 +51,12 @@ private class RxMaybeCoroutine<T>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
index 55794f9a..5c810c49 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
@@ -12,6 +12,7 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
+import kotlinx.coroutines.internal.*
/**
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
@@ -54,38 +55,35 @@ private const val OPEN = 0 // open channel, still working
private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError
-private class RxObservableCoroutine<T: Any>(
+private class RxObservableCoroutine<T : Any>(
parentContext: CoroutineContext,
private val subscriber: ObservableEmitter<T>
) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
- // Mutex is locked when while subscriber.onXXX is being invoked
+ // Mutex is locked while subscriber.onXXX is being invoked
private val mutex = Mutex()
private val _signal = atomic(OPEN)
- override val isClosedForSend: Boolean get() = isCompleted
+ override val isClosedForSend: Boolean get() = !isActive
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
override fun invokeOnClose(handler: (Throwable?) -> Unit) =
throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
- override fun trySend(element: T): ChannelResult<Unit> {
- if (!mutex.tryLock()) return ChannelResult.failure()
- doLockedNext(element)
- return ChannelResult.success(Unit)
- }
+ override fun trySend(element: T): ChannelResult<Unit> =
+ if (!mutex.tryLock()) {
+ ChannelResult.failure()
+ } else {
+ when (val throwable = doLockedNext(element)) {
+ null -> ChannelResult.success(Unit)
+ else -> ChannelResult.closed(throwable)
+ }
+ }
public override suspend fun send(element: T) {
- // fast-path -- try send without suspension
- if (trySend(element).isSuccess) return
- // slow-path does suspend
- return sendSuspend(element)
- }
-
- private suspend fun sendSuspend(element: T) {
mutex.lock()
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
}
override val onSend: SelectClause2<T, SendChannel<T>>
@@ -93,30 +91,39 @@ private class RxObservableCoroutine<T: Any>(
// registerSelectSend
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
- override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+ override fun <R> registerSelectClause2(
+ select: SelectInstance<R>,
+ element: T,
+ block: suspend (SendChannel<T>) -> R
+ ) {
mutex.onLock.registerSelectClause2(select, null) {
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
block(this)
}
}
// assert: mutex.isLocked()
- private fun doLockedNext(elem: T) {
+ private fun doLockedNext(elem: T): Throwable? {
// check if already closed for send
if (!isActive) {
doLockedSignalCompleted(completionCause, completionCauseHandled)
- throw getCancellationException()
+ return getCancellationException()
}
// notify subscriber
try {
subscriber.onNext(elem)
} catch (e: Throwable) {
- // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
- // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
- // this failure is essentially equivalent to a failure of a child coroutine.
- cancelCoroutine(e)
- mutex.unlock()
- throw e
+ val cause = UndeliverableException(e)
+ val causeDelivered = close(cause)
+ unlockAndCheckCompleted()
+ return if (causeDelivered) {
+ // `cause` is the reason this channel is closed
+ cause
+ } else {
+ // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
+ handleUndeliverableException(cause, context)
+ getCancellationException()
+ }
}
/*
* There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
@@ -125,6 +132,7 @@ private class RxObservableCoroutine<T: Any>(
* We have to recheck `isCompleted` after `unlock` anyway.
*/
unlockAndCheckCompleted()
+ return null
}
private fun unlockAndCheckCompleted() {
@@ -138,33 +146,31 @@ private class RxObservableCoroutine<T: Any>(
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
// cancellation failures
try {
- if (_signal.value >= CLOSED) {
- _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ if (_signal.value == SIGNALLED)
+ return
+ _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ @Suppress("INVISIBLE_MEMBER")
+ val unwrappedCause = cause?.let { unwrap(it) }
+ if (unwrappedCause == null) {
try {
- if (cause != null && cause !is CancellationException) {
- /*
- * Reactive frameworks have two types of exceptions: regular and fatal.
- * Regular are passed to onError.
- * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
- * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
- * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
- * thrown by subscriber or upstream).
- * To make behaviour consistent and least surprising, we always handle fatal exceptions
- * by coroutines machinery, anyway, they should not be present in regular program flow,
- * thus our goal here is just to expose it as soon as possible.
- */
- subscriber.tryOnError(cause)
- if (!handled && cause.isFatal()) {
- handleUndeliverableException(cause, context)
- }
- }
- else {
- subscriber.onComplete()
- }
- } catch (e: Throwable) {
- // Unhandled exception (cannot handle in other way, since we are already complete)
+ subscriber.onComplete()
+ } catch (e: Exception) {
handleUndeliverableException(e, context)
}
+ } else if (unwrappedCause is UndeliverableException && !handled) {
+ /** Such exceptions are not reported to `onError`, as, according to the reactive specifications,
+ * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already
+ * cancelled. */
+ handleUndeliverableException(cause, context)
+ } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) {
+ try {
+ /** If the subscriber is already in a terminal state, the error will be signalled to
+ * `RxJavaPlugins.onError`. */
+ subscriber.onError(cause)
+ } catch (e: Exception) {
+ cause.addSuppressed(e)
+ handleUndeliverableException(cause, context)
+ }
}
} finally {
mutex.unlock()
@@ -186,9 +192,3 @@ private class RxObservableCoroutine<T: Any>(
}
}
-internal fun Throwable.isFatal() = try {
- Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode
- false
-} catch (e: Throwable) {
- true
-}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
index fb6020ea..e7678f0d 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
@@ -50,11 +50,12 @@ private class RxSingleCoroutine<T: Any>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
index 8cbd7ee8..126cb818 100644
--- a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
@@ -38,16 +38,16 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
}.subscribe({
expectUnreached()
}, {
- expect(2) // Fatal exception is reported to both onError and CEH
+ expect(2) // Fatal exceptions are not treated as special
})
- finish(4)
+ finish(3)
}
@Test
@@ -66,7 +66,7 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
@@ -77,19 +77,19 @@ class FlowableExceptionHandlingTest : TestBase() {
}, {
expect(2)
})
- finish(4)
+ finish(3)
}
@Test
- fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
rxFlowable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
}.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) }) // Fatal exception is reported to both onError and CEH
- finish(5)
+ }, { expectUnreached() }) // Fatal exception is rethrown from `onNext` => the subscription is thought to be cancelled
+ finish(4)
}
@Test
diff --git a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
index 395672ce..1302124f 100644
--- a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
@@ -125,6 +125,21 @@ class IntegrationTest(
finish(3)
}
+ @Test
+ fun testObservableWithTimeout() = runTest {
+ val observable = rxObservable<Int> {
+ expect(2)
+ withTimeout(1) { delay(100) }
+ }
+ try {
+ expect(1)
+ observable.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
+ }
+ finish(4)
+ }
+
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
var last = 0
observable.collect {
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
index 1183b2ae..5ddb36ed 100644
--- a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
@@ -8,6 +8,7 @@ import io.reactivex.rxjava3.exceptions.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
+import java.util.concurrent.*
import kotlin.test.*
class ObservableExceptionHandlingTest : TestBase() {
@@ -18,7 +19,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
- assertTrue(t is UndeliverableException && t.cause is T)
+ assertTrue(t is UndeliverableException && t.cause is T, "$t")
expect(expect)
}
@@ -38,8 +39,8 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
- rxObservable<Int>(Dispatchers.Unconfined) {
+ fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
+ rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
expect(1)
throw LinkageError()
}.subscribe({
@@ -47,7 +48,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}, {
expect(2)
})
- finish(4)
+ finish(3)
}
@Test
@@ -66,7 +67,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxObservable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
@@ -75,20 +76,28 @@ class ObservableExceptionHandlingTest : TestBase() {
.subscribe({
expectUnreached()
}, {
- expect(2) // Fatal exception is not reported in onError
+ expect(2) // Fatal exceptions are not treated in a special manner
})
- finish(4)
+ finish(3)
}
@Test
- fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
+ val latch = CountDownLatch(1)
rxObservable(Dispatchers.Unconfined) {
expect(1)
- send(Unit)
+ val result = trySend(Unit)
+ val exception = result.exceptionOrNull()
+ assertTrue(exception is UndeliverableException)
+ assertTrue(exception.cause is LinkageError)
+ assertTrue(isClosedForSend)
+ expect(4)
+ latch.countDown()
}.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) }) // Unreached because fatal errors are rethrown
+ }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
+ latch.await()
finish(5)
}
@@ -100,7 +109,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}.subscribe({
expect(2)
throw TestException()
- }, { expect(3) }) // not reported to onError because came from the subscribe itself
+ }, { expect(3) })
finish(4)
}
@@ -119,7 +128,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
rxObservable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
@@ -128,7 +137,7 @@ class ObservableExceptionHandlingTest : TestBase() {
.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) })
- finish(5)
+ }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
+ finish(4)
}
}