aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt173
1 files changed, 143 insertions, 30 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
index 18cd012d..efe7ec7e 100644
--- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
@@ -5,10 +5,14 @@
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
+import kotlinx.coroutines.exceptions.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
import org.reactivestreams.*
+import java.lang.IllegalStateException
+import java.lang.RuntimeException
+import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.test.*
@@ -80,30 +84,6 @@ class IntegrationTest(
}
@Test
- fun testNumbers() = runBlocking<Unit> {
- val n = 100 * stressTestMultiplier
- val pub = publish(ctx(coroutineContext)) {
- for (i in 1..n) {
- send(i)
- if (delay) delay(1)
- }
- }
- assertEquals(1, pub.awaitFirst())
- assertEquals(1, pub.awaitFirstOrDefault(0))
- assertEquals(1, pub.awaitFirstOrNull())
- assertEquals(1, pub.awaitFirstOrElse { 0 })
- assertEquals(n, pub.awaitLast())
- assertFailsWith<IllegalArgumentException> { pub.awaitSingle() }
- assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrDefault(0) }
- assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrNull() }
- assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrElse { 0 } }
- checkNumbers(n, pub)
- val channel = pub.openSubscription()
- checkNumbers(n, channel.asPublisher(ctx(coroutineContext)))
- channel.cancel()
- }
-
- @Test
fun testCancelWithoutValue() = runTest {
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
publish<String> {
@@ -116,7 +96,7 @@ class IntegrationTest(
}
@Test
- fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
+ fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException }) {
expect(1)
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
publish<String> {
@@ -130,12 +110,145 @@ class IntegrationTest(
finish(3)
}
- private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {
- var last = 0
- pub.collect {
- assertEquals(++last, it)
+ /**
+ * Test that the continuation is not being resumed after it has already failed due to there having been too many
+ * values passed.
+ */
+ @Test
+ fun testNotCompletingFailedAwait() = runTest {
+ try {
+ expect(1)
+ Publisher<Int> { sub ->
+ sub.onSubscribe(object: Subscription {
+ override fun request(n: Long) {
+ expect(2)
+ sub.onNext(1)
+ sub.onNext(2)
+ expect(4)
+ sub.onComplete()
+ }
+
+ override fun cancel() {
+ expect(3)
+ }
+ })
+ }.awaitSingle()
+ } catch (e: java.lang.IllegalArgumentException) {
+ expect(5)
}
- assertEquals(n, last)
+ finish(6)
}
+ /**
+ * Test the behavior of [awaitOne] on unconforming publishers.
+ */
+ @Test
+ fun testAwaitOnNonconformingPublishers() = runTest {
+ fun <T> publisher(block: Subscriber<in T>.(n: Long) -> Unit) =
+ Publisher<T> { subscriber ->
+ subscriber.onSubscribe(object: Subscription {
+ override fun request(n: Long) {
+ subscriber.block(n)
+ }
+
+ override fun cancel() {
+ }
+ })
+ }
+ val dummyMessage = "dummy"
+ val dummyThrowable = RuntimeException(dummyMessage)
+ suspend fun <T> assertDetectsBadPublisher(
+ operation: suspend Publisher<T>.() -> T,
+ message: String,
+ block: Subscriber<in T>.(n: Long) -> Unit,
+ ) {
+ assertCallsExceptionHandlerWith<IllegalStateException> {
+ try {
+ publisher(block).operation()
+ } catch (e: Throwable) {
+ if (e.message != dummyMessage)
+ throw e
+ }
+ }.let {
+ assertTrue("Expected the message to contain '$message', got '${it.message}'") {
+ it.message?.contains(message) ?: false
+ }
+ }
+ }
+
+ // Rule 1.1 broken: the publisher produces more values than requested.
+ assertDetectsBadPublisher<Int>({ awaitFirst() }, "provided more") {
+ onNext(1)
+ onNext(2)
+ onComplete()
+ }
+
+ // Rule 1.7 broken: the publisher calls a method on a subscriber after reaching the terminal state.
+ assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
+ onNext(1)
+ onError(dummyThrowable)
+ onComplete()
+ }
+ assertDetectsBadPublisher<Int>({ awaitSingleOrDefault(2) }, "terminal state") {
+ onComplete()
+ onError(dummyThrowable)
+ }
+ assertDetectsBadPublisher<Int>({ awaitFirst() }, "terminal state") {
+ onNext(0)
+ onComplete()
+ onComplete()
+ }
+ assertDetectsBadPublisher<Int>({ awaitFirstOrDefault(1) }, "terminal state") {
+ onComplete()
+ onNext(3)
+ }
+ assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
+ onError(dummyThrowable)
+ onNext(3)
+ }
+
+ // Rule 1.9 broken (the first signal to the subscriber was not 'onSubscribe')
+ assertCallsExceptionHandlerWith<IllegalStateException> {
+ try {
+ Publisher<Int> { subscriber ->
+ subscriber.onNext(3)
+ subscriber.onComplete()
+ }.awaitFirst()
+ } catch (e: NoSuchElementException) {
+ // intentionally blank
+ }
+ }.let { assertTrue(it.message?.contains("onSubscribe") ?: false) }
+ }
+
+ @Test
+ fun testPublishWithTimeout() = runTest {
+ val publisher = publish<Int> {
+ expect(2)
+ withTimeout(1) { delay(100) }
+ }
+ try {
+ expect(1)
+ publisher.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
+ }
+ finish(4)
+ }
+
+}
+
+@OptIn(ExperimentalContracts::class)
+internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
+ crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E {
+ contract {
+ callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
+ }
+ val handler = CapturingHandler()
+ return withContext(handler) {
+ operation(handler)
+ handler.getException().let {
+ assertTrue(it is E, it.toString())
+ it
+ }
+ }
} \ No newline at end of file