diff options
Diffstat (limited to 'kotlinx-coroutines-core/jvm/test/flow')
4 files changed, 110 insertions, 5 deletions
diff --git a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt index e3db2626..f1be284c 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt @@ -36,7 +36,7 @@ class CallbackFlowTest : TestBase() { fun testThrowingConsumer() = runTest { var i = 0 val api = CallbackApi { - runCatching { it.offer(++i) } + it.trySend(++i) } val flow = callbackFlow<Int> { @@ -77,13 +77,13 @@ class CallbackFlowTest : TestBase() { var i = 0 val api = CallbackApi { if (i < 5) { - it.offer(++i) + it.trySend(++i) } else { it.close(RuntimeException()) } } - val flow = callbackFlow<Int>() { + val flow = callbackFlow<Int> { api.start(channel) awaitClose { api.stop() diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt new file mode 100644 index 00000000..98240fc9 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt @@ -0,0 +1,61 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.* +import org.junit.* + +/** + * Tests that shared flows keep strong reference to their source flows. + * See https://github.com/Kotlin/kotlinx.coroutines/issues/2557 + */ +@OptIn(DelicateCoroutinesApi::class) +class SharingReferenceTest : TestBase() { + private val token = object {} + + /* + * Single-threaded executor that we are using to ensure that the flow being sharing actually + * suspended (spilled its locals, attached to parent), so we can verify reachability. + * Without that, it's possible to have a situation where target flow is still + * being strongly referenced (by its dispatcher), but the test already tries to test reachability and fails. + */ + @get:Rule + val executor = ExecutorRule(1) + + private val weakEmitter = flow { + emit(null) + // suspend forever without keeping a strong reference to continuation -- this is a model of + // a callback API that does not keep a strong reference it is listeners, but works + suspendCancellableCoroutine<Unit> { } + // using the token here to make it easily traceable + emit(token) + } + + @Test + fun testShareInReference() { + val flow = weakEmitter.shareIn(ContextScope(executor), SharingStarted.Eagerly, 0) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + @Test + fun testStateInReference() { + val flow = weakEmitter.stateIn(ContextScope(executor), SharingStarted.Eagerly, null) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + @Test + fun testStateInSuspendingReference() = runTest { + val flow = weakEmitter.stateIn(GlobalScope) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + private fun linearize() { + runBlocking(executor) { } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt index dc3cd43c..e55eaad1 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt @@ -62,7 +62,7 @@ class StateFlowStressTest : TestBase() { for (second in 1..nSeconds) { delay(1000) val cs = collected.map { it.sum() } - println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}") + println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}") } emitters.cancelAndJoin() collectors.cancelAndJoin() @@ -77,4 +77,4 @@ class StateFlowStressTest : TestBase() { @Test fun testTenEmittersAndCollectors() = stress(10, 10) -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt new file mode 100644 index 00000000..660ed0aa --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import org.junit.* +import kotlin.test.* +import kotlin.test.Test + +class StateFlowUpdateStressTest : TestBase() { + private val iterations = 1_000_000 * stressTestMultiplier + + @get:Rule + public val executor = ExecutorRule(2) + + @Test + fun testUpdate() = doTest { update { it + 1 } } + + @Test + fun testUpdateAndGet() = doTest { updateAndGet { it + 1 } } + + @Test + fun testGetAndUpdate() = doTest { getAndUpdate { it + 1 } } + + private fun doTest(increment: MutableStateFlow<Int>.() -> Unit) = runTest { + val flow = MutableStateFlow(0) + val j1 = launch(Dispatchers.Default) { + repeat(iterations / 2) { + flow.increment() + } + } + + val j2 = launch(Dispatchers.Default) { + repeat(iterations / 2) { + flow.increment() + } + } + + joinAll(j1, j2) + assertEquals(iterations, flow.value) + } +} |