aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
blob: 488695dea217f7a56cbe599c3fa55954c63a5f3f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Test
import java.util.concurrent.Flow as JFlow
import kotlin.test.*

/**
 * Tests for the `asPublisher()` adapter, exercised through hand-written
 * [JFlow.Subscriber] implementations that cancel their subscription from `onNext`.
 *
 * The numbered `expect(..)` / `finish(..)` calls are inherited from [TestBase], which is
 * not visible in this file; presumably they pin the order in which the steps run.
 */
class FlowAsPublisherTest : TestBase() {

    /**
     * The flow throws [TestException] from the `finally` block wrapped around its only
     * `emit`, while the subscriber cancels as soon as it receives that element.
     * The exception has to arrive in `onError`; `onComplete` must never be invoked.
     */
    @Test
    fun testErrorOnCancellationIsReported() {
        expect(1)
        val publisher = flow<Int> {
            try {
                emit(2)
            } finally {
                expect(3)
                throw TestException()
            }
        }.asPublisher()
        publisher.subscribe(object : JFlow.Subscriber<Int> {
            private lateinit var upstream: JFlow.Subscription

            override fun onSubscribe(subscription: JFlow.Subscription?) {
                // Keep the handle before requesting: onNext uses it to cancel.
                upstream = subscription!!
                upstream.request(2)
            }

            override fun onNext(item: Int) {
                // The emitted value (2) doubles as the step index.
                expect(item)
                upstream.cancel()
            }

            override fun onError(throwable: Throwable?) {
                assertTrue(throwable is TestException)
                expect(4)
            }

            override fun onComplete() {
                expectUnreached()
            }
        })
        finish(5)
    }

    /**
     * The flow emits a single element and the subscriber cancels from `onNext`.
     * The cancellation must not be surfaced as a failure: `onError` is never invoked,
     * and `onComplete` is the step that follows `onNext`.
     */
    @Test
    fun testCancellationIsNotReported() {
        expect(1)
        val publisher = flow<Int> {
            emit(2)
        }.asPublisher()
        publisher.subscribe(object : JFlow.Subscriber<Int> {
            private lateinit var upstream: JFlow.Subscription

            override fun onSubscribe(subscription: JFlow.Subscription?) {
                // Keep the handle before requesting: onNext uses it to cancel.
                upstream = subscription!!
                upstream.request(2)
            }

            override fun onNext(item: Int) {
                // The emitted value (2) doubles as the step index.
                expect(item)
                upstream.cancel()
            }

            override fun onError(throwable: Throwable?) {
                expectUnreached()
            }

            override fun onComplete() {
                expect(3)
            }
        })
        finish(4)
    }
}