aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/test/PublishTest.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublishTest.kt158
1 files changed, 130 insertions, 28 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
index 9e3c07b6..095b724d 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import org.junit.Test
import org.reactivestreams.*
import kotlin.test.*
@@ -121,44 +122,110 @@ class PublishTest : TestBase() {
finish(7)
}
+ /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
@Test
- fun testOnNextError() = runTest {
+ fun testChannelClosing() = runTest {
expect(1)
- val publisher = publish(currentDispatcher()) {
+ val publisher = publish<Int>(Dispatchers.Unconfined) {
+ expect(3)
+ close()
+ assert(isClosedForSend)
expect(4)
- try {
- send("OK")
- } catch(e: Throwable) {
- expect(6)
- assert(e is TestException)
- }
}
- expect(2)
+ try {
+ expect(2)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(5)
+ }
+ finish(6)
+ }
+
+ @Test
+ fun testOnNextError() = runTest {
val latch = CompletableDeferred<Unit>()
- publisher.subscribe(object : Subscriber<String> {
- override fun onComplete() {
- expectUnreached()
+ expect(1)
+ assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
+ val publisher = publish(currentDispatcher() + exceptionHandler) {
+ expect(4)
+ try {
+ send("OK")
+ } catch (e: Throwable) {
+ expect(6)
+ assert(e is TestException)
+ assert(isClosedForSend)
+ latch.complete(Unit)
+ }
}
+ expect(2)
+ publisher.subscribe(object : Subscriber<String> {
+ override fun onComplete() {
+ expectUnreached()
+ }
- override fun onSubscribe(s: Subscription) {
- expect(3)
- s.request(1)
- }
+ override fun onSubscribe(s: Subscription) {
+ expect(3)
+ s.request(1)
+ }
- override fun onNext(t: String) {
- expect(5)
- assertEquals("OK", t)
- throw TestException()
- }
+ override fun onNext(t: String) {
+ expect(5)
+ assertEquals("OK", t)
+ throw TestException()
+ }
- override fun onError(t: Throwable) {
- expect(7)
- assert(t is TestException)
- latch.complete(Unit)
+ override fun onError(t: Throwable) {
+ expectUnreached()
+ }
+ })
+ latch.await()
+ }
+ finish(7)
+ }
+
+ /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
+ @Test
+ fun testOnNextErrorAfterCancellation() = runTest {
+ assertCallsExceptionHandlerWith<TestException> { handler ->
+ var producerScope: ProducerScope<Int>? = null
+ CompletableDeferred<Unit>()
+ expect(1)
+ var job: Job? = null
+ val publisher = publish<Int>(handler + Dispatchers.Unconfined) {
+ producerScope = this
+ expect(4)
+ job = launch {
+ delay(Long.MAX_VALUE)
+ }
}
- })
- latch.await()
- finish(8)
+ expect(2)
+ publisher.subscribe(object: Subscriber<Int> {
+ override fun onSubscribe(s: Subscription) {
+ expect(3)
+ s.request(Long.MAX_VALUE)
+ }
+ override fun onNext(t: Int) {
+ expect(6)
+ assertEquals(1, t)
+ job!!.cancel()
+ throw TestException()
+ }
+ override fun onError(t: Throwable?) {
+ /* Correct changes to the implementation could lead to us entering or not entering this method, but
+ it only matters that if we do, it is the "correct" exception that was validly used to cancel the
+ coroutine that gets passed here and not `TestException`. */
+ assertTrue(t is CancellationException)
+ }
+ override fun onComplete() { expectUnreached() }
+ })
+ expect(5)
+ val result: ChannelResult<Unit> = producerScope!!.trySend(1)
+ val e = result.exceptionOrNull()!!
+ assertTrue(e is CancellationException, "The actual error: $e")
+ assertTrue(producerScope!!.isClosedForSend)
+ assertTrue(result.isFailure)
+ }
+ finish(7)
}
@Test
@@ -182,4 +249,39 @@ class PublishTest : TestBase() {
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { publish<Int>(Job()) { } }
}
+
+ /** Tests that `trySend` doesn't throw in `publish`. */
+ @Test
+ fun testTrySendNotThrowing() = runTest {
+ var producerScope: ProducerScope<Int>? = null
+ expect(1)
+ val publisher = publish<Int>(Dispatchers.Unconfined) {
+ producerScope = this
+ expect(3)
+ delay(Long.MAX_VALUE)
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ publisher.awaitFirstOrNull()
+ expectUnreached()
+ }
+ job.cancel()
+ expect(4)
+ val result = producerScope!!.trySend(1)
+ assertTrue(result.isFailure)
+ finish(5)
+ }
+
+ /** Tests that all methods on `publish` fail without closing the channel when attempting to emit `null`. */
+ @Test
+ fun testEmittingNull() = runTest {
+ val publisher = publish {
+ assertFailsWith<NullPointerException> { send(null) }
+ assertFailsWith<NullPointerException> { trySend(null) }
+ @Suppress("DEPRECATION")
+ assertFailsWith<NullPointerException> { offer(null) }
+ send("OK")
+ }
+ assertEquals("OK", publisher.awaitFirstOrNull())
+ }
} \ No newline at end of file