aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt')
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt159
1 files changed, 159 insertions, 0 deletions
diff --git a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt
new file mode 100644
index 00000000..54ba7b63
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.channels
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.selects.*
+import org.junit.*
+import org.junit.runner.*
+import org.junit.runners.*
+import java.util.concurrent.atomic.*
+
+/**
+ * Tests delivery of events to multiple broadcast channel subscribers.
+ */
+@RunWith(Parameterized::class)
+class BroadcastChannelMultiReceiveStressTest(
+ private val kind: TestBroadcastChannelKind
+) : TestBase() {
+ companion object {
+ @Parameterized.Parameters(name = "{0}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> =
+ TestBroadcastChannelKind.values().map { arrayOf<Any>(it) }
+ }
+
+ private val nReceivers = if (isStressTest) 10 else 5
+ private val nSeconds = 3 * stressTestMultiplier
+
+ private val broadcast = kind.create<Long>()
+ private val pool = newFixedThreadPoolContext(nReceivers + 1, "BroadcastChannelMultiReceiveStressTest")
+
+ private val sentTotal = AtomicLong()
+ private val receivedTotal = AtomicLong()
+ private val stopOnReceive = AtomicLong(-1)
+ private val lastReceived = Array(nReceivers) { AtomicLong(-1) }
+
+ @After
+ fun tearDown() {
+ pool.close()
+ }
+
+ @Test
+ fun testStress() = runBlocking {
+ println("--- BroadcastChannelMultiReceiveStressTest $kind with nReceivers=$nReceivers")
+ val sender =
+ launch(pool + CoroutineName("Sender")) {
+ var i = 0L
+ while (isActive) {
+ broadcast.send(++i)
+ sentTotal.set(i) // set sentTotal only if `send` was not cancelled
+ }
+ }
+ val receivers = mutableListOf<Job>()
+ fun printProgress() {
+ println("Sent ${sentTotal.get()}, received ${receivedTotal.get()}, receivers=${receivers.size}")
+ }
+ // ramp up receivers
+ repeat(nReceivers) {
+ delay(100) // wait 0.1 sec
+ val receiverIndex = receivers.size
+ val name = "Receiver$receiverIndex"
+ println("Launching $name")
+ receivers += launch(pool + CoroutineName(name)) {
+ val channel = broadcast.openSubscription()
+ when (receiverIndex % 5) {
+ 0 -> doReceive(channel, receiverIndex)
+ 1 -> doReceiveOrNull(channel, receiverIndex)
+ 2 -> doIterator(channel, receiverIndex)
+ 3 -> doReceiveSelect(channel, receiverIndex)
+ 4 -> doReceiveSelectOrNull(channel, receiverIndex)
+ }
+ channel.cancel()
+ }
+ printProgress()
+ }
+ // wait
+ repeat(nSeconds) { _ ->
+ delay(1000)
+ printProgress()
+ }
+ sender.cancelAndJoin()
+ println("Tested $kind with nReceivers=$nReceivers")
+ val total = sentTotal.get()
+ println(" Sent $total events, waiting for receivers")
+ stopOnReceive.set(total)
+ try {
+ withTimeout(5000) {
+ receivers.forEachIndexed { index, receiver ->
+ if (lastReceived[index].get() == total)
+ receiver.cancel()
+ else
+ receiver.join()
+ }
+ }
+ } catch (e: Exception) {
+ println("Failed: $e")
+ pool.dumpThreads("Threads in pool")
+ receivers.indices.forEach { index ->
+ println("lastReceived[$index] = ${lastReceived[index].get()}")
+ }
+ throw e
+ }
+ println(" Received ${receivedTotal.get()} events")
+ }
+
+ private fun doReceived(receiverIndex: Int, i: Long): Boolean {
+ val last = lastReceived[receiverIndex].get()
+ check(i > last) { "Last was $last, got $i" }
+ if (last != -1L && !kind.isConflated)
+ check(i == last + 1) { "Last was $last, got $i" }
+ receivedTotal.incrementAndGet()
+ lastReceived[receiverIndex].set(i)
+ return i == stopOnReceive.get()
+ }
+
+ private suspend fun doReceive(channel: ReceiveChannel<Long>, receiverIndex: Int) {
+ while (true) {
+ try {
+ val stop = doReceived(receiverIndex, channel.receive())
+ if (stop) break
+ }
+ catch (ex: ClosedReceiveChannelException) { break }
+ }
+ }
+
+ private suspend fun doReceiveOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
+ while (true) {
+ val stop = doReceived(receiverIndex, channel.receiveOrNull() ?: break)
+ if (stop) break
+ }
+ }
+
+ private suspend fun doIterator(channel: ReceiveChannel<Long>, receiverIndex: Int) {
+ for (event in channel) {
+ val stop = doReceived(receiverIndex, event)
+ if (stop) break
+ }
+ }
+
+ private suspend fun doReceiveSelect(channel: ReceiveChannel<Long>, receiverIndex: Int) {
+ while (true) {
+ try {
+ val event = select<Long> { channel.onReceive { it } }
+ val stop = doReceived(receiverIndex, event)
+ if (stop) break
+ } catch (ex: ClosedReceiveChannelException) { break }
+ }
+ }
+
+ private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
+ while (true) {
+ val event = select<Long?> { channel.onReceiveOrNull { it } } ?: break
+ val stop = doReceived(receiverIndex, event)
+ if (stop) break
+ }
+ }
+} \ No newline at end of file