blob: 316b3785083f2a76d9dc8651658690a11d160dc1 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import org.junit.After
import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.*
/**
 * Stress test that races senders, a receiver and a "closer" against a conflated channel
 * that is continuously being replaced and closed.
 *
 * While the test runs:
 *  - [nSenders] sender coroutines keep offering integers to whatever channel is current;
 *  - one closer coroutine keeps calling [flipChannel], which installs a fresh channel and
 *    closes the one it replaced;
 *  - one receiver coroutine keeps receiving from whatever channel is current.
 *
 * After [testSeconds] seconds the senders and the closer are cancelled, the last channel is
 * closed with a [StopException], and the receiver is expected to fail with that exception.
 */
class ConflatedChannelCloseStressTest : TestBase() {
    /** Number of concurrent sender coroutines. */
    private val nSenders = 2

    /** Duration of the stress phase in seconds, scaled by `stressTestMultiplier` (declared outside this class). */
    private val testSeconds = 3 * stressTestMultiplier

    /** Channel currently shared by the senders and the receiver; replaced by [flipChannel]. */
    private val curChannel = AtomicReference<Channel<Int>>(Channel(Channel.CONFLATED))

    /** Number of `offer` calls that returned without throwing. */
    private val sent = AtomicInteger()

    /** Number of channel flips performed by the closer coroutine. */
    private val closed = AtomicInteger()

    /** Number of `receiveOrNull` calls that returned, regardless of the value they produced. */
    val received = AtomicInteger()

    /** One thread per coroutine launched by the test: [nSenders] senders, the closer and the receiver. */
    val pool = newFixedThreadPoolContext(nSenders + 2, "TestStressClose")

    /** Releases the threads of [pool] after the test. */
    @After
    fun tearDown() {
        pool.close()
    }

    @Test
    fun testStressClose() = runBlocking {
        println("--- ConflatedChannelCloseStressTest with nSenders=$nSenders")
        // One extra Job per sender, cancelled from the sender's `finally` block. Joining it
        // tells the main coroutine that the sender's body has really finished.
        val senderJobs = List(nSenders) { Job() }
        val senders = List(nSenders) { senderId ->
            launch(pool) {
                // Each sender walks its own arithmetic sequence: senderId, senderId + nSenders, ...
                var x = senderId
                try {
                    while (isActive) {
                        try {
                            curChannel.get().offer(x)
                            x += nSenders
                            sent.incrementAndGet()
                        } catch (e: ClosedSendChannelException) {
                            // The channel was closed by a concurrent flip; retry the same
                            // value on whatever channel is current now.
                        }
                    }
                } finally {
                    senderJobs[senderId].cancel()
                }
            }
        }
        // Same completion-signal pattern as senderJobs, for the closer coroutine.
        val closerJob = Job()
        val closer = launch(pool) {
            try {
                while (isActive) {
                    flipChannel()
                    closed.incrementAndGet()
                    yield()
                }
            } finally {
                closerJob.cancel()
            }
        }
        // The receiver runs with NonCancellable and is never cancelled by this test. It is
        // expected to stop only by failing with the StopException used to close the last channel.
        val receiver = async(pool + NonCancellable) {
            while (isActive) {
                curChannel.get().receiveOrNull()
                received.incrementAndGet()
            }
        }
        // print stats while running
        repeat(testSeconds) {
            delay(1000)
            printStats()
        }
        println("Stopping")
        senders.forEach { it.cancel() }
        closer.cancel()
        // wait for them to complete
        println("waiting for senders...")
        senderJobs.forEach { it.join() }
        println("waiting for closer...")
        closerJob.join()
        // Nobody else touches curChannel from here on: install one last fresh channel and
        // close it with a cause so the receiver gets a distinguishable failure.
        println("Closing channel and signalling receiver...")
        flipChannel()
        curChannel.get().close(StopException())
        // wait for receiver to complete
        println("Waiting for receiver...")
        try {
            receiver.await()
            error("Receiver should not complete normally")
        } catch (e: StopException) {
            // ok
        }
        // print stats
        println("--- done")
        printStats()
    }

    /**
     * Installs a fresh conflated channel as the current one and closes the channel it replaced.
     *
     * The swap is a single atomic `getAndSet`, so every replaced channel is handed to exactly
     * one caller, and the new channel is published before the old one is closed.
     *
     * @throws IllegalStateException if `close()` on the replaced channel returns `false`.
     */
    private fun flipChannel() {
        val oldChannel = curChannel.getAndSet(Channel(Channel.CONFLATED))
        check(oldChannel.close()) { "Replaced channel was expected to be open, but close() returned false" }
    }

    /** Prints the current values of the [sent], [closed] and [received] counters. */
    private fun printStats() {
        println("sent ${sent.get()}, closed ${closed.get()}, received ${received.get()}")
    }

    /** Cause used to close the final channel; the receiver is expected to fail with it. */
    class StopException : Exception()
}
|