aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt27
1 files changed, 27 insertions, 0 deletions
diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
new file mode 100644
index 00000000..2f8ce9ac
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
@@ -0,0 +1,27 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import kotlinx.coroutines.runBlocking
+import org.junit.Test
+import reactor.core.publisher.Mono
+import reactor.util.context.Context
+import kotlin.test.assertEquals
+
+class FlowAsFluxTest {
+ @Test
+ fun testFlowToFluxContextPropagation() = runBlocking<Unit> {
+ val flux = flow<String> {
+ (1..4).forEach { i -> emit(m(i).awaitFirst()) }
+ } .asFlux()
+ .subscriberContext(Context.of(1, "1"))
+ .subscriberContext(Context.of(2, "2", 3, "3", 4, "4"))
+ var i = 0
+ flux.subscribe { str -> i++; println(str); assertEquals(str, i.toString()) }
+ }
+
+ private fun m(i: Int): Mono<String> = mono {
+ val ctx = coroutineContext[ReactorContext]?.context
+ ctx?.getOrDefault(i, "noValue")
+ }
+} \ No newline at end of file