aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
blob: be4b2c7d453216925ed6fc85ac4d674cb1fb689b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactor

import kotlinx.coroutines.ExperimentalCoroutinesApi
import reactor.util.context.Context
import kotlin.coroutines.*
import kotlinx.coroutines.reactive.*

/**
 * Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines.
 * [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext].
 * Coroutine context element that propagates information about Reactor's [Context] through coroutines.
 *
 * This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux],
 * [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
 * Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][awaitFirst]) also propagate [ReactorContext] to the
 * subscriber's [Context].
 **
 * ### Examples of Reactive context integration.
 *
 * #### Propagating ReactorContext to Reactor's Context
 * ```
 * val flux = myDatabaseService.getUsers()
 *     .contextWrite { ctx -> println(ctx); ctx }
 * flux.await() // Will print "null"
 *
 * // Now add ReactorContext
 * withContext(Context.of("answer", "42").asCoroutineContext()) {
 *    flux.await() // Will print "Context{'key'='value'}"
 * }
 * ```
 *
 * #### Propagating subscriber's Context to ReactorContext:
 * ```
 * val flow = flow {
 *     println("Reactor context in Flow: " + coroutineContext[ReactorContext])
 * }
 * // No context
 * flow.asFlux()
 *     .subscribe() // Will print 'Reactor context in Flow: null'
 * // Add subscriber's context
 * flow.asFlux()
 *     .contextWrite { ctx -> ctx.put("answer", 42) }
 *     .subscribe() // Will print "Reactor context in Flow: Context{'answer'=42}"
 * ```
 */
@ExperimentalCoroutinesApi
public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
    public companion object Key : CoroutineContext.Key<ReactorContext>
}

/**
 * Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
 * and later used via `coroutineContext[ReactorContext]`.
 */
@ExperimentalCoroutinesApi
public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)