aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
blob: 577238be1d1c30d46ffe9c2b58bdc230fa20a0e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import reactor.core.publisher.*
import reactor.util.context.*
import kotlin.coroutines.*
import kotlin.test.*

class ReactorContextTest : TestBase() {
    @Test
    fun testMonoHookedContext() = runBlocking {
        val mono = mono(Context.of(1, "1", 7, "7").asCoroutineContext()) {
            val ctx = reactorContext()
            buildString {
                (1..7).forEach { append(ctx.getOrDefault(it, "noValue")) }
            }
        }  .contextWrite(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
           .contextWrite { ctx -> ctx.put(6, "6") }
        assertEquals(mono.awaitSingle(), "1234567")
    }

    @Test
    fun testFluxContext() {
        val flux = flux(Context.of(1, "1", 7, "7").asCoroutineContext()) {
            val ctx = reactorContext()
            (1..7).forEach { send(ctx.getOrDefault(it, "noValue")) }
        }
            .contextWrite(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
            .contextWrite { ctx -> ctx.put(6, "6") }
        val list = flux.collectList().block()!!
        assertEquals((1..7).map { it.toString() }, list)
    }

    @Test
    fun testAwait() = runBlocking(Context.of(3, "3").asCoroutineContext()) {
        val result = mono(Context.of(1, "1").asCoroutineContext()) {
            val ctx = reactorContext()
            buildString {
                (1..3).forEach { append(ctx.getOrDefault(it, "noValue")) }
            }
        }  .contextWrite(Context.of(2, "2"))
            .awaitSingle()
        assertEquals(result, "123")
    }

    @Test
    fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) {
        assertEquals(createMono().awaitSingle(), "7")
        assertEquals(createMono().awaitSingleOrNull(), "7")
    }

    @Test
    fun testFluxAwaitContextPropagation() = runBlocking(
        Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
    ) {
        assertEquals(createFlux().awaitFirst(), "1")
        assertEquals(createFlux().awaitFirstOrDefault("noValue"), "1")
        assertEquals(createFlux().awaitFirstOrNull(), "1")
        assertEquals(createFlux().awaitFirstOrElse { "noValue" }, "1")
        assertEquals(createFlux().awaitLast(), "3")
    }

    private fun createMono(): Mono<String> = mono {
        val ctx = reactorContext()
        ctx.getOrDefault(7, "noValue")
    }


    private fun createFlux(): Flux<String?> = flux {
        val ctx = reactorContext()
        (1..3).forEach { send(ctx.getOrDefault(it, "noValue")) }
    }

    @Test
    fun testFlowToFluxContextPropagation() = runBlocking(
        Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
    ) {
        var i = 0
        // call "collect" on the converted Flow
        bar().collect { str ->
            i++; assertEquals(str, i.toString())
        }
        assertEquals(i, 3)
    }

    @Test
    fun testFlowToFluxDirectContextPropagation() = runBlocking(
        Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
    ) {
        // convert resulting flow to channel using "produceIn"
        val channel = bar().produceIn(this)
        val list = channel.toList()
        assertEquals(listOf("1", "2", "3"), list)
    }

    private fun bar(): Flow<String> = flux {
        val ctx = reactorContext()
        (1..3).forEach { send(ctx.getOrDefault(it, "noValue")) }
    }.asFlow()

    private suspend fun reactorContext() =
        coroutineContext[ReactorContext]!!.context
}