diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt | 173 |
1 files changed, 143 insertions, 30 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt index 18cd012d..efe7ec7e 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt @@ -5,10 +5,14 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.exceptions.* import org.junit.Test import org.junit.runner.* import org.junit.runners.* import org.reactivestreams.* +import java.lang.IllegalStateException +import java.lang.RuntimeException +import kotlin.contracts.* import kotlin.coroutines.* import kotlin.test.* @@ -80,30 +84,6 @@ class IntegrationTest( } @Test - fun testNumbers() = runBlocking<Unit> { - val n = 100 * stressTestMultiplier - val pub = publish(ctx(coroutineContext)) { - for (i in 1..n) { - send(i) - if (delay) delay(1) - } - } - assertEquals(1, pub.awaitFirst()) - assertEquals(1, pub.awaitFirstOrDefault(0)) - assertEquals(1, pub.awaitFirstOrNull()) - assertEquals(1, pub.awaitFirstOrElse { 0 }) - assertEquals(n, pub.awaitLast()) - assertFailsWith<IllegalArgumentException> { pub.awaitSingle() } - assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrDefault(0) } - assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrNull() } - assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrElse { 0 } } - checkNumbers(n, pub) - val channel = pub.openSubscription() - checkNumbers(n, channel.asPublisher(ctx(coroutineContext))) - channel.cancel() - } - - @Test fun testCancelWithoutValue() = runTest { val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { publish<String> { @@ -116,7 +96,7 @@ class IntegrationTest( } @Test - fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) { + fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException }) { expect(1) val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { publish<String> { @@ -130,12 +110,145 @@ class IntegrationTest( finish(3) } - private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) { - var last = 0 - pub.collect { - assertEquals(++last, it) + /** + * Test that the continuation is not being resumed after it has already failed due to there having been too many + * values passed. + */ + @Test + fun testNotCompletingFailedAwait() = runTest { + try { + expect(1) + Publisher<Int> { sub -> + sub.onSubscribe(object: Subscription { + override fun request(n: Long) { + expect(2) + sub.onNext(1) + sub.onNext(2) + expect(4) + sub.onComplete() + } + + override fun cancel() { + expect(3) + } + }) + }.awaitSingle() + } catch (e: java.lang.IllegalArgumentException) { + expect(5) } - assertEquals(n, last) + finish(6) } + /** + * Test the behavior of [awaitOne] on unconforming publishers. + */ + @Test + fun testAwaitOnNonconformingPublishers() = runTest { + fun <T> publisher(block: Subscriber<in T>.(n: Long) -> Unit) = + Publisher<T> { subscriber -> + subscriber.onSubscribe(object: Subscription { + override fun request(n: Long) { + subscriber.block(n) + } + + override fun cancel() { + } + }) + } + val dummyMessage = "dummy" + val dummyThrowable = RuntimeException(dummyMessage) + suspend fun <T> assertDetectsBadPublisher( + operation: suspend Publisher<T>.() -> T, + message: String, + block: Subscriber<in T>.(n: Long) -> Unit, + ) { + assertCallsExceptionHandlerWith<IllegalStateException> { + try { + publisher(block).operation() + } catch (e: Throwable) { + if (e.message != dummyMessage) + throw e + } + }.let { + assertTrue("Expected the message to contain '$message', got '${it.message}'") { + it.message?.contains(message) ?: false + } + } + } + + // Rule 1.1 broken: the publisher produces more values than requested. + assertDetectsBadPublisher<Int>({ awaitFirst() }, "provided more") { + onNext(1) + onNext(2) + onComplete() + } + + // Rule 1.7 broken: the publisher calls a method on a subscriber after reaching the terminal state. + assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") { + onNext(1) + onError(dummyThrowable) + onComplete() + } + assertDetectsBadPublisher<Int>({ awaitSingleOrDefault(2) }, "terminal state") { + onComplete() + onError(dummyThrowable) + } + assertDetectsBadPublisher<Int>({ awaitFirst() }, "terminal state") { + onNext(0) + onComplete() + onComplete() + } + assertDetectsBadPublisher<Int>({ awaitFirstOrDefault(1) }, "terminal state") { + onComplete() + onNext(3) + } + assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") { + onError(dummyThrowable) + onNext(3) + } + + // Rule 1.9 broken (the first signal to the subscriber was not 'onSubscribe') + assertCallsExceptionHandlerWith<IllegalStateException> { + try { + Publisher<Int> { subscriber -> + subscriber.onNext(3) + subscriber.onComplete() + }.awaitFirst() + } catch (e: NoSuchElementException) { + // intentionally blank + } + }.let { assertTrue(it.message?.contains("onSubscribe") ?: false) } + } + + @Test + fun testPublishWithTimeout() = runTest { + val publisher = publish<Int> { + expect(2) + withTimeout(1) { delay(100) } + } + try { + expect(1) + publisher.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(3) + } + finish(4) + } + +} + +@OptIn(ExperimentalContracts::class) +internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith( + crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E { + contract { + callsInPlace(operation, InvocationKind.EXACTLY_ONCE) + } + val handler = CapturingHandler() + return withContext(handler) { + operation(handler) + handler.getException().let { + assertTrue(it is E, it.toString()) + it + } + } }
\ No newline at end of file |