blob: 680786f9b3004cd432ce7750d01b782e8aa83808 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.rx3
import io.reactivex.rxjava3.core.ObservableSource
import io.reactivex.rxjava3.disposables.*
import kotlinx.coroutines.*
import org.junit.Test
import kotlin.test.*
class ObservableCollectTest: TestBase() {
/** Tests the behavior of [collect] when the publisher raises an error. */
@Test
fun testObservableCollectThrowingObservable() = runTest {
expect(1)
var sum = 0
try {
rxObservable {
for (i in 0..100) {
send(i)
}
throw TestException()
}.collect {
sum += it
}
} catch (e: TestException) {
assertTrue(sum > 0)
finish(2)
}
}
@Test
fun testObservableCollectThrowingAction() = runTest {
expect(1)
var sum = 0
val expectedSum = 5
try {
var disposed = false
ObservableSource<Int> { observer ->
launch(Dispatchers.Default) {
observer.onSubscribe(object : Disposable {
override fun dispose() {
disposed = true
expect(expectedSum + 2)
}
override fun isDisposed(): Boolean = disposed
})
while (!disposed) {
observer.onNext(1)
}
}
}.collect {
expect(sum + 2)
sum += it
if (sum == expectedSum) {
throw TestException()
}
}
} catch (e: TestException) {
assertEquals(expectedSum, sum)
finish(expectedSum + 3)
}
}
}
|