aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
blob: 906b2579d7fa35e30162a2bb3915f2cba21d4bb5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

@file:Suppress("UNCHECKED_CAST")

package kotlinx.coroutines.reactive

import kotlinx.coroutines.flow.*
import org.junit.*
import org.junit.Ignore
import org.junit.Test
import org.reactivestreams.*
import org.reactivestreams.tck.*

import org.reactivestreams.Subscription
import org.reactivestreams.Subscriber
import java.util.ArrayList
import java.util.concurrent.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ForkJoinPool.commonPool
import kotlin.test.*

/**
 * Runs the Reactive Streams TCK [PublisherVerification] suite against a publisher obtained by
 * turning an iterable into a flow (`asFlow()`) and that flow into a publisher (`asPublisher()`).
 *
 * On top of the inherited TCK checks, two extra tests drive such a publisher with a hand-written
 * [Subscriber]: one that re-requests from inside `onNext`, and one that issues its requests
 * concurrently from the common fork-join pool.
 */
class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) {

    /**
     * Builds the array `[0, 1, ..., size - 1]`.
     *
     * `size` equals [num], except that any value at or above [Int.MAX_VALUE] is capped
     * at one million elements so that the array size stays a valid, manageable `Int`.
     */
    private fun generate(num: Long): Array<Long> {
        val size = if (num >= Int.MAX_VALUE) 1_000_000 else num.toInt()
        return Array(size) { it.toLong() }
    }

    /** Creates a publisher that emits the numbers `0 until elements` (subject to the cap in [generate]). */
    override fun createPublisher(elements: Long): Publisher<Long> {
        return generate(elements).asIterable().asFlow().asPublisher()
    }

    /**
     * Creates a publisher whose underlying flow throws (`error(42)`) instead of producing a value.
     *
     * The returned publisher wraps the real one: every call on the caller's subscriber is delegated
     * as is, except `onSubscribe`, which additionally requests one element right after forwarding
     * the subscription.
     */
    @Suppress("SubscriberImplementation")
    override fun createFailedPublisher(): Publisher<Long>? {
        /*
         * This is a hack for our adapter structure:
         * the TCK tests assume that merely subscribing is enough for the publisher to fail,
         * which is not true for our implementation. Requesting one element on behalf of the
         * subscriber is what makes the failure surface.
         */
        val pub = { error(42) }.asFlow().asPublisher()
        return Publisher { subscriber ->
            pub.subscribe(object : Subscriber<Long> by subscriber as Subscriber<Long> {
                override fun onSubscribe(s: Subscription) {
                    subscriber.onSubscribe(s)
                    s.request(1)
                }
            })
        }
    }

    /**
     * Requests elements one at a time, issuing each new request from inside `onNext`,
     * and verifies that all 1000 elements arrive in order and the stream completes.
     *
     * As the name suggests, this guards against the re-entrant `request` calls
     * growing the stack without bound.
     */
    @Test
    fun testStackOverflowTrampoline() {
        val latch = CountDownLatch(1)
        val collected = ArrayList<Long>()
        // Written in onError before latch.countDown(), read only after a successful await(),
        // so the latch provides the necessary happens-before edge.
        var failure: Throwable? = null
        val toRequest = 1000L
        val array = generate(toRequest)
        val publisher = array.asIterable().asFlow().asPublisher()

        publisher.subscribe(object : Subscriber<Long> {
            private lateinit var s: Subscription

            override fun onSubscribe(s: Subscription) {
                this.s = s
                s.request(1)
            }

            override fun onNext(aLong: Long) {
                collected.add(aLong)
                // Re-entrant request: asks for the next element while still inside onNext.
                s.request(1)
            }

            override fun onError(t: Throwable) {
                // Record the failure and release the waiting test thread instead of letting it time out.
                failure = t
                latch.countDown()
            }

            override fun onComplete() {
                latch.countDown()
            }
        })

        // A timeout must fail the test here; otherwise `collected` could still be mutated while we read it.
        assertTrue(latch.await(5, TimeUnit.SECONDS), "Publisher did not terminate within 5 seconds")
        failure?.let { throw AssertionError("Publisher unexpectedly signalled onError", it) }
        assertEquals(array.toList(), collected)
    }

    /**
     * Issues 50000 single-element requests concurrently from the common fork-join pool
     * and verifies that every element is still delivered exactly once and in order.
     */
    @Test
    fun testConcurrentRequest() {
        val latch = CountDownLatch(1)
        val collected = ArrayList<Long>()
        // Written in onError before latch.countDown(), read only after a successful await().
        var failure: Throwable? = null
        val n = 50000L
        val array = generate(n)
        val publisher = array.asIterable().asFlow().asPublisher()

        publisher.subscribe(object : Subscriber<Long> {
            override fun onSubscribe(s: Subscription) {
                // Each request(1) is submitted as its own pool task so that requests race with each other.
                repeat(n.toInt()) {
                    commonPool().execute { s.request(1) }
                }
            }

            override fun onNext(aLong: Long) {
                collected.add(aLong)
            }

            override fun onError(t: Throwable) {
                // Record the failure and release the waiting test thread instead of letting it time out.
                failure = t
                latch.countDown()
            }

            override fun onComplete() {
                latch.countDown()
            }
        })

        // A timeout must fail the test here; otherwise `collected` could still be mutated while we read it.
        assertTrue(latch.await(50, TimeUnit.SECONDS), "Publisher did not terminate within 50 seconds")
        failure?.let { throw AssertionError("Publisher unexpectedly signalled onError", it) }
        assertEquals(array.toList(), collected)
    }

    // The three TCK checks below are overridden with empty bodies and marked @Ignore,
    // so the inherited verification logic is not executed for them.

    // NOTE(review): the reason for disabling this check is not recorded in this file.
    @Ignore
    override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
    }

    // NOTE(review): the reason for disabling this check is not recorded in this file.
    @Ignore
    override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
    }

    @Ignore
    override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
        // This test has a bug in it
    }
}