aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt
blob: 7dd232f2d7a4454b57fbbb15b2a08db158232371 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlin.test.*

class ConflatedBroadcastChannelTest : TestBase() {

    /**
     * Sends a single element to a broadcast that has two waiting subscribers, the first of which
     * cancels its own subscription right after it has received the element.
     *
     * Both receivers are launched undispatched, so their bodies start before the element is sent
     * (steps 1 and 2 come before step 3). The test succeeds once the send has completed and both
     * receivers have finished.
     *
     * NOTE(review): judging by the test name, this guards against the subscriber set being modified
     * while an element is being delivered — confirm against the channel implementation.
     */
    @Test
    fun testConcurrentModification() = runTest {
        val broadcast = ConflatedBroadcastChannel<Int>()
        val cancellingSub = broadcast.openSubscription()
        val passiveSub = broadcast.openSubscription()

        // Takes the element and immediately drops its subscription.
        val cancellingReceiver = launch(context = Dispatchers.Unconfined, start = CoroutineStart.UNDISPATCHED) {
            expect(1)
            cancellingSub.receive()
            cancellingSub.cancel()
        }

        // Takes the element and leaves its subscription open.
        val passiveReceiver = launch(context = Dispatchers.Unconfined, start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            passiveSub.receive()
        }

        expect(3)
        broadcast.send(1)
        joinAll(cancellingReceiver, passiveReceiver)
        finish(4)
    }

    /**
     * Walks a [ConflatedBroadcastChannel] without an initial value through sending three elements
     * and closing, with two subscribers that join at different moments.
     *
     * What the assertions pin down:
     * - before the first send and after `close()`, `value` throws [IllegalStateException] and
     *   `valueOrNull` is `null`;
     * - after each send, `value` and `valueOrNull` return the element that was just sent;
     * - a subscriber opened before any send finds nothing to poll, while a subscriber opened after
     *   "one" was sent receives "one" first;
     * - after `close()`, a subscriber's `receiveOrNull()` yields `null` and `send` fails with
     *   [ClosedSendChannelException].
     *
     * The numbered `expect(n)` / `finish(n)` calls come from `TestBase` (not shown in this file);
     * presumably they assert that the steps are reached in exactly this order — verify there.
     */
    @Test
    fun testBasicScenario() = runTest {
        expect(1)
        val broadcast = ConflatedBroadcastChannel<String>()
        // Nothing has been sent yet, so there is no current value to read.
        assertTrue(exceptionFrom { broadcast.value } is IllegalStateException)
        assertNull(broadcast.valueOrNull)

        // First subscriber: subscribes before anything is sent and leaves after receiving "two".
        launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            val sub = broadcast.openSubscription()
            assertNull(sub.poll())
            expect(3)
            assertEquals("one", sub.receive()) // suspends
            expect(6)
            assertEquals("two", sub.receive()) // suspends
            expect(12)
            sub.cancel()
            expect(13)
        }

        expect(4)
        broadcast.send("one") // does not suspend
        assertEquals("one", broadcast.value)
        assertEquals("one", broadcast.valueOrNull)
        expect(5)
        yield() // to receiver
        expect(7)
        // Second subscriber: subscribes after "one" was sent and stays until the channel is closed.
        launch(start = CoroutineStart.UNDISPATCHED) {
            expect(8)
            val sub = broadcast.openSubscription()
            assertEquals("one", sub.receive()) // does not suspend
            expect(9)
            assertEquals("two", sub.receive()) // suspends
            expect(14)
            assertEquals("three", sub.receive()) // suspends
            expect(17)
            assertNull(sub.receiveOrNull()) // suspends until closed
            expect(20)
            sub.cancel()
            expect(21)
        }

        expect(10)
        broadcast.send("two") // does not suspend
        assertEquals("two", broadcast.value)
        assertEquals("two", broadcast.valueOrNull)
        expect(11)
        yield() // to both receivers
        expect(15)
        // Only the second subscriber is still subscribed at this point; the first cancelled at step 12.
        broadcast.send("three") // does not suspend
        assertEquals("three", broadcast.value)
        assertEquals("three", broadcast.valueOrNull)
        expect(16)
        yield() // to second receiver
        expect(18)
        broadcast.close()
        // Once closed, the channel reports no current value again.
        assertTrue(exceptionFrom { broadcast.value } is IllegalStateException)
        assertNull(broadcast.valueOrNull)
        expect(19)
        yield() // to second receiver
        // Sending to the closed channel must be rejected.
        assertTrue(exceptionFrom { broadcast.send("four") } is ClosedSendChannelException)
        finish(22)
    }

    /**
     * Checks a channel that is created with an initial value and then closed.
     *
     * The initial value must be readable through `value` / `valueOrNull` and must be the first
     * element a new subscriber receives. After the channel is closed, the subscriber's pending
     * `receive()` must fail with [ClosedReceiveChannelException].
     */
    @Test
    fun testInitialValueAndReceiveClosed() = runTest {
        expect(1)
        val channel = ConflatedBroadcastChannel(1)
        assertEquals(1, channel.value)
        assertEquals(1, channel.valueOrNull)
        launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            val subscription = channel.openSubscription()
            val initial = subscription.receive()
            assertEquals(1, initial)
            expect(3)
            // This receive suspends; it is resumed with a failure once the channel is closed.
            val failure = exceptionFrom { subscription.receive() }
            assertTrue(failure is ClosedReceiveChannelException)
            expect(6)
        }
        expect(4)
        channel.close()
        expect(5)
        yield() // hand control to the child so it can observe the close
        finish(7)
    }

    /**
     * Runs [block] and hands back whatever it threw instead of letting it propagate.
     *
     * Declared `inline` so that call sites inside coroutines may make suspending calls in [block].
     *
     * @return the caught [Throwable], or `null` when [block] completed normally.
     */
    private inline fun exceptionFrom(block: () -> Unit): Throwable? {
        try {
            block()
        } catch (failure: Throwable) {
            return failure
        }
        return null
    }
}