aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
blob: f7f60cf97d3c7c794aabb6a0bddb34194b494c9b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*

/**
 * Channel that buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
 * so that the receiver always gets the most recently sent element.
 * Back-to-back sent elements are _conflated_ -- only the most recently sent element is received,
 * while previously sent elements **are lost**.
 * Sender to this channel never suspends and [trySend] always succeeds.
 *
 * This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
 *
 * Implementation notes: the single buffered element is kept in the private `value` field, which holds the
 * `EMPTY` sentinel when nothing is buffered. Every write to that field in this class happens while the private
 * `lock` is held. When a buffered element is overwritten or dropped, it is handed to
 * `onUndeliveredElement` (when one was supplied) via `callUndeliveredElementCatchingException`.
 */
internal open class ConflatedChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : AbstractChannel<E>(onUndeliveredElement) {
    // Buffer-state overrides: the buffer is never reported as full (or always-full / always-empty),
    // and it is reported as empty exactly when `value` holds the EMPTY sentinel.
    protected final override val isBufferAlwaysEmpty: Boolean get() = false
    protected final override val isBufferEmpty: Boolean get() = value === EMPTY
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = false

    // Evaluates `isEmptyImpl` while holding the lock.
    override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }

    // Guards every write to `value` made by this class.
    private val lock = ReentrantLock()

    // Either the EMPTY sentinel (nothing buffered) or the single buffered element.
    private var value: Any? = EMPTY

    // result is `OFFER_SUCCESS | Closed`, or the `offerResult` of the receiver that took the element
    // (NOTE(review): `offerResult` is declared outside this file -- presumably OFFER_SUCCESS; confirm).
    protected override fun offerInternal(element: E): Any {
        // Set only when a waiting receiver accepted the element; it is then completed outside the lock.
        var receive: ReceiveOrClosed<E>? = null
        lock.withLock {
            // channel is already closed for send -> report the Closed token
            closedForSend?.let { return it }
            // if there is no element written in buffer
            if (value === EMPTY) {
                // check for receivers that were waiting on the empty buffer
                loop@ while(true) {
                    receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                    if (receive is Closed) {
                        return receive!!
                    }
                    val token = receive!!.tryResumeReceive(element, null)
                    if (token != null) {
                        assert { token === RESUME_TOKEN }
                        // leave only the locked section; execution continues after `withLock` below
                        return@withLock
                    }
                    // null token: this receiver did not accept the element, try the next one
                }
            }
            // No receiver took the element: store it as the buffered value, replacing any previous one.
            // `updateValueLocked` has already stored the new value by the time its exception (if any) is thrown here.
            updateValueLocked(element)?.let { throw it }
            return OFFER_SUCCESS
        }
        // breaks here if offer meets receiver
        receive!!.completeResumeReceive(element)
        return receive!!.offerResult
    }

    // result is `ALREADY_SELECTED | OFFER_SUCCESS | Closed`, or the `offerResult` of the receiver that took the element
    // (NOTE(review): `offerResult` is declared outside this file -- presumably OFFER_SUCCESS; confirm).
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        // Set only when the atomic offer found a receiver; it is then completed outside the lock.
        var receive: ReceiveOrClosed<E>? = null
        lock.withLock {
            // channel is already closed for send -> report the Closed token
            closedForSend?.let { return it }
            if (value === EMPTY) {
                loop@ while(true) {
                    // a fresh offer descriptor is created for every attempt, including retries
                    val offerOp = describeTryOffer(element)
                    val failure = select.performAtomicTrySelect(offerOp)
                    when {
                        failure == null -> { // offered successfully
                            receive = offerOp.result
                            // leave only the locked section; execution continues after `withLock` below
                            return@withLock
                        }
                        failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
                        failure === RETRY_ATOMIC -> {} // retry
                        failure === ALREADY_SELECTED || failure is Closed<*> -> return failure
                        else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
                    }
                }
            }
            // try to select sending this element to buffer
            if (!select.trySelect()) {
                return ALREADY_SELECTED
            }
            // Selected: store the element as the buffered value, replacing any previous one.
            // `updateValueLocked` has already stored the new value by the time its exception (if any) is thrown here.
            updateValueLocked(element)?.let { throw it }
            return OFFER_SUCCESS
        }
        // breaks here if offer meets receiver
        receive!!.completeResumeReceive(element)
        return receive!!.offerResult
    }

    // result is `E | POLL_FAILED | Closed`
    protected override fun pollInternal(): Any? {
        var result: Any? = null
        lock.withLock {
            // nothing buffered: report the Closed token if the channel is closed for send, POLL_FAILED otherwise
            if (value === EMPTY) return closedForSend ?: POLL_FAILED
            // take the buffered element and mark the buffer as empty again
            result = value
            value = EMPTY
        }
        return result
    }

    // result is `E | POLL_FAILED | ALREADY_SELECTED | Closed`
    protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
        var result: Any? = null
        lock.withLock {
            // nothing buffered: report the Closed token if the channel is closed for send, POLL_FAILED otherwise
            if (value === EMPTY) return closedForSend ?: POLL_FAILED
            // the buffered element is left in place when the select cannot be claimed
            if (!select.trySelect())
                return ALREADY_SELECTED
            // take the buffered element and mark the buffer as empty again
            result = value
            value = EMPTY
        }
        return result
    }

    // Drops the buffered element (if any) under the lock, then delegates to the super implementation.
    // An exception produced while reporting the dropped element is rethrown only after the super call.
    protected override fun onCancelIdempotent(wasClosed: Boolean) {
        var undeliveredElementException: UndeliveredElementException? = null // resource cancel exception
        lock.withLock {
            undeliveredElementException = updateValueLocked(EMPTY)
        }
        super.onCancelIdempotent(wasClosed)
        undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one
    }

    // Replaces the buffered value with `element` (which may be the EMPTY sentinel).
    // If a real element was buffered before, it is passed to `onUndeliveredElement` (when one is set) via
    // `callUndeliveredElementCatchingException`, and whatever that call returns is returned to the caller
    // instead of being thrown here; otherwise the result is null.
    // The new value is stored before returning, regardless of the result.
    // Every call site in this class invokes it while holding `lock`.
    @Suppress("UNCHECKED_CAST")
    private fun updateValueLocked(element: Any?): UndeliveredElementException? {
        val old = value
        val undeliveredElementException = if (old === EMPTY) null else
            onUndeliveredElement?.callUndeliveredElementCatchingException(old as E)
        value = element
        return undeliveredElementException
    }

    // Runs the super implementation while holding the same lock that guards `value`.
    override fun enqueueReceiveInternal(receive: Receive<E>): Boolean = lock.withLock {
        super.enqueueReceiveInternal(receive)
    }

    // ------ debug ------

    // Reads `value` without acquiring the lock.
    override val bufferDebugString: String
        get() = "(value=$value)"
}