aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt')
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt92
1 files changed, 92 insertions, 0 deletions
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt
new file mode 100644
index 00000000..a92189c4
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class FlatMapMergeFastPathTest : FlatMapMergeBaseTest() {
+
+ override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = flatMapMerge(transform = mapper).buffer(64)
+
+ @Test
+ override fun testFlatMapConcurrency() = runTest {
+ var concurrentRequests = 0
+ val flow = (1..100).asFlow().flatMapMerge(concurrency = 2) { value ->
+ flow {
+ ++concurrentRequests
+ emit(value)
+ delay(Long.MAX_VALUE)
+ }
+ }.buffer(64)
+
+ val consumer = launch {
+ flow.collect { value ->
+ expect(value)
+ }
+ }
+
+ repeat(4) {
+ yield()
+ }
+
+ assertEquals(2, concurrentRequests)
+ consumer.cancelAndJoin()
+ finish(3)
+ }
+
+ @Test
+ fun testCancellationExceptionDownstream() = runTest {
+ val flow = flow {
+ emit(1)
+ hang { expect(2) }
+ }.flatMapMerge {
+ flow {
+ emit(it)
+ expect(1)
+ throw CancellationException("")
+ }
+ }.buffer(64)
+
+ assertFailsWith<CancellationException>(flow)
+ finish(3)
+ }
+
+ @Test
+ fun testCancellationExceptionUpstream() = runTest {
+ val flow = flow {
+ expect(1)
+ emit(1)
+ expect(2)
+ yield()
+ throw CancellationException("")
+ }.flatMapMerge {
+ flow {
+ expect(3)
+ emit(it)
+ hang { expect(4) }
+ }
+ }.buffer(64)
+
+ assertFailsWith<CancellationException>(flow)
+ finish(5)
+ }
+
+ @Test
+ fun testCancellation() = runTest {
+ val result = flow {
+ emit(1)
+ emit(2)
+ emit(3)
+ emit(4)
+ expectUnreached() // Cancelled by take
+ emit(5)
+ }.flatMapMerge(2) { v -> flow { emit(v) } }
+ .buffer(64)
+ .take(2)
+ .toList()
+ assertEquals(listOf(1, 2), result)
+ }
+}