diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/test')
10 files changed, 537 insertions, 91 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt new file mode 100644 index 00000000..6749423f --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt @@ -0,0 +1,43 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import org.junit.* +import org.reactivestreams.* + +class AwaitTest: TestBase() { + + /** Tests that calls to [awaitFirst] (and, thus, to the rest of these functions) throw [CancellationException] and + * unsubscribe from the publisher when their [Job] is cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val publisher = Publisher<Int> { s -> + s.onSubscribe(object: Subscription { + override fun request(n: Long) { + expect(3) + } + + override fun cancel() { + expect(5) + } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + publisher.awaitFirst() + } catch (e: CancellationException) { + expect(6) + throw e + } + } + expect(4) + job.cancelAndJoin() + finish(7) + } + +}
\ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt b/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt new file mode 100644 index 00000000..1db10b27 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt @@ -0,0 +1,21 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.* + + +class CancelledParentAttachTest : TestBase() {; + + @Test + fun testFlow() = runTest { + val f = flowOf(1, 2, 3).cancellable() + val j = Job().also { it.cancel() } + f.asPublisher(j).asFlow().collect() + } + +} diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt index e7b8cb17..02c9e242 100644 --- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.* import org.junit.Test import org.reactivestreams.* @@ -15,7 +16,7 @@ class FlowAsPublisherTest : TestBase() { @Test fun testErrorOnCancellationIsReported() { expect(1) - flow<Int> { + flow { try { emit(2) } finally { @@ -50,13 +51,13 @@ class FlowAsPublisherTest : TestBase() { @Test fun testCancellationIsNotReported() { expect(1) - flow<Int> { + flow { emit(2) }.asPublisher().subscribe(object : Subscriber<Int> { private lateinit var subscription: Subscription override fun onComplete() { - expect(3) + expectUnreached() } override fun onSubscribe(s: Subscription?) { @@ -73,7 +74,7 @@ class FlowAsPublisherTest : TestBase() { expectUnreached() } }) - finish(4) + finish(3) } @Test @@ -149,4 +150,19 @@ class FlowAsPublisherTest : TestBase() { } finish(5) } + + @Test + fun testFlowWithTimeout() = runTest { + val publisher = flow<Int> { + expect(2) + withTimeout(1) { delay(Long.MAX_VALUE) } + }.asPublisher() + try { + expect(1) + publisher.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(3) + } + finish(4) + } } diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt index 18cd012d..efe7ec7e 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt @@ -5,10 +5,14 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.exceptions.* import org.junit.Test import org.junit.runner.* import org.junit.runners.* import org.reactivestreams.* +import java.lang.IllegalStateException +import java.lang.RuntimeException +import kotlin.contracts.* import kotlin.coroutines.* import kotlin.test.* @@ -80,30 +84,6 @@ class IntegrationTest( } @Test - fun testNumbers() = runBlocking<Unit> { - val n = 100 * stressTestMultiplier - val pub = publish(ctx(coroutineContext)) { - for (i in 1..n) { - send(i) - if (delay) delay(1) - } - } - assertEquals(1, pub.awaitFirst()) - assertEquals(1, pub.awaitFirstOrDefault(0)) - assertEquals(1, pub.awaitFirstOrNull()) - assertEquals(1, pub.awaitFirstOrElse { 0 }) - assertEquals(n, pub.awaitLast()) - assertFailsWith<IllegalArgumentException> { pub.awaitSingle() } - assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrDefault(0) } - assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrNull() } - assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrElse { 0 } } - checkNumbers(n, pub) - val channel = pub.openSubscription() - checkNumbers(n, channel.asPublisher(ctx(coroutineContext))) - channel.cancel() - } - - @Test fun testCancelWithoutValue() = runTest { val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { publish<String> { @@ -116,7 +96,7 @@ class IntegrationTest( } @Test - fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) { + fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException }) { expect(1) val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { publish<String> { @@ -130,12 +110,145 @@ class IntegrationTest( finish(3) } - private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) { - var last = 0 - pub.collect { - assertEquals(++last, it) + /** + * Test that the continuation is not being resumed after it has already failed due to there having been too many + * values passed. + */ + @Test + fun testNotCompletingFailedAwait() = runTest { + try { + expect(1) + Publisher<Int> { sub -> + sub.onSubscribe(object: Subscription { + override fun request(n: Long) { + expect(2) + sub.onNext(1) + sub.onNext(2) + expect(4) + sub.onComplete() + } + + override fun cancel() { + expect(3) + } + }) + }.awaitSingle() + } catch (e: java.lang.IllegalArgumentException) { + expect(5) } - assertEquals(n, last) + finish(6) } + /** + * Test the behavior of [awaitOne] on unconforming publishers. + */ + @Test + fun testAwaitOnNonconformingPublishers() = runTest { + fun <T> publisher(block: Subscriber<in T>.(n: Long) -> Unit) = + Publisher<T> { subscriber -> + subscriber.onSubscribe(object: Subscription { + override fun request(n: Long) { + subscriber.block(n) + } + + override fun cancel() { + } + }) + } + val dummyMessage = "dummy" + val dummyThrowable = RuntimeException(dummyMessage) + suspend fun <T> assertDetectsBadPublisher( + operation: suspend Publisher<T>.() -> T, + message: String, + block: Subscriber<in T>.(n: Long) -> Unit, + ) { + assertCallsExceptionHandlerWith<IllegalStateException> { + try { + publisher(block).operation() + } catch (e: Throwable) { + if (e.message != dummyMessage) + throw e + } + }.let { + assertTrue("Expected the message to contain '$message', got '${it.message}'") { + it.message?.contains(message) ?: false + } + } + } + + // Rule 1.1 broken: the publisher produces more values than requested. + assertDetectsBadPublisher<Int>({ awaitFirst() }, "provided more") { + onNext(1) + onNext(2) + onComplete() + } + + // Rule 1.7 broken: the publisher calls a method on a subscriber after reaching the terminal state. + assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") { + onNext(1) + onError(dummyThrowable) + onComplete() + } + assertDetectsBadPublisher<Int>({ awaitSingleOrDefault(2) }, "terminal state") { + onComplete() + onError(dummyThrowable) + } + assertDetectsBadPublisher<Int>({ awaitFirst() }, "terminal state") { + onNext(0) + onComplete() + onComplete() + } + assertDetectsBadPublisher<Int>({ awaitFirstOrDefault(1) }, "terminal state") { + onComplete() + onNext(3) + } + assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") { + onError(dummyThrowable) + onNext(3) + } + + // Rule 1.9 broken (the first signal to the subscriber was not 'onSubscribe') + assertCallsExceptionHandlerWith<IllegalStateException> { + try { + Publisher<Int> { subscriber -> + subscriber.onNext(3) + subscriber.onComplete() + }.awaitFirst() + } catch (e: NoSuchElementException) { + // intentionally blank + } + }.let { assertTrue(it.message?.contains("onSubscribe") ?: false) } + } + + @Test + fun testPublishWithTimeout() = runTest { + val publisher = publish<Int> { + expect(2) + withTimeout(1) { delay(100) } + } + try { + expect(1) + publisher.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(3) + } + finish(4) + } + +} + +@OptIn(ExperimentalContracts::class) +internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith( + crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E { + contract { + callsInPlace(operation, InvocationKind.EXACTLY_ONCE) + } + val handler = CapturingHandler() + return withContext(handler) { + operation(handler) + handler.getException().let { + assertTrue(it is E, it.toString()) + it + } + } }
\ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt index 906b2579..cf935f97 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt @@ -7,18 +7,12 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.flow.* -import org.junit.* import org.junit.Ignore import org.junit.Test import org.reactivestreams.* import org.reactivestreams.tck.* - -import org.reactivestreams.Subscription -import org.reactivestreams.Subscriber -import java.util.ArrayList import java.util.concurrent.* -import java.util.concurrent.CountDownLatch -import java.util.concurrent.ForkJoinPool.commonPool +import java.util.concurrent.ForkJoinPool.* import kotlin.test.* class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) { @@ -97,7 +91,7 @@ class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) { override fun onSubscribe(s: Subscription) { this.s = s - for (i in 0 until n) { + for (i in 0..n) { commonPool().execute { s.request(1) } } } @@ -115,7 +109,7 @@ class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) { } }) - latch.await(50, TimeUnit.SECONDS) + latch.await() assertEquals(array.toList(), collected) } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index 9e3c07b6..095b724d 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import org.junit.Test import org.reactivestreams.* import kotlin.test.* @@ -121,44 +122,110 @@ class PublishTest : TestBase() { finish(7) } + /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */ @Test - fun testOnNextError() = runTest { + fun testChannelClosing() = runTest { expect(1) - val publisher = publish(currentDispatcher()) { + val publisher = publish<Int>(Dispatchers.Unconfined) { + expect(3) + close() + assert(isClosedForSend) expect(4) - try { - send("OK") - } catch(e: Throwable) { - expect(6) - assert(e is TestException) - } } - expect(2) + try { + expect(2) + publisher.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(5) + } + finish(6) + } + + @Test + fun testOnNextError() = runTest { val latch = CompletableDeferred<Unit>() - publisher.subscribe(object : Subscriber<String> { - override fun onComplete() { - expectUnreached() + expect(1) + assertCallsExceptionHandlerWith<TestException> { exceptionHandler -> + val publisher = publish(currentDispatcher() + exceptionHandler) { + expect(4) + try { + send("OK") + } catch (e: Throwable) { + expect(6) + assert(e is TestException) + assert(isClosedForSend) + latch.complete(Unit) + } } + expect(2) + publisher.subscribe(object : Subscriber<String> { + override fun onComplete() { + expectUnreached() + } - override fun onSubscribe(s: Subscription) { - expect(3) - s.request(1) - } + override fun onSubscribe(s: Subscription) { + expect(3) + s.request(1) + } - override fun onNext(t: String) { - expect(5) - assertEquals("OK", t) - throw TestException() - } + override fun onNext(t: String) { + expect(5) + assertEquals("OK", t) + throw TestException() + } - override fun onError(t: Throwable) { - expect(7) - assert(t is TestException) - latch.complete(Unit) + override fun onError(t: Throwable) { + expectUnreached() + } + }) + latch.await() + } + finish(7) + } + + /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */ + @Test + fun testOnNextErrorAfterCancellation() = runTest { + assertCallsExceptionHandlerWith<TestException> { handler -> + var producerScope: ProducerScope<Int>? = null + CompletableDeferred<Unit>() + expect(1) + var job: Job? = null + val publisher = publish<Int>(handler + Dispatchers.Unconfined) { + producerScope = this + expect(4) + job = launch { + delay(Long.MAX_VALUE) + } } - }) - latch.await() - finish(8) + expect(2) + publisher.subscribe(object: Subscriber<Int> { + override fun onSubscribe(s: Subscription) { + expect(3) + s.request(Long.MAX_VALUE) + } + override fun onNext(t: Int) { + expect(6) + assertEquals(1, t) + job!!.cancel() + throw TestException() + } + override fun onError(t: Throwable?) { + /* Correct changes to the implementation could lead to us entering or not entering this method, but + it only matters that if we do, it is the "correct" exception that was validly used to cancel the + coroutine that gets passed here and not `TestException`. */ + assertTrue(t is CancellationException) + } + override fun onComplete() { expectUnreached() } + }) + expect(5) + val result: ChannelResult<Unit> = producerScope!!.trySend(1) + val e = result.exceptionOrNull()!! + assertTrue(e is CancellationException, "The actual error: $e") + assertTrue(producerScope!!.isClosedForSend) + assertTrue(result.isFailure) + } + finish(7) } @Test @@ -182,4 +249,39 @@ class PublishTest : TestBase() { fun testIllegalArgumentException() { assertFailsWith<IllegalArgumentException> { publish<Int>(Job()) { } } } + + /** Tests that `trySend` doesn't throw in `publish`. */ + @Test + fun testTrySendNotThrowing() = runTest { + var producerScope: ProducerScope<Int>? = null + expect(1) + val publisher = publish<Int>(Dispatchers.Unconfined) { + producerScope = this + expect(3) + delay(Long.MAX_VALUE) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + publisher.awaitFirstOrNull() + expectUnreached() + } + job.cancel() + expect(4) + val result = producerScope!!.trySend(1) + assertTrue(result.isFailure) + finish(5) + } + + /** Tests that all methods on `publish` fail without closing the channel when attempting to emit `null`. */ + @Test + fun testEmittingNull() = runTest { + val publisher = publish { + assertFailsWith<NullPointerException> { send(null) } + assertFailsWith<NullPointerException> { trySend(null) } + @Suppress("DEPRECATION") + assertFailsWith<NullPointerException> { offer(null) } + send("OK") + } + assertEquals("OK", publisher.awaitFirstOrNull()) + } }
\ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt index 04833e98..7a0e0fac 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.reactive @@ -71,7 +71,7 @@ class PublisherAsFlowTest : TestBase() { send(it + 1) expect(it + 1) } - assertFalse { offer(-1) } + assertFalse { trySend(-1).isSuccess } } publisher.asFlow().collect { @@ -263,4 +263,14 @@ class PublisherAsFlowTest : TestBase() { } assertEquals(expected, list) } + + @Test + fun testException() = runTest { + expect(1) + val p = publish<Int> { throw TestException() }.asFlow() + p.catch { + assertTrue { it is TestException } + finish(2) + }.collect() + } } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherCollectTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherCollectTest.kt new file mode 100644 index 00000000..e4753f04 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherCollectTest.kt @@ -0,0 +1,144 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import org.junit.Test +import org.reactivestreams.* +import kotlin.test.* + +class PublisherCollectTest: TestBase() { + + /** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */ + @Test + fun testCollect() = runTest { + val x = 100 + val xSum = x * (x + 1) / 2 + val publisher = Publisher<Int> { subscriber -> + var requested = 0L + var lastOutput = 0 + subscriber.onSubscribe(object: Subscription { + + override fun request(n: Long) { + requested += n + if (n <= 0) { + subscriber.onError(IllegalArgumentException()) + return + } + while (lastOutput < x && lastOutput < requested) { + lastOutput += 1 + subscriber.onNext(lastOutput) + } + if (lastOutput == x) + subscriber.onComplete() + } + + override fun cancel() { + /** According to rule 3.5 of the + * [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5), + * this method can be called by the subscriber at any point, so it's not an error if it's called + * in this scenario. */ + } + + }) + } + var sum = 0 + publisher.collect { + sum += it + } + assertEquals(xSum, sum) + } + + /** Tests the behavior of [collect] when the publisher raises an error. */ + @Test + fun testCollectThrowingPublisher() = runTest { + val errorString = "Too many elements requested" + val x = 100 + val xSum = x * (x + 1) / 2 + val publisher = Publisher<Int> { subscriber -> + var requested = 0L + var lastOutput = 0 + subscriber.onSubscribe(object: Subscription { + + override fun request(n: Long) { + requested += n + if (n <= 0) { + subscriber.onError(IllegalArgumentException()) + return + } + while (lastOutput < x && lastOutput < requested) { + lastOutput += 1 + subscriber.onNext(lastOutput) + } + if (lastOutput == x) + subscriber.onError(IllegalArgumentException(errorString)) + } + + override fun cancel() { + /** See the comment for the corresponding part of [testCollect]. */ + } + + }) + } + var sum = 0 + try { + publisher.collect { + sum += it + } + } catch (e: IllegalArgumentException) { + assertEquals(errorString, e.message) + } + assertEquals(xSum, sum) + } + + /** Tests the behavior of [collect] when the action throws. */ + @Test + fun testCollectThrowingAction() = runTest { + val errorString = "Too many elements produced" + val x = 100 + val xSum = x * (x + 1) / 2 + val publisher = Publisher<Int> { subscriber -> + var requested = 0L + var lastOutput = 0 + subscriber.onSubscribe(object: Subscription { + + override fun request(n: Long) { + requested += n + if (n <= 0) { + subscriber.onError(IllegalArgumentException()) + return + } + while (lastOutput < x && lastOutput < requested) { + lastOutput += 1 + subscriber.onNext(lastOutput) + } + } + + override fun cancel() { + assertEquals(x, lastOutput) + expect(x + 2) + } + + }) + } + var sum = 0 + try { + expect(1) + var i = 1 + publisher.collect { + sum += it + i += 1 + expect(i) + if (sum >= xSum) { + throw IllegalArgumentException(errorString) + } + } + } catch (e: IllegalArgumentException) { + expect(x + 3) + assertEquals(errorString, e.message) + } + finish(x + 4) + } +}
\ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt index 736a6640..a19ce2f4 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt @@ -29,13 +29,14 @@ import kotlin.random.* */ @Suppress("ReactiveStreamsSubscriberImplementation") class PublisherRequestStressTest : TestBase() { + private val testDurationSec = 3 * stressTestMultiplier // Original code in Amazon SDK uses 4 and 16 as low/high watermarks. - // There constants were chosen so that problem reproduces asap with particular this code. + // These constants were chosen so that problem reproduces asap with particular this code. private val minDemand = 8L private val maxDemand = 16L - + private val nEmitThreads = 4 private val emitThreadNo = AtomicInteger() @@ -47,7 +48,7 @@ class PublisherRequestStressTest : TestBase() { private val reqPool = Executors.newSingleThreadExecutor { r -> Thread(r, "PublisherRequestStressTest-req") } - + private val nextValue = AtomicLong(0) @After @@ -64,7 +65,6 @@ class PublisherRequestStressTest : TestBase() { fun testRequestStress() { val expectedValue = AtomicLong(0) val requestedTill = AtomicLong(0) - val completionLatch = CountDownLatch(1) val callingOnNext = AtomicInteger() val publisher = mtFlow().asPublisher() @@ -74,7 +74,7 @@ class PublisherRequestStressTest : TestBase() { private var demand = 0L // only updated from reqPool override fun onComplete() { - completionLatch.countDown() + // Typically unreached, but, rarely, `emitPool` may shut down before the cancellation is performed. } override fun onSubscribe(sub: Subscription) { @@ -123,7 +123,9 @@ class PublisherRequestStressTest : TestBase() { } if (!error) { subscription.cancel() - completionLatch.await() + runBlocking { + (subscription as AbstractCoroutine<*>).join() + } } } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt index 110718ac..740fd86a 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt @@ -1,10 +1,11 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import org.junit.Test import org.junit.runner.* @@ -27,27 +28,27 @@ class PublisherSubscriptionSelectTest(private val request: Int) : TestBase() { var a = 0 var b = 0 // open two subs - val channelA = source.openSubscription(request) - val channelB = source.openSubscription(request) + val channelA = source.toChannel(request) + val channelB = source.toChannel(request) loop@ while (true) { val done: Int = select { - channelA.onReceiveOrNull { - if (it != null) assertEquals(a++, it) - if (it == null) 0 else 1 + channelA.onReceiveCatching { result -> + result.onSuccess { assertEquals(a++, it) } + if (result.isSuccess) 1 else 0 } - channelB.onReceiveOrNull { - if (it != null) assertEquals(b++, it) - if (it == null) 0 else 2 + channelB.onReceiveCatching { result -> + result.onSuccess { assertEquals(b++, it) } + if (result.isSuccess) 2 else 0 } } when (done) { 0 -> break@loop 1 -> { - val r = channelB.receiveOrNull() + val r = channelB.receiveCatching().getOrNull() if (r != null) assertEquals(b++, r) } 2 -> { - val r = channelA.receiveOrNull() + val r = channelA.receiveCatching().getOrNull() if (r != null) assertEquals(a++, r) } } @@ -58,4 +59,4 @@ class PublisherSubscriptionSelectTest(private val request: Int) : TestBase() { // should receive one of them fully assertTrue(a == n || b == n) } -}
\ No newline at end of file +} |