diff options
author | Roman Elizarov <elizarov@gmail.com> | 2019-08-23 15:57:59 +0300 |
---|---|---|
committer | Roman Elizarov <elizarov@gmail.com> | 2019-08-26 19:25:08 +0300 |
commit | 5ea2c05211a6ffe5aaf302291311fbc31c278c06 (patch) | |
tree | e9c6ad6584eada3e0d3c4dea1df3190f223dcc98 | |
parent | 2907df9783bdfa85f85967dffb1a45f1567c21da (diff) | |
download | platform_external_kotlinx.coroutines-5ea2c05211a6ffe5aaf302291311fbc31c278c06.tar.gz platform_external_kotlinx.coroutines-5ea2c05211a6ffe5aaf302291311fbc31c278c06.tar.bz2 platform_external_kotlinx.coroutines-5ea2c05211a6ffe5aaf302291311fbc31c278c06.zip |
Add ChannelLFStressTest
It tests lock-freedom on send and receive operations on rendezvous
and conflated channels. The test is comprehensive enough to fail on
buffered channels (which are currently not lock-free) and will help us
ensuring that lock-freedom property is not lost while channels are
being improved and refactored.
-rw-r--r-- | gradle.properties | 6 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt | 110 |
2 files changed, 115 insertions, 1 deletions
diff --git a/gradle.properties b/gradle.properties index 6b3060e6..316be1c2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,7 @@ +# +# Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. +# + # Kotlin version=1.3.0-SNAPSHOT group=org.jetbrains.kotlinx @@ -5,7 +9,7 @@ kotlin_version=1.3.50 # Dependencies junit_version=4.12 -atomicfu_version=0.12.10 +atomicfu_version=0.12.11 html_version=0.6.8 lincheck_version=2.0 dokka_version=0.9.16-rdev-2-mpp-hacks diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt new file mode 100644 index 00000000..67bd68ac --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt @@ -0,0 +1,110 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.channels + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicLongArray +import kotlin.math.* +import kotlin.test.* + +/** + * Tests lock-freedom of send and receive operations on rendezvous and conflated channels. + * There is a single channel with two sender and two receiver threads. + * When one sender or receiver gets suspended at most one other operation is allowed to cease having progress + * (`allowSuspendedThreads = 1`). + * + * **Note**: In the current implementation buffered channels are not lock-free, so this test would fail + * if channel is created with a buffer. + */ +class ChannelLFStressTest : TestBase() { + private val nSeconds = 5 * stressTestMultiplier + private val env = LockFreedomTestEnvironment("ChannelLFStressTest", allowSuspendedThreads = 1) + private lateinit var channel: Channel<Long> + + private val sendIndex = AtomicLong() + private val receiveCount = AtomicLong() + private val duplicateCount = AtomicLong() + + private val nCheckedSize = 10_000_000 + private val nChecked = (nCheckedSize * Long.SIZE_BITS).toLong() + private val receivedBits = AtomicLongArray(nCheckedSize) // bit set of received values + + @Test + fun testRendezvousLockFreedom() { + channel = Channel() + performLockFreedomTest() + // ensure that all sent were received + checkAllReceived() + } + + @Test + fun testConflatedLockFreedom() { + // This test does not really verify that all sent elements were received + // and checks only LF property + channel = Channel(Channel.CONFLATED) + performLockFreedomTest() + } + + private fun performLockFreedomTest() { + env.onCompletion { channel.close() } + repeat(2) { env.testThread { sender() } } + repeat(2) { env.testThread { receiver() } } + env.performTest(nSeconds) { + println("Sent: $sendIndex, Received: $receiveCount, dups: $duplicateCount") + } + // ensure no duplicates + assertEquals(0L, duplicateCount.get()) + } + + private fun checkAllReceived() { + for (i in 0 until min(sendIndex.get(), nChecked)) { + assertTrue(isReceived(i)) + } + } + + private suspend fun sender() { + val value = sendIndex.getAndIncrement() + try { + channel.send(value) + } catch (e: ClosedSendChannelException) { + check(env.isCompleted) // expected when test was completed + markReceived(value) // fake received (actually failed to send) + } + } + + private suspend fun receiver() { + val value = try { + channel.receive() + } catch (e: ClosedReceiveChannelException) { + check(env.isCompleted) // expected when test was completed + return + } + receiveCount.incrementAndGet() + markReceived(value) + } + + private fun markReceived(value: Long) { + if (value >= nChecked) return // too big + val index = (value / Long.SIZE_BITS).toInt() + val mask = 1L shl (value % Long.SIZE_BITS).toInt() + while (true) { + val bits = receivedBits.get(index) + if (bits and mask != 0L) { + duplicateCount.incrementAndGet() + break + } + if (receivedBits.compareAndSet(index, bits, bits or mask)) break + } + } + + private fun isReceived(value: Long): Boolean { + val index = (value / Long.SIZE_BITS).toInt() + val mask = 1L shl (value % Long.SIZE_BITS).toInt() + val bits = receivedBits.get(index) + return bits and mask != 0L + } +} |