aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt
blob: e55eaad1274357871aba15c591e9c060b3f86b3f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import org.junit.*
import kotlin.random.*

/**
 * Stress test that hammers a single [MutableStateFlow] with several concurrent emitters and
 * collectors, all running on a dedicated fixed-size thread pool.
 *
 * Every emitted value encodes both the emitter and its per-emitter counter as
 * `counter * nEmitters + emitterIndex`. Collectors decode the value and verify that, for each
 * emitter, the counters they observe never go backwards.
 */
class StateFlowStressTest : TestBase() {
    // Duration of the stress phase in seconds, scaled by stressTestMultiplier.
    private val nSeconds = 3 * stressTestMultiplier
    private val state = MutableStateFlow<Long>(0)
    private lateinit var pool: ExecutorCoroutineDispatcher

    /**
     * Releases the thread pool created by [stress].
     *
     * The pool is only assigned inside [stress], so it is closed only when it was actually created;
     * otherwise an early failure would be masked by an `UninitializedPropertyAccessException`.
     */
    @After
    fun tearDown() {
        if (::pool.isInitialized) pool.close()
    }

    /**
     * Runs [nEmitters] emitter coroutines and [nCollectors] collector coroutines against [state]
     * for [nSeconds] seconds, printing progress once per second, then cancels everything and
     * verifies that every collector kept up with every emitter.
     *
     * @param nEmitters number of coroutines concurrently assigning `state.value`
     * @param nCollectors number of coroutines concurrently collecting `state`
     */
    fun stress(nEmitters: Int, nCollectors: Int) = runTest {
        pool = newFixedThreadPoolContext(nEmitters + nCollectors, "StateFlowStressTest")
        // collected[collector][emitter] is the last counter this collector saw from that emitter
        val collected = Array(nCollectors) { LongArray(nEmitters) }
        val collectors = launch {
            repeat(nCollectors) { collector ->
                launch(pool) {
                    val c = collected[collector]
                    // collect in batches of random size (1..1000), aborting and re-subscribing after
                    // each batch to stress allocation/deallocation of collector slots
                    do {
                        val batchSize = Random.nextInt(1..1000)
                        var index = 0
                        val cnt = state.onEach { value ->
                            // decode "counter * nEmitters + emitter"
                            val emitter = (value % nEmitters).toInt()
                            val current = value / nEmitters
                            // the first value in batch is allowed to repeat, but cannot go back
                            val ok = if (index++ == 0) current >= c[emitter] else current > c[emitter]
                            check(ok) {
                                "Values must be monotonic, but $current is not, " +
                                    "was ${c[emitter]} in collector #$collector from emitter #$emitter"
                            }
                            c[emitter] = current
                        }.take(batchSize).map { 1 }.sum()
                        // keep re-subscribing for as long as a full batch was collected
                    } while (cnt == batchSize)
                }
            }
        }
        // emitted[emitter] is the last counter that emitter has published
        val emitted = LongArray(nEmitters)
        val emitters = launch {
            repeat(nEmitters) { emitter ->
                launch(pool) {
                    var current = 1L
                    while (true) {
                        state.value = current * nEmitters + emitter
                        emitted[emitter] = current
                        current++
                        if (current % 1000 == 0L) yield() // make it cancellable
                    }
                }
            }
        }
        for (second in 1..nSeconds) {
            delay(1000)
            val cs = collected.map { it.sum() }
            println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}")
        }
        emitters.cancelAndJoin()
        collectors.cancelAndJoin()
        // make sure nothing hung: every collector must have seen more than half of what each
        // emitter produced; report the offending collector/emitter pair on failure
        collected.forEachIndexed { collector, seen ->
            seen.forEachIndexed { emitter, current ->
                require(current > emitted[emitter] / 2) {
                    "Collector #$collector fell behind emitter #$emitter: " +
                        "collected $current, emitted ${emitted[emitter]}"
                }
            }
        }
    }

    @Test
    fun testSingleEmitterAndCollector() = stress(1, 1)

    @Test
    fun testTenEmittersAndCollectors() = stress(10, 10)
}