aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt')
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt276
1 files changed, 276 insertions, 0 deletions
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt
new file mode 100644
index 00000000..9c96352d
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt
@@ -0,0 +1,276 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.operators
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+class SampleTest : TestBase() {
+ @Test
+ public fun testBasic() = withVirtualTime {
+ expect(1)
+ val flow = flow {
+ expect(3)
+ emit("A")
+ delay(1500)
+ emit("B")
+ delay(500)
+ emit("C")
+ delay(250)
+ emit("D")
+ delay(2000)
+ emit("E")
+ expect(4)
+ }
+
+ expect(2)
+ val result = flow.sample(1000).toList()
+ assertEquals(listOf("A", "B", "D"), result)
+ finish(5)
+ }
+
+ @Test
+ fun testDelayedFirst() = withVirtualTime {
+ val flow = flow {
+ delay(60)
+ emit(1)
+ delay(60)
+ expect(1)
+ }.sample(100)
+ assertEquals(1, flow.singleOrNull())
+ finish(2)
+ }
+
+ @Test
+ fun testBasic2() = withVirtualTime {
+ expect(1)
+ val flow = flow {
+ expect(3)
+ emit(1)
+ emit(2)
+ delay(501)
+ emit(3)
+ delay(100)
+ emit(4)
+ delay(100)
+ emit(5)
+ emit(6)
+ delay(301)
+ emit(7)
+ delay(501)
+ expect(4)
+ }
+
+ expect(2)
+ val result = flow.sample(500).toList()
+ assertEquals(listOf(2, 6, 7), result)
+ finish(5)
+ }
+
+ @Test
+ fun testFixedDelay() = withVirtualTime {
+ val flow = flow {
+ emit("A")
+ delay(150)
+ emit("B")
+ expect(1)
+ }.sample(100)
+ assertEquals("A", flow.single())
+ finish(2)
+ }
+
+ @Test
+ fun testSingleNull() = withVirtualTime {
+ val flow = flow<Int?> {
+ emit(null)
+ delay(2)
+ expect(1)
+ }.sample(1)
+ assertNull(flow.single())
+ finish(2)
+ }
+
+ @Test
+ fun testBasicWithNulls() = withVirtualTime {
+ expect(1)
+ val flow = flow {
+ expect(3)
+ emit("A")
+ delay(1500)
+ emit(null)
+ delay(500)
+ emit("C")
+ delay(250)
+ emit(null)
+ delay(2000)
+ emit("E")
+ expect(4)
+ }
+
+ expect(2)
+ val result = flow.sample(1000).toList()
+ assertEquals(listOf("A", null, null), result)
+ finish(5)
+ }
+
+ @Test
+ fun testEmpty() = runTest {
+ val flow = emptyFlow<Int>().sample(Long.MAX_VALUE)
+ assertNull(flow.singleOrNull())
+ }
+
+ @Test
+ fun testScalar() = runTest {
+ val flow = flowOf(1, 2, 3).sample(Long.MAX_VALUE)
+ assertNull(flow.singleOrNull())
+ }
+
+ @Test
+ // note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits
+ fun testLongWait() = withVirtualTime {
+ expect(1)
+ val flow = flow {
+ expect(2)
+ emit("A")
+ delay(3500) // long delay -- multiple sampling intervals
+ emit("B")
+ delay(900) // crosses time = 4000 barrier
+ emit("C")
+ delay(3000) // long wait again
+
+ }
+ val result = flow.sample(1000).toList()
+ assertEquals(listOf("A", "B", "C"), result)
+ finish(3)
+ }
+
+ @Test
+ fun testPace() = withVirtualTime {
+ val flow = flow {
+ expect(1)
+ repeat(4) {
+ emit(-it)
+ delay(50)
+ }
+
+ repeat(4) {
+ emit(it)
+ delay(100)
+ }
+ expect(2)
+ }.sample(100)
+
+ assertEquals(listOf(-1, -3, 0, 1, 2, 3), flow.toList())
+ finish(3)
+ }
+
+ @Test
+ fun testUpstreamError() = testUpstreamError(TestException())
+
+ @Test
+ fun testUpstreamErrorCancellationException() = testUpstreamError(CancellationException(""))
+
+ private inline fun <reified T: Throwable> testUpstreamError(cause: T) = runTest {
+ val latch = Channel<Unit>()
+ val flow = flow {
+ expect(1)
+ emit(1)
+ expect(2)
+ latch.receive()
+ throw cause
+ }.sample(1).map {
+ latch.send(Unit)
+ hang { expect(3) }
+ }
+
+ assertFailsWith<T>(flow)
+ finish(4)
+ }
+
+ @Test
+ fun testUpstreamErrorIsolatedContext() = runTest {
+ val latch = Channel<Unit>()
+ val flow = flow {
+ assertEquals("upstream", NamedDispatchers.name())
+ expect(1)
+ emit(1)
+ expect(2)
+ latch.receive()
+ throw TestException()
+ }.flowOn(NamedDispatchers("upstream")).sample(1).map {
+ latch.send(Unit)
+ hang { expect(3) }
+ }
+
+ assertFailsWith<TestException>(flow)
+ finish(4)
+ }
+
+ @Test
+ fun testUpstreamErrorSampleNotTriggered() = runTest {
+ val flow = flow {
+ expect(1)
+ emit(1)
+ expect(2)
+ throw TestException()
+ }.sample(Long.MAX_VALUE).map {
+ expectUnreached()
+ }
+ assertFailsWith<TestException>(flow)
+ finish(3)
+ }
+
+ @Test
+ fun testUpstreamErrorSampleNotTriggeredInIsolatedContext() = runTest {
+ val flow = flow {
+ expect(1)
+ emit(1)
+ expect(2)
+ throw TestException()
+ }.flowOn(NamedDispatchers("unused")).sample(Long.MAX_VALUE).map {
+ expectUnreached()
+ }
+
+ assertFailsWith<TestException>(flow)
+ finish(3)
+ }
+
+ @Test
+ fun testDownstreamError() = runTest {
+ val flow = flow {
+ expect(1)
+ emit(1)
+ hang { expect(3) }
+ }.sample(100).map {
+ expect(2)
+ yield()
+ throw TestException()
+ it
+ }
+
+ assertFailsWith<TestException>(flow)
+ finish(4)
+ }
+
+ @Test
+ fun testDownstreamErrorIsolatedContext() = runTest {
+ val flow = flow {
+ assertEquals("upstream", NamedDispatchers.name())
+ expect(1)
+ emit(1)
+ hang { expect(3) }
+ }.flowOn(NamedDispatchers("upstream")).sample(100).map {
+ expect(2)
+ yield()
+ throw TestException()
+ it
+ }
+
+ assertFailsWith<TestException>(flow)
+ finish(4)
+ }
+}