blob: cf935f97dc78f2c6f960513dcc4ac9a177285289 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("UNCHECKED_CAST")
package kotlinx.coroutines.reactive
import kotlinx.coroutines.flow.*
import org.junit.Ignore
import org.junit.Test
import org.reactivestreams.*
import org.reactivestreams.tck.*
import java.util.concurrent.*
import java.util.concurrent.ForkJoinPool.*
import kotlin.test.*
class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) {
private fun generate(num: Long): Array<Long> {
return Array(if (num >= Integer.MAX_VALUE) 1000000 else num.toInt()) { it.toLong() }
}
override fun createPublisher(elements: Long): Publisher<Long> {
return generate(elements).asIterable().asFlow().asPublisher()
}
@Suppress("SubscriberImplementation")
override fun createFailedPublisher(): Publisher<Long>? {
/*
* This is a hack for our adapter structure:
* Tests assume that calling "collect" is enough for publisher to fail and it is not
* true for our implementation
*/
val pub = { error(42) }.asFlow().asPublisher()
return Publisher { subscriber ->
pub.subscribe(object : Subscriber<Long> by subscriber as Subscriber<Long> {
override fun onSubscribe(s: Subscription) {
subscriber.onSubscribe(s)
s.request(1)
}
})
}
}
@Test
fun testStackOverflowTrampoline() {
val latch = CountDownLatch(1)
val collected = ArrayList<Long>()
val toRequest = 1000L
val array = generate(toRequest)
val publisher = array.asIterable().asFlow().asPublisher()
publisher.subscribe(object : Subscriber<Long> {
private lateinit var s: Subscription
override fun onSubscribe(s: Subscription) {
this.s = s
s.request(1)
}
override fun onNext(aLong: Long) {
collected.add(aLong)
s.request(1)
}
override fun onError(t: Throwable) {
}
override fun onComplete() {
latch.countDown()
}
})
latch.await(5, TimeUnit.SECONDS)
assertEquals(collected, array.toList())
}
@Test
fun testConcurrentRequest() {
val latch = CountDownLatch(1)
val collected = ArrayList<Long>()
val n = 50000L
val array = generate(n)
val publisher = array.asIterable().asFlow().asPublisher()
publisher.subscribe(object : Subscriber<Long> {
private var s: Subscription? = null
override fun onSubscribe(s: Subscription) {
this.s = s
for (i in 0..n) {
commonPool().execute { s.request(1) }
}
}
override fun onNext(aLong: Long) {
collected.add(aLong)
}
override fun onError(t: Throwable) {
}
override fun onComplete() {
latch.countDown()
}
})
latch.await()
assertEquals(array.toList(), collected)
}
@Ignore
override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
}
@Ignore
override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
}
@Ignore
override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
// This test has a bug in it
}
}
|