aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/jvm/test/flow
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/jvm/test/flow')
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt61
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt44
4 files changed, 110 insertions, 5 deletions
diff --git a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
index e3db2626..f1be284c 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
@@ -36,7 +36,7 @@ class CallbackFlowTest : TestBase() {
fun testThrowingConsumer() = runTest {
var i = 0
val api = CallbackApi {
- runCatching { it.offer(++i) }
+ it.trySend(++i)
}
val flow = callbackFlow<Int> {
@@ -77,13 +77,13 @@ class CallbackFlowTest : TestBase() {
var i = 0
val api = CallbackApi {
if (i < 5) {
- it.offer(++i)
+ it.trySend(++i)
} else {
it.close(RuntimeException())
}
}
- val flow = callbackFlow<Int>() {
+ val flow = callbackFlow<Int> {
api.start(channel)
awaitClose {
api.stop()
diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt
new file mode 100644
index 00000000..98240fc9
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.internal.*
+import org.junit.*
+
+/**
+ * Tests that shared flows keep strong reference to their source flows.
+ * See https://github.com/Kotlin/kotlinx.coroutines/issues/2557
+ */
+@OptIn(DelicateCoroutinesApi::class)
+class SharingReferenceTest : TestBase() {
+ private val token = object {}
+
+ /*
+ * Single-threaded executor that we are using to ensure that the flow being sharing actually
+ * suspended (spilled its locals, attached to parent), so we can verify reachability.
+ * Without that, it's possible to have a situation where target flow is still
+ * being strongly referenced (by its dispatcher), but the test already tries to test reachability and fails.
+ */
+ @get:Rule
+ val executor = ExecutorRule(1)
+
+ private val weakEmitter = flow {
+ emit(null)
+ // suspend forever without keeping a strong reference to continuation -- this is a model of
+ // a callback API that does not keep a strong reference it is listeners, but works
+ suspendCancellableCoroutine<Unit> { }
+ // using the token here to make it easily traceable
+ emit(token)
+ }
+
+ @Test
+ fun testShareInReference() {
+ val flow = weakEmitter.shareIn(ContextScope(executor), SharingStarted.Eagerly, 0)
+ linearize()
+ FieldWalker.assertReachableCount(1, flow) { it === token }
+ }
+
+ @Test
+ fun testStateInReference() {
+ val flow = weakEmitter.stateIn(ContextScope(executor), SharingStarted.Eagerly, null)
+ linearize()
+ FieldWalker.assertReachableCount(1, flow) { it === token }
+ }
+
+ @Test
+ fun testStateInSuspendingReference() = runTest {
+ val flow = weakEmitter.stateIn(GlobalScope)
+ linearize()
+ FieldWalker.assertReachableCount(1, flow) { it === token }
+ }
+
+ private fun linearize() {
+ runBlocking(executor) { }
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt
index dc3cd43c..e55eaad1 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt
@@ -62,7 +62,7 @@ class StateFlowStressTest : TestBase() {
for (second in 1..nSeconds) {
delay(1000)
val cs = collected.map { it.sum() }
- println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}")
+ println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}")
}
emitters.cancelAndJoin()
collectors.cancelAndJoin()
@@ -77,4 +77,4 @@ class StateFlowStressTest : TestBase() {
@Test
fun testTenEmittersAndCollectors() = stress(10, 10)
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt
new file mode 100644
index 00000000..660ed0aa
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import org.junit.*
+import kotlin.test.*
+import kotlin.test.Test
+
+class StateFlowUpdateStressTest : TestBase() {
+ private val iterations = 1_000_000 * stressTestMultiplier
+
+ @get:Rule
+ public val executor = ExecutorRule(2)
+
+ @Test
+ fun testUpdate() = doTest { update { it + 1 } }
+
+ @Test
+ fun testUpdateAndGet() = doTest { updateAndGet { it + 1 } }
+
+ @Test
+ fun testGetAndUpdate() = doTest { getAndUpdate { it + 1 } }
+
+ private fun doTest(increment: MutableStateFlow<Int>.() -> Unit) = runTest {
+ val flow = MutableStateFlow(0)
+ val j1 = launch(Dispatchers.Default) {
+ repeat(iterations / 2) {
+ flow.increment()
+ }
+ }
+
+ val j2 = launch(Dispatchers.Default) {
+ repeat(iterations / 2) {
+ flow.increment()
+ }
+ }
+
+ joinAll(j1, j2)
+ assertEquals(iterations, flow.value)
+ }
+}