aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
blob: 86adfee0491e5883fe7be747fa4b7e622d53f41a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import java.util.concurrent.atomic.*
import kotlin.random.*
import kotlin.test.*

/**
 * Stress test for a capacity-1 channel with an undelivered-element handler that gets cancelled
 * while a sender may still be in flight.
 *
 * Every iteration races a coroutine that performs up to two sends against a coroutine that receives
 * a single element and then cancels the channel. Each element that was not rejected by `trySend`
 * must end up either received or reported to the undelivered-element handler.
 */
class ChannelCancelUndeliveredElementStressTest : TestBase() {
    private val repeatTimes = 10_000 * stressTestMultiplier

    // Totals accumulated over all iterations.
    private var sendCnt = 0
    private var trySendFailedCnt = 0
    private var receivedCnt = 0
    private var undeliveredCnt = 0

    // Largest element received so far; elements are expected to arrive in increasing order.
    private var lastReceived = 0

    // Per-iteration ("delta") counters, cleared at the end of every iteration.
    private var dSendCnt = 0
    private var dSendExceptionCnt = 0
    private var dTrySendFailedCnt = 0
    private var dReceivedCnt = 0

    // Incremented from the channel's undelivered-element handler.
    private val dUndeliveredCnt = AtomicInteger()

    @Test
    fun testStress() = runTest {
        repeat(repeatTimes) {
            val channel = Channel<Int>(1) { dUndeliveredCnt.incrementAndGet() }
            val sender = launch(Dispatchers.Default) {
                // Two consecutive sends; the second is skipped if the first one throws.
                repeat(2) { sendOne(channel) }
            }
            val receiver = launch(Dispatchers.Default) {
                // Take a single element, then cancel the channel.
                receiveOne(channel)
                channel.cancel()
            }

            joinAll(sender, receiver)

            checkIterationBalance()
            accumulateAndResetIteration()
        }
        printTotals()
        assertEquals(sendCnt - trySendFailedCnt, receivedCnt + undeliveredCnt)
    }

    /**
     * Verifies the per-iteration invariant: all elements must be either received or undelivered.
     * Dumps the per-iteration counters and fails when the invariant does not hold.
     */
    private fun checkIterationBalance() {
        val accepted = dSendCnt - dTrySendFailedCnt
        val accountedFor = dReceivedCnt + dUndeliveredCnt.get()
        if (accepted == accountedFor) return
        println("          Send: $dSendCnt")
        println("Send exception: $dSendExceptionCnt")
        println("trySend failed: $dTrySendFailedCnt")
        println("      Received: $dReceivedCnt")
        println("   Undelivered: ${dUndeliveredCnt.get()}")
        error("Failed")
    }

    /** Folds the per-iteration counters into the totals and clears them for the next iteration. */
    private fun accumulateAndResetIteration() {
        trySendFailedCnt += dTrySendFailedCnt
        receivedCnt += dReceivedCnt
        undeliveredCnt += dUndeliveredCnt.get()

        dSendCnt = 0
        dSendExceptionCnt = 0
        dTrySendFailedCnt = 0
        dReceivedCnt = 0
        dUndeliveredCnt.set(0)
    }

    /** Prints the totals accumulated over all iterations. */
    private fun printTotals() {
        println("          Send: $sendCnt")
        println("trySend failed: $trySendFailedCnt")
        println("      Received: $receivedCnt")
        println("   Undelivered: $undeliveredCnt")
    }

    /**
     * Sends the next sequence number to [channel], choosing at random between the suspending `send`
     * and the non-suspending `trySend`. A rejected `trySend` is counted as a failed send; an
     * exception is counted and rethrown to the caller.
     */
    private suspend fun sendOne(channel: Channel<Int>) {
        dSendCnt++
        val element = ++sendCnt
        try {
            if (Random.nextInt(2) == 0) {
                channel.send(element)
            } else {
                val accepted = channel.trySend(element).isSuccess
                if (!accepted) dTrySendFailedCnt++
            }
        } catch (cause: Throwable) {
            assertTrue(cause is CancellationException) // the only exception possible in this test
            dSendExceptionCnt++
            throw cause
        }
    }

    /**
     * Receives one element from [channel] through a randomly chosen API (`receive`,
     * `receiveCatching` or a `select` with `onReceive`) and checks that it is greater than the
     * previously received element.
     */
    private suspend fun receiveOne(channel: Channel<Int>) {
        val element: Int = when (Random.nextInt(3)) {
            0 -> channel.receive()
            1 -> channel.receiveCatching().getOrElse { error("Cannot be closed yet") }
            2 -> select {
                channel.onReceive { value -> value }
            }
            else -> error("Cannot happen")
        }
        assertTrue(element > lastReceived)
        dReceivedCnt++
        lastReceived = element
    }
}