aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-jdk9/test
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-jdk9/test')
-rw-r--r--reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt43
-rw-r--r--reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt21
-rw-r--r--reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt20
-rw-r--r--reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt158
-rw-r--r--reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt2
-rw-r--r--reactive/kotlinx-coroutines-jdk9/test/PublisherCollectTest.kt146
6 files changed, 357 insertions, 33 deletions
diff --git a/reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt b/reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt
new file mode 100644
index 00000000..5a95d098
--- /dev/null
+++ b/reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.jdk9
+
+import kotlinx.coroutines.*
+import org.junit.*
+import java.util.concurrent.Flow as JFlow
+
+class AwaitTest: TestBase() {
+
+ /** Tests that calls to [awaitFirst] (and, thus, to the rest of these functions) throw [CancellationException] and
+ * unsubscribe from the publisher when their [Job] is cancelled. */
+ @Test
+ fun testAwaitCancellation() = runTest {
+ expect(1)
+ val publisher = JFlow.Publisher<Int> { s ->
+ s.onSubscribe(object : JFlow.Subscription {
+ override fun request(n: Long) {
+ expect(3)
+ }
+
+ override fun cancel() {
+ expect(5)
+ }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ publisher.awaitFirst()
+ } catch (e: CancellationException) {
+ expect(6)
+ throw e
+ }
+ }
+ expect(4)
+ job.cancelAndJoin()
+ finish(7)
+ }
+
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
index 488695de..b860e162 100644
--- a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
+++ b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
@@ -15,7 +15,7 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testErrorOnCancellationIsReported() {
expect(1)
- flow<Int> {
+ flow {
try {
emit(2)
} finally {
@@ -50,13 +50,13 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testCancellationIsNotReported() {
expect(1)
- flow<Int> {
+ flow {
emit(2)
}.asPublisher().subscribe(object : JFlow.Subscriber<Int> {
private lateinit var subscription: JFlow.Subscription
override fun onComplete() {
- expect(3)
+ expectUnreached()
}
override fun onSubscribe(s: JFlow.Subscription?) {
@@ -73,6 +73,21 @@ class FlowAsPublisherTest : TestBase() {
expectUnreached()
}
})
+ finish(3)
+ }
+
+ @Test
+ fun testFlowWithTimeout() = runTest {
+ val publisher = flow<Int> {
+ expect(2)
+ withTimeout(1) { delay(Long.MAX_VALUE) }
+ }.asPublisher()
+ try {
+ expect(1)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
+ }
finish(4)
}
}
diff --git a/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt
index 5bfddfee..5b3542ad 100644
--- a/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt
@@ -5,10 +5,12 @@
package kotlinx.coroutines.jdk9
import kotlinx.coroutines.*
+import kotlinx.coroutines.exceptions.*
import org.junit.Test
import kotlinx.coroutines.flow.flowOn
import org.junit.runner.*
import org.junit.runners.*
+import kotlin.contracts.*
import java.util.concurrent.Flow as JFlow
import kotlin.coroutines.*
import kotlin.test.*
@@ -129,4 +131,20 @@ class IntegrationTest(
assertEquals(n, last)
}
-} \ No newline at end of file
+}
+
+@OptIn(ExperimentalContracts::class)
+internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
+ crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E {
+ contract {
+ callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
+ }
+ val handler = CapturingHandler()
+ return withContext(handler) {
+ operation(handler)
+ handler.getException().let {
+ assertTrue(it is E, it.toString())
+ it
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt
index 1a36a389..3682d5e3 100644
--- a/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt
+++ b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines.jdk9
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import org.junit.Test
import java.util.concurrent.Flow as JFlow
import kotlin.test.*
@@ -121,44 +122,110 @@ class PublishTest : TestBase() {
finish(7)
}
+ /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
@Test
- fun testOnNextError() = runTest {
+ fun testChannelClosing() = runTest {
expect(1)
- val publisher = flowPublish(currentDispatcher()) {
+ val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
+ expect(3)
+ close()
+ assert(isClosedForSend)
expect(4)
- try {
- send("OK")
- } catch(e: Throwable) {
- expect(6)
- assert(e is TestException)
- }
}
- expect(2)
+ try {
+ expect(2)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(5)
+ }
+ finish(6)
+ }
+
+ @Test
+ fun testOnNextError() = runTest {
val latch = CompletableDeferred<Unit>()
- publisher.subscribe(object : JFlow.Subscriber<String> {
- override fun onComplete() {
- expectUnreached()
+ expect(1)
+ assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
+ val publisher = flowPublish(currentDispatcher() + exceptionHandler) {
+ expect(4)
+ try {
+ send("OK")
+ } catch (e: Throwable) {
+ expect(6)
+ assert(e is TestException)
+ assert(isClosedForSend)
+ latch.complete(Unit)
+ }
}
+ expect(2)
+ publisher.subscribe(object : JFlow.Subscriber<String> {
+ override fun onComplete() {
+ expectUnreached()
+ }
- override fun onSubscribe(s: JFlow.Subscription) {
- expect(3)
- s.request(1)
- }
+ override fun onSubscribe(s: JFlow.Subscription) {
+ expect(3)
+ s.request(1)
+ }
- override fun onNext(t: String) {
- expect(5)
- assertEquals("OK", t)
- throw TestException()
- }
+ override fun onNext(t: String) {
+ expect(5)
+ assertEquals("OK", t)
+ throw TestException()
+ }
- override fun onError(t: Throwable) {
- expect(7)
- assert(t is TestException)
- latch.complete(Unit)
+ override fun onError(t: Throwable) {
+ expectUnreached()
+ }
+ })
+ latch.await()
+ }
+ finish(7)
+ }
+
+ /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
+ @Test
+ fun testOnNextErrorAfterCancellation() = runTest {
+ assertCallsExceptionHandlerWith<TestException> { handler ->
+ var producerScope: ProducerScope<Int>? = null
+ CompletableDeferred<Unit>()
+ expect(1)
+ var job: Job? = null
+ val publisher = flowPublish<Int>(handler + Dispatchers.Unconfined) {
+ producerScope = this
+ expect(4)
+ job = launch {
+ delay(Long.MAX_VALUE)
+ }
}
- })
- latch.await()
- finish(8)
+ expect(2)
+ publisher.subscribe(object: JFlow.Subscriber<Int> {
+ override fun onSubscribe(s: JFlow.Subscription) {
+ expect(3)
+ s.request(Long.MAX_VALUE)
+ }
+ override fun onNext(t: Int) {
+ expect(6)
+ assertEquals(1, t)
+ job!!.cancel()
+ throw TestException()
+ }
+ override fun onError(t: Throwable?) {
+ /* Correct changes to the implementation could lead to us entering or not entering this method, but
+ it only matters that if we do, it is the "correct" exception that was validly used to cancel the
+ coroutine that gets passed here and not `TestException`. */
+ assertTrue(t is CancellationException)
+ }
+ override fun onComplete() { expectUnreached() }
+ })
+ expect(5)
+ val result: ChannelResult<Unit> = producerScope!!.trySend(1)
+ val e = result.exceptionOrNull()!!
+ assertTrue(e is CancellationException, "The actual error: $e")
+ assertTrue(producerScope!!.isClosedForSend)
+ assertTrue(result.isFailure)
+ }
+ finish(7)
}
@Test
@@ -182,4 +249,39 @@ class PublishTest : TestBase() {
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { flowPublish<Int>(Job()) { } }
}
+
+ /** Tests that `trySend` doesn't throw in `flowPublish`. */
+ @Test
+ fun testTrySendNotThrowing() = runTest {
+ var producerScope: ProducerScope<Int>? = null
+ expect(1)
+ val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
+ producerScope = this
+ expect(3)
+ delay(Long.MAX_VALUE)
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ publisher.awaitFirstOrNull()
+ expectUnreached()
+ }
+ job.cancel()
+ expect(4)
+ val result = producerScope!!.trySend(1)
+ assertTrue(result.isFailure)
+ finish(5)
+ }
+
+ /** Tests that all methods on `flowPublish` fail without closing the channel when attempting to emit `null`. */
+ @Test
+ fun testEmittingNull() = runTest {
+ val publisher = flowPublish {
+ assertFailsWith<NullPointerException> { send(null) }
+ assertFailsWith<NullPointerException> { trySend(null) }
+ @Suppress("DEPRECATION")
+ assertFailsWith<NullPointerException> { offer(null) }
+ send("OK")
+ }
+ assertEquals("OK", publisher.awaitFirstOrNull())
+ }
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt
index 97f106b3..b5b2a0a2 100644
--- a/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt
+++ b/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt
@@ -70,7 +70,7 @@ class PublisherAsFlowTest : TestBase() {
send(it + 1)
expect(it + 1)
}
- assertFalse { offer(-1) }
+ assertFalse { trySend(-1).isSuccess }
}
publisher.asFlow().collect {
diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublisherCollectTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublisherCollectTest.kt
new file mode 100644
index 00000000..c2e88483
--- /dev/null
+++ b/reactive/kotlinx-coroutines-jdk9/test/PublisherCollectTest.kt
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.jdk9
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.reactive.*
+import org.junit.Test
+import org.reactivestreams.*
+import kotlin.test.*
+import java.util.concurrent.Flow as JFlow
+
+class PublisherCollectTest: TestBase() {
+
+ /** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */
+ @Test
+ fun testCollect() = runTest {
+ val x = 100
+ val xSum = x * (x + 1) / 2
+ val publisher = JFlow.Publisher<Int> { subscriber ->
+ var requested = 0L
+ var lastOutput = 0
+ subscriber.onSubscribe(object: JFlow.Subscription {
+
+ override fun request(n: Long) {
+ requested += n
+ if (n <= 0) {
+ subscriber.onError(IllegalArgumentException())
+ return
+ }
+ while (lastOutput < x && lastOutput < requested) {
+ lastOutput += 1
+ subscriber.onNext(lastOutput)
+ }
+ if (lastOutput == x)
+ subscriber.onComplete()
+ }
+
+ override fun cancel() {
+ /** According to rule 3.5 of the
+ * [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5),
+ * this method can be called by the subscriber at any point, so it's not an error if it's called
+ * in this scenario. */
+ }
+
+ })
+ }
+ var sum = 0
+ publisher.collect {
+ sum += it
+ }
+ assertEquals(xSum, sum)
+ }
+
+ /** Tests the behavior of [collect] when the publisher raises an error. */
+ @Test
+ fun testCollectThrowingPublisher() = runTest {
+ val errorString = "Too many elements requested"
+ val x = 100
+ val xSum = x * (x + 1) / 2
+ val publisher = Publisher<Int> { subscriber ->
+ var requested = 0L
+ var lastOutput = 0
+ subscriber.onSubscribe(object: Subscription {
+
+ override fun request(n: Long) {
+ requested += n
+ if (n <= 0) {
+ subscriber.onError(IllegalArgumentException())
+ return
+ }
+ while (lastOutput < x && lastOutput < requested) {
+ lastOutput += 1
+ subscriber.onNext(lastOutput)
+ }
+ if (lastOutput == x)
+ subscriber.onError(IllegalArgumentException(errorString))
+ }
+
+ override fun cancel() {
+ /** See the comment for the corresponding part of [testCollect]. */
+ }
+
+ })
+ }
+ var sum = 0
+ try {
+ publisher.collect {
+ sum += it
+ }
+ } catch (e: IllegalArgumentException) {
+ assertEquals(errorString, e.message)
+ }
+ assertEquals(xSum, sum)
+ }
+
+ /** Tests the behavior of [collect] when the action throws. */
+ @Test
+ fun testCollectThrowingAction() = runTest {
+ val errorString = "Too many elements produced"
+ val x = 100
+ val xSum = x * (x + 1) / 2
+ val publisher = Publisher<Int> { subscriber ->
+ var requested = 0L
+ var lastOutput = 0
+ subscriber.onSubscribe(object: Subscription {
+
+ override fun request(n: Long) {
+ requested += n
+ if (n <= 0) {
+ subscriber.onError(IllegalArgumentException())
+ return
+ }
+ while (lastOutput < x && lastOutput < requested) {
+ lastOutput += 1
+ subscriber.onNext(lastOutput)
+ }
+ }
+
+ override fun cancel() {
+ assertEquals(x, lastOutput)
+ expect(x + 2)
+ }
+
+ })
+ }
+ var sum = 0
+ try {
+ expect(1)
+ var i = 1
+ publisher.collect {
+ sum += it
+ i += 1
+ expect(i)
+ if (sum >= xSum) {
+ throw IllegalArgumentException(errorString)
+ }
+ }
+ } catch (e: IllegalArgumentException) {
+ expect(x + 3)
+ assertEquals(errorString, e.message)
+ }
+ finish(x + 4)
+ }
+} \ No newline at end of file