aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoman Elizarov <elizarov@gmail.com>2019-08-23 15:57:59 +0300
committerRoman Elizarov <elizarov@gmail.com>2019-08-26 19:25:08 +0300
commit5ea2c05211a6ffe5aaf302291311fbc31c278c06 (patch)
treee9c6ad6584eada3e0d3c4dea1df3190f223dcc98
parent2907df9783bdfa85f85967dffb1a45f1567c21da (diff)
downloadplatform_external_kotlinx.coroutines-5ea2c05211a6ffe5aaf302291311fbc31c278c06.tar.gz
platform_external_kotlinx.coroutines-5ea2c05211a6ffe5aaf302291311fbc31c278c06.tar.bz2
platform_external_kotlinx.coroutines-5ea2c05211a6ffe5aaf302291311fbc31c278c06.zip
Add ChannelLFStressTest
It tests lock-freedom on send and receive operations on rendezvous and conflated channels. The test is comprehensive enough to fail on buffered channels (which are currently not lock-free) and will help us ensuring that lock-freedom property is not lost while channels are being improved and refactored.
-rw-r--r--gradle.properties6
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt110
2 files changed, 115 insertions, 1 deletions
diff --git a/gradle.properties b/gradle.properties
index 6b3060e6..316be1c2 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,3 +1,7 @@
+#
+# Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+#
+
# Kotlin
version=1.3.0-SNAPSHOT
group=org.jetbrains.kotlinx
@@ -5,7 +9,7 @@ kotlin_version=1.3.50
# Dependencies
junit_version=4.12
-atomicfu_version=0.12.10
+atomicfu_version=0.12.11
html_version=0.6.8
lincheck_version=2.0
dokka_version=0.9.16-rdev-2-mpp-hacks
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt
new file mode 100644
index 00000000..67bd68ac
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.channels
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.AtomicLongArray
+import kotlin.math.*
+import kotlin.test.*
+
+/**
+ * Tests lock-freedom of send and receive operations on rendezvous and conflated channels.
+ * There is a single channel with two sender and two receiver threads.
+ * When one sender or receiver gets suspended at most one other operation is allowed to cease having progress
+ * (`allowSuspendedThreads = 1`).
+ *
+ * **Note**: In the current implementation buffered channels are not lock-free, so this test would fail
+ * if channel is created with a buffer.
+ */
+class ChannelLFStressTest : TestBase() {
+ private val nSeconds = 5 * stressTestMultiplier
+ private val env = LockFreedomTestEnvironment("ChannelLFStressTest", allowSuspendedThreads = 1)
+ private lateinit var channel: Channel<Long>
+
+ private val sendIndex = AtomicLong()
+ private val receiveCount = AtomicLong()
+ private val duplicateCount = AtomicLong()
+
+ private val nCheckedSize = 10_000_000
+ private val nChecked = (nCheckedSize * Long.SIZE_BITS).toLong()
+ private val receivedBits = AtomicLongArray(nCheckedSize) // bit set of received values
+
+ @Test
+ fun testRendezvousLockFreedom() {
+ channel = Channel()
+ performLockFreedomTest()
+ // ensure that all sent were received
+ checkAllReceived()
+ }
+
+ @Test
+ fun testConflatedLockFreedom() {
+ // This test does not really verify that all sent elements were received
+ // and checks only LF property
+ channel = Channel(Channel.CONFLATED)
+ performLockFreedomTest()
+ }
+
+ private fun performLockFreedomTest() {
+ env.onCompletion { channel.close() }
+ repeat(2) { env.testThread { sender() } }
+ repeat(2) { env.testThread { receiver() } }
+ env.performTest(nSeconds) {
+ println("Sent: $sendIndex, Received: $receiveCount, dups: $duplicateCount")
+ }
+ // ensure no duplicates
+ assertEquals(0L, duplicateCount.get())
+ }
+
+ private fun checkAllReceived() {
+ for (i in 0 until min(sendIndex.get(), nChecked)) {
+ assertTrue(isReceived(i))
+ }
+ }
+
+ private suspend fun sender() {
+ val value = sendIndex.getAndIncrement()
+ try {
+ channel.send(value)
+ } catch (e: ClosedSendChannelException) {
+ check(env.isCompleted) // expected when test was completed
+ markReceived(value) // fake received (actually failed to send)
+ }
+ }
+
+ private suspend fun receiver() {
+ val value = try {
+ channel.receive()
+ } catch (e: ClosedReceiveChannelException) {
+ check(env.isCompleted) // expected when test was completed
+ return
+ }
+ receiveCount.incrementAndGet()
+ markReceived(value)
+ }
+
+ private fun markReceived(value: Long) {
+ if (value >= nChecked) return // too big
+ val index = (value / Long.SIZE_BITS).toInt()
+ val mask = 1L shl (value % Long.SIZE_BITS).toInt()
+ while (true) {
+ val bits = receivedBits.get(index)
+ if (bits and mask != 0L) {
+ duplicateCount.incrementAndGet()
+ break
+ }
+ if (receivedBits.compareAndSet(index, bits, bits or mask)) break
+ }
+ }
+
+ private fun isReceived(value: Long): Boolean {
+ val index = (value / Long.SIZE_BITS).toInt()
+ val mask = 1L shl (value % Long.SIZE_BITS).toInt()
+ val bits = receivedBits.get(index)
+ return bits and mask != 0L
+ }
+}