diff options
author | Vsevolod Tolstopyatov <qwwdfsad@gmail.com> | 2019-09-09 19:43:57 +0300 |
---|---|---|
committer | Roman Elizarov <elizarov@gmail.com> | 2019-09-10 13:14:08 +0300 |
commit | bf3305286c6942b60ca07cde59632e1237f83671 (patch) | |
tree | 193a65991754df70eeb6508d2cc9050d951a0f16 | |
parent | d02febfcab1106a99e3468f573dbaa34b7bd3fcb (diff) | |
download | platform_external_kotlinx.coroutines-bf3305286c6942b60ca07cde59632e1237f83671.tar.gz platform_external_kotlinx.coroutines-bf3305286c6942b60ca07cde59632e1237f83671.tar.bz2 platform_external_kotlinx.coroutines-bf3305286c6942b60ca07cde59632e1237f83671.zip |
Introduce InlineList to simplify helpClose logic, reverse helpClose resume order
3 files changed, 85 insertions, 14 deletions
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index b5dfd95c..be18942d 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -304,8 +304,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> { * Now if T2's close resumes T1's receive_2 then it's receive gets "closed for receive" exception, but * its subsequent attempt to send successfully rendezvous with receive_1, producing non-linearizable execution. */ - var closedNode: Receive<E>? = null // used when one node was closed to avoid extra memory allocation - var closedList: ArrayList<Receive<E>>? = null // used when more nodes were closed + var closedList = InlineList<Receive<E>>() while (true) { // Break when channel is empty or has no receivers @Suppress("UNCHECKED_CAST") @@ -316,19 +315,14 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> { previous.helpRemove() // make sure remove is complete before continuing continue } - // add removed nodes to a separate list - if (closedNode == null) { - closedNode = previous - } else { - val list = closedList ?: ArrayList<Receive<E>>().also { closedList = it } - list += previous - } - } - // now notify all removed nodes that the channel was closed - if (closedNode != null) { - closedNode.resumeReceiveClosed(closed) - closedList?.forEach { it.resumeReceiveClosed(closed) } + // add removed nodes to a separate list + closedList += previous } + /* + * Now notify all removed nodes that the channel was closed + * in the order they were added to the channel + */ + closedList.forEachReversed { it.resumeReceiveClosed(closed) } // and do other post-processing onClosedIdempotent(closed) } diff --git a/kotlinx-coroutines-core/common/src/internal/InlineList.kt b/kotlinx-coroutines-core/common/src/internal/InlineList.kt new file mode 100644 index 00000000..062a9100 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/InlineList.kt @@ -0,0 +1,46 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("UNCHECKED_CAST") + +package kotlinx.coroutines.internal + +import kotlinx.coroutines.assert + +/* + * Inline class that represents a mutable list, but does not allocate an underlying storage + * for zero and one elements. + * Cannot be parametrized with `List<*>`. + */ +internal inline class InlineList<E>(private val holder: Any? = null) { + public operator fun plus(element: E): InlineList<E> { + assert { element !is List<*> } // Lists are prohibited + return when (holder) { + null -> InlineList(element) + is ArrayList<*> -> { + (holder as ArrayList<E>).add(element) + InlineList(holder) + } + else -> { + val list = ArrayList<E>(4) + list.add(holder as E) + list.add(element) + InlineList(list) + } + } + } + + public inline fun forEachReversed(action: (E) -> Unit) { + when (holder) { + null -> return + !is ArrayList<*> -> action(holder as E) + else -> { + val list = holder as ArrayList<E> + for (i in (list.size - 1) downTo 0) { + action(list[i]) + } + } + } + } +} diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt index 983f353f..42cc8555 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt @@ -20,6 +20,37 @@ class ChannelsTest: TestBase() { } @Test + fun testCloseWithMultipleWaiters() = runTest { + val channel = Channel<Int>() + launch { + try { + expect(2) + channel.receive() + expectUnreached() + } catch (e: ClosedReceiveChannelException) { + expect(5) + } + } + + launch { + try { + expect(3) + channel.receive() + expectUnreached() + } catch (e: ClosedReceiveChannelException) { + expect(6) + } + } + + expect(1) + yield() + expect(4) + channel.close() + yield() + finish(7) + } + + @Test fun testAssociate() = runTest { assertEquals(testList.associate { it * 2 to it * 3 }, testList.asReceiveChannel().associate { it * 2 to it * 3 }.toMap()) |