aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Tolstopyatov <qwwdfsad@gmail.com>2019-09-09 19:43:57 +0300
committerRoman Elizarov <elizarov@gmail.com>2019-09-10 13:14:08 +0300
commitbf3305286c6942b60ca07cde59632e1237f83671 (patch)
tree193a65991754df70eeb6508d2cc9050d951a0f16
parentd02febfcab1106a99e3468f573dbaa34b7bd3fcb (diff)
downloadplatform_external_kotlinx.coroutines-bf3305286c6942b60ca07cde59632e1237f83671.tar.gz
platform_external_kotlinx.coroutines-bf3305286c6942b60ca07cde59632e1237f83671.tar.bz2
platform_external_kotlinx.coroutines-bf3305286c6942b60ca07cde59632e1237f83671.zip
Introduce InlineList to simplify helpClose logic, reverse helpClose resume order
-rw-r--r--kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt22
-rw-r--r--kotlinx-coroutines-core/common/src/internal/InlineList.kt46
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt31
3 files changed, 85 insertions, 14 deletions
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index b5dfd95c..be18942d 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -304,8 +304,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
* Now if T2's close resumes T1's receive_2 then it's receive gets "closed for receive" exception, but
* its subsequent attempt to send successfully rendezvous with receive_1, producing non-linearizable execution.
*/
- var closedNode: Receive<E>? = null // used when one node was closed to avoid extra memory allocation
- var closedList: ArrayList<Receive<E>>? = null // used when more nodes were closed
+ var closedList = InlineList<Receive<E>>()
while (true) {
// Break when channel is empty or has no receivers
@Suppress("UNCHECKED_CAST")
@@ -316,19 +315,14 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
previous.helpRemove() // make sure remove is complete before continuing
continue
}
- // add removed nodes to a separate list
- if (closedNode == null) {
- closedNode = previous
- } else {
- val list = closedList ?: ArrayList<Receive<E>>().also { closedList = it }
- list += previous
- }
- }
- // now notify all removed nodes that the channel was closed
- if (closedNode != null) {
- closedNode.resumeReceiveClosed(closed)
- closedList?.forEach { it.resumeReceiveClosed(closed) }
+ // add removed nodes to a separate list
+ closedList += previous
}
+ /*
+ * Now notify all removed nodes that the channel was closed
+ * in the order they were added to the channel
+ */
+ closedList.forEachReversed { it.resumeReceiveClosed(closed) }
// and do other post-processing
onClosedIdempotent(closed)
}
diff --git a/kotlinx-coroutines-core/common/src/internal/InlineList.kt b/kotlinx-coroutines-core/common/src/internal/InlineList.kt
new file mode 100644
index 00000000..062a9100
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/internal/InlineList.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("UNCHECKED_CAST")
+
+package kotlinx.coroutines.internal
+
+import kotlinx.coroutines.assert
+
+/*
+ * Inline class that represents a mutable list, but does not allocate an underlying storage
+ * for zero and one elements.
+ * Cannot be parametrized with `List<*>`.
+ */
+internal inline class InlineList<E>(private val holder: Any? = null) {
+ public operator fun plus(element: E): InlineList<E> {
+ assert { element !is List<*> } // Lists are prohibited
+ return when (holder) {
+ null -> InlineList(element)
+ is ArrayList<*> -> {
+ (holder as ArrayList<E>).add(element)
+ InlineList(holder)
+ }
+ else -> {
+ val list = ArrayList<E>(4)
+ list.add(holder as E)
+ list.add(element)
+ InlineList(list)
+ }
+ }
+ }
+
+ public inline fun forEachReversed(action: (E) -> Unit) {
+ when (holder) {
+ null -> return
+ !is ArrayList<*> -> action(holder as E)
+ else -> {
+ val list = holder as ArrayList<E>
+ for (i in (list.size - 1) downTo 0) {
+ action(list[i])
+ }
+ }
+ }
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt
index 983f353f..42cc8555 100644
--- a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt
@@ -20,6 +20,37 @@ class ChannelsTest: TestBase() {
}
@Test
+ fun testCloseWithMultipleWaiters() = runTest {
+ val channel = Channel<Int>()
+ launch {
+ try {
+ expect(2)
+ channel.receive()
+ expectUnreached()
+ } catch (e: ClosedReceiveChannelException) {
+ expect(5)
+ }
+ }
+
+ launch {
+ try {
+ expect(3)
+ channel.receive()
+ expectUnreached()
+ } catch (e: ClosedReceiveChannelException) {
+ expect(6)
+ }
+ }
+
+ expect(1)
+ yield()
+ expect(4)
+ channel.close()
+ yield()
+ finish(7)
+ }
+
+ @Test
fun testAssociate() = runTest {
assertEquals(testList.associate { it * 2 to it * 3 },
testList.asReceiveChannel().associate { it * 2 to it * 3 }.toMap())