aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt148
1 files changed, 148 insertions, 0 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
new file mode 100644
index 00000000..a37719de
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.reactive
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+class PublisherAsFlowTest : TestBase() {
+ @Test
+ fun testCancellation() = runTest {
+ var onNext = 0
+ var onCancelled = 0
+ var onError = 0
+
+ val publisher = publish(currentDispatcher()) {
+ coroutineContext[Job]?.invokeOnCompletion {
+ if (it is CancellationException) ++onCancelled
+ }
+
+ repeat(100) {
+ send(it)
+ }
+ }
+
+ publisher.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
+ onEach {
+ ++onNext
+ throw RuntimeException()
+ }
+ catch<Throwable> {
+ ++onError
+ }
+ }.join()
+
+
+ assertEquals(1, onNext)
+ assertEquals(1, onError)
+ assertEquals(1, onCancelled)
+ }
+
+ @Test
+ fun testBufferSize1() = runTest {
+ val publisher = publish(currentDispatcher()) {
+ expect(1)
+ send(3)
+
+ expect(2)
+ send(5)
+
+ expect(4)
+ send(7)
+ expect(6)
+ }
+
+ publisher.asFlow().collect {
+ expect(it)
+ }
+
+ finish(8)
+ }
+
+ @Test
+ fun testBufferSize10() = runTest {
+ val publisher = publish(currentDispatcher()) {
+ expect(1)
+ send(5)
+
+ expect(2)
+ send(6)
+
+ expect(3)
+ send(7)
+ expect(4)
+ }
+
+ publisher.asFlow().buffer(10).collect {
+ expect(it)
+ }
+
+ finish(8)
+ }
+
+ @Test
+ fun testConflated() = runTest {
+ val publisher = publish(currentDispatcher()) {
+ for (i in 1..5) send(i)
+ }
+ val list = publisher.asFlow().conflate().toList()
+ assertEquals(listOf(1, 5), list)
+ }
+
+ @Test
+ fun testProduce() = runTest {
+ val flow = publish(currentDispatcher()) { repeat(10) { send(it) } }.asFlow()
+ check((0..9).toList(), flow.produceIn(this))
+ check((0..9).toList(), flow.buffer(2).produceIn(this))
+ check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
+ check(listOf(0, 9), flow.conflate().produceIn(this))
+ }
+
+ private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
+ val result = ArrayList<Int>(10)
+ channel.consumeEach { result.add(it) }
+ assertEquals(expected, result)
+ }
+
+ @Test
+ fun testProduceCancellation() = runTest {
+ expect(1)
+ // publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled
+ val flow = publish(currentDispatcher()) {
+ expect(3)
+ repeat(10) { value ->
+ when (value) {
+ in 0..6 -> send(value)
+ 7 -> try {
+ send(value)
+ } catch (e: CancellationException) {
+ finish(6)
+ throw e
+ }
+ else -> expectUnreached()
+ }
+ }
+ }.asFlow()
+ assertFailsWith<TestException> {
+ coroutineScope {
+ expect(2)
+ val channel = flow.produceIn(this)
+ channel.consumeEach { value ->
+ when (value) {
+ in 0..4 -> {}
+ 5 -> {
+ expect(4)
+ throw TestException()
+ }
+ else -> expectUnreached()
+ }
+ }
+ }
+ }
+ expect(5)
+ }
+} \ No newline at end of file