aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactive/test
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/test')
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt43
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt21
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt24
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt173
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt12
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublishTest.kt158
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt14
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublisherCollectTest.kt144
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt14
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt25
10 files changed, 537 insertions, 91 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt
new file mode 100644
index 00000000..6749423f
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.reactive
+
+import kotlinx.coroutines.*
+import org.junit.*
+import org.reactivestreams.*
+
+class AwaitTest: TestBase() {
+
+ /** Tests that calls to [awaitFirst] (and, thus, to the rest of these functions) throw [CancellationException] and
+ * unsubscribe from the publisher when their [Job] is cancelled. */
+ @Test
+ fun testAwaitCancellation() = runTest {
+ expect(1)
+ val publisher = Publisher<Int> { s ->
+ s.onSubscribe(object: Subscription {
+ override fun request(n: Long) {
+ expect(3)
+ }
+
+ override fun cancel() {
+ expect(5)
+ }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ publisher.awaitFirst()
+ } catch (e: CancellationException) {
+ expect(6)
+ throw e
+ }
+ }
+ expect(4)
+ job.cancelAndJoin()
+ finish(7)
+ }
+
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt b/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt
new file mode 100644
index 00000000..1db10b27
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.reactive
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.junit.*
+
+
+class CancelledParentAttachTest : TestBase() {;
+
+ @Test
+ fun testFlow() = runTest {
+ val f = flowOf(1, 2, 3).cancellable()
+ val j = Job().also { it.cancel() }
+ f.asPublisher(j).asFlow().collect()
+ }
+
+}
diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
index e7b8cb17..02c9e242 100644
--- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import org.junit.Test
import org.reactivestreams.*
@@ -15,7 +16,7 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testErrorOnCancellationIsReported() {
expect(1)
- flow<Int> {
+ flow {
try {
emit(2)
} finally {
@@ -50,13 +51,13 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testCancellationIsNotReported() {
expect(1)
- flow<Int> {
+ flow {
emit(2)
}.asPublisher().subscribe(object : Subscriber<Int> {
private lateinit var subscription: Subscription
override fun onComplete() {
- expect(3)
+ expectUnreached()
}
override fun onSubscribe(s: Subscription?) {
@@ -73,7 +74,7 @@ class FlowAsPublisherTest : TestBase() {
expectUnreached()
}
})
- finish(4)
+ finish(3)
}
@Test
@@ -149,4 +150,19 @@ class FlowAsPublisherTest : TestBase() {
}
finish(5)
}
+
+ @Test
+ fun testFlowWithTimeout() = runTest {
+ val publisher = flow<Int> {
+ expect(2)
+ withTimeout(1) { delay(Long.MAX_VALUE) }
+ }.asPublisher()
+ try {
+ expect(1)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
+ }
+ finish(4)
+ }
}
diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
index 18cd012d..efe7ec7e 100644
--- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
@@ -5,10 +5,14 @@
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
+import kotlinx.coroutines.exceptions.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
import org.reactivestreams.*
+import java.lang.IllegalStateException
+import java.lang.RuntimeException
+import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.test.*
@@ -80,30 +84,6 @@ class IntegrationTest(
}
@Test
- fun testNumbers() = runBlocking<Unit> {
- val n = 100 * stressTestMultiplier
- val pub = publish(ctx(coroutineContext)) {
- for (i in 1..n) {
- send(i)
- if (delay) delay(1)
- }
- }
- assertEquals(1, pub.awaitFirst())
- assertEquals(1, pub.awaitFirstOrDefault(0))
- assertEquals(1, pub.awaitFirstOrNull())
- assertEquals(1, pub.awaitFirstOrElse { 0 })
- assertEquals(n, pub.awaitLast())
- assertFailsWith<IllegalArgumentException> { pub.awaitSingle() }
- assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrDefault(0) }
- assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrNull() }
- assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrElse { 0 } }
- checkNumbers(n, pub)
- val channel = pub.openSubscription()
- checkNumbers(n, channel.asPublisher(ctx(coroutineContext)))
- channel.cancel()
- }
-
- @Test
fun testCancelWithoutValue() = runTest {
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
publish<String> {
@@ -116,7 +96,7 @@ class IntegrationTest(
}
@Test
- fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
+ fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException }) {
expect(1)
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
publish<String> {
@@ -130,12 +110,145 @@ class IntegrationTest(
finish(3)
}
- private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {
- var last = 0
- pub.collect {
- assertEquals(++last, it)
+ /**
+ * Test that the continuation is not being resumed after it has already failed due to there having been too many
+ * values passed.
+ */
+ @Test
+ fun testNotCompletingFailedAwait() = runTest {
+ try {
+ expect(1)
+ Publisher<Int> { sub ->
+ sub.onSubscribe(object: Subscription {
+ override fun request(n: Long) {
+ expect(2)
+ sub.onNext(1)
+ sub.onNext(2)
+ expect(4)
+ sub.onComplete()
+ }
+
+ override fun cancel() {
+ expect(3)
+ }
+ })
+ }.awaitSingle()
+ } catch (e: java.lang.IllegalArgumentException) {
+ expect(5)
}
- assertEquals(n, last)
+ finish(6)
}
+ /**
+ * Test the behavior of [awaitOne] on unconforming publishers.
+ */
+ @Test
+ fun testAwaitOnNonconformingPublishers() = runTest {
+ fun <T> publisher(block: Subscriber<in T>.(n: Long) -> Unit) =
+ Publisher<T> { subscriber ->
+ subscriber.onSubscribe(object: Subscription {
+ override fun request(n: Long) {
+ subscriber.block(n)
+ }
+
+ override fun cancel() {
+ }
+ })
+ }
+ val dummyMessage = "dummy"
+ val dummyThrowable = RuntimeException(dummyMessage)
+ suspend fun <T> assertDetectsBadPublisher(
+ operation: suspend Publisher<T>.() -> T,
+ message: String,
+ block: Subscriber<in T>.(n: Long) -> Unit,
+ ) {
+ assertCallsExceptionHandlerWith<IllegalStateException> {
+ try {
+ publisher(block).operation()
+ } catch (e: Throwable) {
+ if (e.message != dummyMessage)
+ throw e
+ }
+ }.let {
+ assertTrue("Expected the message to contain '$message', got '${it.message}'") {
+ it.message?.contains(message) ?: false
+ }
+ }
+ }
+
+ // Rule 1.1 broken: the publisher produces more values than requested.
+ assertDetectsBadPublisher<Int>({ awaitFirst() }, "provided more") {
+ onNext(1)
+ onNext(2)
+ onComplete()
+ }
+
+ // Rule 1.7 broken: the publisher calls a method on a subscriber after reaching the terminal state.
+ assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
+ onNext(1)
+ onError(dummyThrowable)
+ onComplete()
+ }
+ assertDetectsBadPublisher<Int>({ awaitSingleOrDefault(2) }, "terminal state") {
+ onComplete()
+ onError(dummyThrowable)
+ }
+ assertDetectsBadPublisher<Int>({ awaitFirst() }, "terminal state") {
+ onNext(0)
+ onComplete()
+ onComplete()
+ }
+ assertDetectsBadPublisher<Int>({ awaitFirstOrDefault(1) }, "terminal state") {
+ onComplete()
+ onNext(3)
+ }
+ assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
+ onError(dummyThrowable)
+ onNext(3)
+ }
+
+ // Rule 1.9 broken (the first signal to the subscriber was not 'onSubscribe')
+ assertCallsExceptionHandlerWith<IllegalStateException> {
+ try {
+ Publisher<Int> { subscriber ->
+ subscriber.onNext(3)
+ subscriber.onComplete()
+ }.awaitFirst()
+ } catch (e: NoSuchElementException) {
+ // intentionally blank
+ }
+ }.let { assertTrue(it.message?.contains("onSubscribe") ?: false) }
+ }
+
+ @Test
+ fun testPublishWithTimeout() = runTest {
+ val publisher = publish<Int> {
+ expect(2)
+ withTimeout(1) { delay(100) }
+ }
+ try {
+ expect(1)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
+ }
+ finish(4)
+ }
+
+}
+
+@OptIn(ExperimentalContracts::class)
+internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
+ crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E {
+ contract {
+ callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
+ }
+ val handler = CapturingHandler()
+ return withContext(handler) {
+ operation(handler)
+ handler.getException().let {
+ assertTrue(it is E, it.toString())
+ it
+ }
+ }
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
index 906b2579..cf935f97 100644
--- a/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
@@ -7,18 +7,12 @@
package kotlinx.coroutines.reactive
import kotlinx.coroutines.flow.*
-import org.junit.*
import org.junit.Ignore
import org.junit.Test
import org.reactivestreams.*
import org.reactivestreams.tck.*
-
-import org.reactivestreams.Subscription
-import org.reactivestreams.Subscriber
-import java.util.ArrayList
import java.util.concurrent.*
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.ForkJoinPool.commonPool
+import java.util.concurrent.ForkJoinPool.*
import kotlin.test.*
class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) {
@@ -97,7 +91,7 @@ class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) {
override fun onSubscribe(s: Subscription) {
this.s = s
- for (i in 0 until n) {
+ for (i in 0..n) {
commonPool().execute { s.request(1) }
}
}
@@ -115,7 +109,7 @@ class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) {
}
})
- latch.await(50, TimeUnit.SECONDS)
+ latch.await()
assertEquals(array.toList(), collected)
}
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
index 9e3c07b6..095b724d 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import org.junit.Test
import org.reactivestreams.*
import kotlin.test.*
@@ -121,44 +122,110 @@ class PublishTest : TestBase() {
finish(7)
}
+ /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
@Test
- fun testOnNextError() = runTest {
+ fun testChannelClosing() = runTest {
expect(1)
- val publisher = publish(currentDispatcher()) {
+ val publisher = publish<Int>(Dispatchers.Unconfined) {
+ expect(3)
+ close()
+ assert(isClosedForSend)
expect(4)
- try {
- send("OK")
- } catch(e: Throwable) {
- expect(6)
- assert(e is TestException)
- }
}
- expect(2)
+ try {
+ expect(2)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(5)
+ }
+ finish(6)
+ }
+
+ @Test
+ fun testOnNextError() = runTest {
val latch = CompletableDeferred<Unit>()
- publisher.subscribe(object : Subscriber<String> {
- override fun onComplete() {
- expectUnreached()
+ expect(1)
+ assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
+ val publisher = publish(currentDispatcher() + exceptionHandler) {
+ expect(4)
+ try {
+ send("OK")
+ } catch (e: Throwable) {
+ expect(6)
+ assert(e is TestException)
+ assert(isClosedForSend)
+ latch.complete(Unit)
+ }
}
+ expect(2)
+ publisher.subscribe(object : Subscriber<String> {
+ override fun onComplete() {
+ expectUnreached()
+ }
- override fun onSubscribe(s: Subscription) {
- expect(3)
- s.request(1)
- }
+ override fun onSubscribe(s: Subscription) {
+ expect(3)
+ s.request(1)
+ }
- override fun onNext(t: String) {
- expect(5)
- assertEquals("OK", t)
- throw TestException()
- }
+ override fun onNext(t: String) {
+ expect(5)
+ assertEquals("OK", t)
+ throw TestException()
+ }
- override fun onError(t: Throwable) {
- expect(7)
- assert(t is TestException)
- latch.complete(Unit)
+ override fun onError(t: Throwable) {
+ expectUnreached()
+ }
+ })
+ latch.await()
+ }
+ finish(7)
+ }
+
+ /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
+ @Test
+ fun testOnNextErrorAfterCancellation() = runTest {
+ assertCallsExceptionHandlerWith<TestException> { handler ->
+ var producerScope: ProducerScope<Int>? = null
+ CompletableDeferred<Unit>()
+ expect(1)
+ var job: Job? = null
+ val publisher = publish<Int>(handler + Dispatchers.Unconfined) {
+ producerScope = this
+ expect(4)
+ job = launch {
+ delay(Long.MAX_VALUE)
+ }
}
- })
- latch.await()
- finish(8)
+ expect(2)
+ publisher.subscribe(object: Subscriber<Int> {
+ override fun onSubscribe(s: Subscription) {
+ expect(3)
+ s.request(Long.MAX_VALUE)
+ }
+ override fun onNext(t: Int) {
+ expect(6)
+ assertEquals(1, t)
+ job!!.cancel()
+ throw TestException()
+ }
+ override fun onError(t: Throwable?) {
+ /* Correct changes to the implementation could lead to us entering or not entering this method, but
+ it only matters that if we do, it is the "correct" exception that was validly used to cancel the
+ coroutine that gets passed here and not `TestException`. */
+ assertTrue(t is CancellationException)
+ }
+ override fun onComplete() { expectUnreached() }
+ })
+ expect(5)
+ val result: ChannelResult<Unit> = producerScope!!.trySend(1)
+ val e = result.exceptionOrNull()!!
+ assertTrue(e is CancellationException, "The actual error: $e")
+ assertTrue(producerScope!!.isClosedForSend)
+ assertTrue(result.isFailure)
+ }
+ finish(7)
}
@Test
@@ -182,4 +249,39 @@ class PublishTest : TestBase() {
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { publish<Int>(Job()) { } }
}
+
+ /** Tests that `trySend` doesn't throw in `publish`. */
+ @Test
+ fun testTrySendNotThrowing() = runTest {
+ var producerScope: ProducerScope<Int>? = null
+ expect(1)
+ val publisher = publish<Int>(Dispatchers.Unconfined) {
+ producerScope = this
+ expect(3)
+ delay(Long.MAX_VALUE)
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ publisher.awaitFirstOrNull()
+ expectUnreached()
+ }
+ job.cancel()
+ expect(4)
+ val result = producerScope!!.trySend(1)
+ assertTrue(result.isFailure)
+ finish(5)
+ }
+
+ /** Tests that all methods on `publish` fail without closing the channel when attempting to emit `null`. */
+ @Test
+ fun testEmittingNull() = runTest {
+ val publisher = publish {
+ assertFailsWith<NullPointerException> { send(null) }
+ assertFailsWith<NullPointerException> { trySend(null) }
+ @Suppress("DEPRECATION")
+ assertFailsWith<NullPointerException> { offer(null) }
+ send("OK")
+ }
+ assertEquals("OK", publisher.awaitFirstOrNull())
+ }
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
index 04833e98..7a0e0fac 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactive
@@ -71,7 +71,7 @@ class PublisherAsFlowTest : TestBase() {
send(it + 1)
expect(it + 1)
}
- assertFalse { offer(-1) }
+ assertFalse { trySend(-1).isSuccess }
}
publisher.asFlow().collect {
@@ -263,4 +263,14 @@ class PublisherAsFlowTest : TestBase() {
}
assertEquals(expected, list)
}
+
+ @Test
+ fun testException() = runTest {
+ expect(1)
+ val p = publish<Int> { throw TestException() }.asFlow()
+ p.catch {
+ assertTrue { it is TestException }
+ finish(2)
+ }.collect()
+ }
}
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherCollectTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherCollectTest.kt
new file mode 100644
index 00000000..e4753f04
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherCollectTest.kt
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.reactive
+
+import kotlinx.coroutines.*
+import org.junit.Test
+import org.reactivestreams.*
+import kotlin.test.*
+
+class PublisherCollectTest: TestBase() {
+
+ /** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */
+ @Test
+ fun testCollect() = runTest {
+ val x = 100
+ val xSum = x * (x + 1) / 2
+ val publisher = Publisher<Int> { subscriber ->
+ var requested = 0L
+ var lastOutput = 0
+ subscriber.onSubscribe(object: Subscription {
+
+ override fun request(n: Long) {
+ requested += n
+ if (n <= 0) {
+ subscriber.onError(IllegalArgumentException())
+ return
+ }
+ while (lastOutput < x && lastOutput < requested) {
+ lastOutput += 1
+ subscriber.onNext(lastOutput)
+ }
+ if (lastOutput == x)
+ subscriber.onComplete()
+ }
+
+ override fun cancel() {
+ /** According to rule 3.5 of the
+ * [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5),
+ * this method can be called by the subscriber at any point, so it's not an error if it's called
+ * in this scenario. */
+ }
+
+ })
+ }
+ var sum = 0
+ publisher.collect {
+ sum += it
+ }
+ assertEquals(xSum, sum)
+ }
+
+ /** Tests the behavior of [collect] when the publisher raises an error. */
+ @Test
+ fun testCollectThrowingPublisher() = runTest {
+ val errorString = "Too many elements requested"
+ val x = 100
+ val xSum = x * (x + 1) / 2
+ val publisher = Publisher<Int> { subscriber ->
+ var requested = 0L
+ var lastOutput = 0
+ subscriber.onSubscribe(object: Subscription {
+
+ override fun request(n: Long) {
+ requested += n
+ if (n <= 0) {
+ subscriber.onError(IllegalArgumentException())
+ return
+ }
+ while (lastOutput < x && lastOutput < requested) {
+ lastOutput += 1
+ subscriber.onNext(lastOutput)
+ }
+ if (lastOutput == x)
+ subscriber.onError(IllegalArgumentException(errorString))
+ }
+
+ override fun cancel() {
+ /** See the comment for the corresponding part of [testCollect]. */
+ }
+
+ })
+ }
+ var sum = 0
+ try {
+ publisher.collect {
+ sum += it
+ }
+ } catch (e: IllegalArgumentException) {
+ assertEquals(errorString, e.message)
+ }
+ assertEquals(xSum, sum)
+ }
+
+ /** Tests the behavior of [collect] when the action throws. */
+ @Test
+ fun testCollectThrowingAction() = runTest {
+ val errorString = "Too many elements produced"
+ val x = 100
+ val xSum = x * (x + 1) / 2
+ val publisher = Publisher<Int> { subscriber ->
+ var requested = 0L
+ var lastOutput = 0
+ subscriber.onSubscribe(object: Subscription {
+
+ override fun request(n: Long) {
+ requested += n
+ if (n <= 0) {
+ subscriber.onError(IllegalArgumentException())
+ return
+ }
+ while (lastOutput < x && lastOutput < requested) {
+ lastOutput += 1
+ subscriber.onNext(lastOutput)
+ }
+ }
+
+ override fun cancel() {
+ assertEquals(x, lastOutput)
+ expect(x + 2)
+ }
+
+ })
+ }
+ var sum = 0
+ try {
+ expect(1)
+ var i = 1
+ publisher.collect {
+ sum += it
+ i += 1
+ expect(i)
+ if (sum >= xSum) {
+ throw IllegalArgumentException(errorString)
+ }
+ }
+ } catch (e: IllegalArgumentException) {
+ expect(x + 3)
+ assertEquals(errorString, e.message)
+ }
+ finish(x + 4)
+ }
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt
index 736a6640..a19ce2f4 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt
@@ -29,13 +29,14 @@ import kotlin.random.*
*/
@Suppress("ReactiveStreamsSubscriberImplementation")
class PublisherRequestStressTest : TestBase() {
+
private val testDurationSec = 3 * stressTestMultiplier
// Original code in Amazon SDK uses 4 and 16 as low/high watermarks.
- // There constants were chosen so that problem reproduces asap with particular this code.
+ // These constants were chosen so that problem reproduces asap with particular this code.
private val minDemand = 8L
private val maxDemand = 16L
-
+
private val nEmitThreads = 4
private val emitThreadNo = AtomicInteger()
@@ -47,7 +48,7 @@ class PublisherRequestStressTest : TestBase() {
private val reqPool = Executors.newSingleThreadExecutor { r ->
Thread(r, "PublisherRequestStressTest-req")
}
-
+
private val nextValue = AtomicLong(0)
@After
@@ -64,7 +65,6 @@ class PublisherRequestStressTest : TestBase() {
fun testRequestStress() {
val expectedValue = AtomicLong(0)
val requestedTill = AtomicLong(0)
- val completionLatch = CountDownLatch(1)
val callingOnNext = AtomicInteger()
val publisher = mtFlow().asPublisher()
@@ -74,7 +74,7 @@ class PublisherRequestStressTest : TestBase() {
private var demand = 0L // only updated from reqPool
override fun onComplete() {
- completionLatch.countDown()
+ // Typically unreached, but, rarely, `emitPool` may shut down before the cancellation is performed.
}
override fun onSubscribe(sub: Subscription) {
@@ -123,7 +123,9 @@ class PublisherRequestStressTest : TestBase() {
}
if (!error) {
subscription.cancel()
- completionLatch.await()
+ runBlocking {
+ (subscription as AbstractCoroutine<*>).join()
+ }
}
}
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt
index 110718ac..740fd86a 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt
@@ -1,10 +1,11 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import org.junit.runner.*
@@ -27,27 +28,27 @@ class PublisherSubscriptionSelectTest(private val request: Int) : TestBase() {
var a = 0
var b = 0
// open two subs
- val channelA = source.openSubscription(request)
- val channelB = source.openSubscription(request)
+ val channelA = source.toChannel(request)
+ val channelB = source.toChannel(request)
loop@ while (true) {
val done: Int = select {
- channelA.onReceiveOrNull {
- if (it != null) assertEquals(a++, it)
- if (it == null) 0 else 1
+ channelA.onReceiveCatching { result ->
+ result.onSuccess { assertEquals(a++, it) }
+ if (result.isSuccess) 1 else 0
}
- channelB.onReceiveOrNull {
- if (it != null) assertEquals(b++, it)
- if (it == null) 0 else 2
+ channelB.onReceiveCatching { result ->
+ result.onSuccess { assertEquals(b++, it) }
+ if (result.isSuccess) 2 else 0
}
}
when (done) {
0 -> break@loop
1 -> {
- val r = channelB.receiveOrNull()
+ val r = channelB.receiveCatching().getOrNull()
if (r != null) assertEquals(b++, r)
}
2 -> {
- val r = channelA.receiveOrNull()
+ val r = channelA.receiveCatching().getOrNull()
if (r != null) assertEquals(a++, r)
}
}
@@ -58,4 +59,4 @@ class PublisherSubscriptionSelectTest(private val request: Int) : TestBase() {
// should receive one of them fully
assertTrue(a == n || b == n)
}
-} \ No newline at end of file
+}