diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/test/PublishTest.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-reactive/test/PublishTest.kt | 158 |
1 files changed, 130 insertions, 28 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index 9e3c07b6..095b724d 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import org.junit.Test import org.reactivestreams.* import kotlin.test.* @@ -121,44 +122,110 @@ class PublishTest : TestBase() { finish(7) } + /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */ @Test - fun testOnNextError() = runTest { + fun testChannelClosing() = runTest { expect(1) - val publisher = publish(currentDispatcher()) { + val publisher = publish<Int>(Dispatchers.Unconfined) { + expect(3) + close() + assert(isClosedForSend) expect(4) - try { - send("OK") - } catch(e: Throwable) { - expect(6) - assert(e is TestException) - } } - expect(2) + try { + expect(2) + publisher.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(5) + } + finish(6) + } + + @Test + fun testOnNextError() = runTest { val latch = CompletableDeferred<Unit>() - publisher.subscribe(object : Subscriber<String> { - override fun onComplete() { - expectUnreached() + expect(1) + assertCallsExceptionHandlerWith<TestException> { exceptionHandler -> + val publisher = publish(currentDispatcher() + exceptionHandler) { + expect(4) + try { + send("OK") + } catch (e: Throwable) { + expect(6) + assert(e is TestException) + assert(isClosedForSend) + latch.complete(Unit) + } } + expect(2) + publisher.subscribe(object : Subscriber<String> { + override fun onComplete() { + expectUnreached() + } - override fun onSubscribe(s: Subscription) { - expect(3) - s.request(1) - } + override fun onSubscribe(s: Subscription) { + expect(3) + s.request(1) + } - override fun onNext(t: String) { - expect(5) - assertEquals("OK", t) - throw TestException() - } + override fun onNext(t: String) { + expect(5) + assertEquals("OK", t) + throw TestException() + } - override fun onError(t: Throwable) { - expect(7) - assert(t is TestException) - latch.complete(Unit) + override fun onError(t: Throwable) { + expectUnreached() + } + }) + latch.await() + } + finish(7) + } + + /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */ + @Test + fun testOnNextErrorAfterCancellation() = runTest { + assertCallsExceptionHandlerWith<TestException> { handler -> + var producerScope: ProducerScope<Int>? = null + CompletableDeferred<Unit>() + expect(1) + var job: Job? = null + val publisher = publish<Int>(handler + Dispatchers.Unconfined) { + producerScope = this + expect(4) + job = launch { + delay(Long.MAX_VALUE) + } } - }) - latch.await() - finish(8) + expect(2) + publisher.subscribe(object: Subscriber<Int> { + override fun onSubscribe(s: Subscription) { + expect(3) + s.request(Long.MAX_VALUE) + } + override fun onNext(t: Int) { + expect(6) + assertEquals(1, t) + job!!.cancel() + throw TestException() + } + override fun onError(t: Throwable?) { + /* Correct changes to the implementation could lead to us entering or not entering this method, but + it only matters that if we do, it is the "correct" exception that was validly used to cancel the + coroutine that gets passed here and not `TestException`. */ + assertTrue(t is CancellationException) + } + override fun onComplete() { expectUnreached() } + }) + expect(5) + val result: ChannelResult<Unit> = producerScope!!.trySend(1) + val e = result.exceptionOrNull()!! + assertTrue(e is CancellationException, "The actual error: $e") + assertTrue(producerScope!!.isClosedForSend) + assertTrue(result.isFailure) + } + finish(7) } @Test @@ -182,4 +249,39 @@ class PublishTest : TestBase() { fun testIllegalArgumentException() { assertFailsWith<IllegalArgumentException> { publish<Int>(Job()) { } } } + + /** Tests that `trySend` doesn't throw in `publish`. */ + @Test + fun testTrySendNotThrowing() = runTest { + var producerScope: ProducerScope<Int>? = null + expect(1) + val publisher = publish<Int>(Dispatchers.Unconfined) { + producerScope = this + expect(3) + delay(Long.MAX_VALUE) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + publisher.awaitFirstOrNull() + expectUnreached() + } + job.cancel() + expect(4) + val result = producerScope!!.trySend(1) + assertTrue(result.isFailure) + finish(5) + } + + /** Tests that all methods on `publish` fail without closing the channel when attempting to emit `null`. */ + @Test + fun testEmittingNull() = runTest { + val publisher = publish { + assertFailsWith<NullPointerException> { send(null) } + assertFailsWith<NullPointerException> { trySend(null) } + @Suppress("DEPRECATION") + assertFailsWith<NullPointerException> { offer(null) } + send("OK") + } + assertEquals("OK", publisher.awaitFirstOrNull()) + } }
\ No newline at end of file |