aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt50
1 files changed, 35 insertions, 15 deletions
diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
index be4b2c7d..d9228409 100644
--- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
@@ -5,19 +5,21 @@
package kotlinx.coroutines.reactor
import kotlinx.coroutines.ExperimentalCoroutinesApi
-import reactor.util.context.Context
import kotlin.coroutines.*
import kotlinx.coroutines.reactive.*
+import reactor.util.context.*
/**
- * Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines.
- * [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext].
- * Coroutine context element that propagates information about Reactor's [Context] through coroutines.
+ * Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between
+ * Reactor and kotlinx.coroutines.
+ * [Context.asCoroutineContext] puts Reactor's [Context] elements into a [CoroutineContext],
+ * which can be used to propagate the information about Reactor's [Context] through coroutines.
*
- * This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux],
- * [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
- * Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][awaitFirst]) also propagate [ReactorContext] to the
- * subscriber's [Context].
+ * This context element is implicitly propagated through subscribers' context by all Reactive integrations,
+ * such as [mono], [flux], [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
+ * Functions that subscribe to a reactive stream
+ * (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]), too, propagate [ReactorContext]
+ * to the subscriber's [Context].
**
* ### Examples of Reactive context integration.
*
@@ -25,18 +27,18 @@ import kotlinx.coroutines.reactive.*
* ```
* val flux = myDatabaseService.getUsers()
* .contextWrite { ctx -> println(ctx); ctx }
- * flux.await() // Will print "null"
+ * flux.awaitFirst() // Will print "null"
*
* // Now add ReactorContext
* withContext(Context.of("answer", "42").asCoroutineContext()) {
- * flux.await() // Will print "Context{'key'='value'}"
+ * flux.awaitFirst() // Will print "Context{'key'='value'}"
* }
* ```
*
* #### Propagating subscriber's Context to ReactorContext:
* ```
* val flow = flow {
- * println("Reactor context in Flow: " + coroutineContext[ReactorContext])
+ * println("Reactor context in Flow: " + currentCoroutineContext()[ReactorContext])
* }
* // No context
* flow.asFlux()
@@ -47,14 +49,32 @@ import kotlinx.coroutines.reactive.*
* .subscribe() // Will print "Reactor context in Flow: Context{'answer'=42}"
* ```
*/
-@ExperimentalCoroutinesApi
public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
+
+ // `Context.of` is zero-cost if the argument is a `Context`
+ public constructor(contextView: ContextView): this(Context.of(contextView))
+
public companion object Key : CoroutineContext.Key<ReactorContext>
+
+ override fun toString(): String = context.toString()
}
/**
- * Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
+ * Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context
+ * and later used via `coroutineContext[ReactorContext]`.
+ */
+public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this)
+
+/**
+ * Wraps the given [Context] into [ReactorContext], so it can be added to the coroutine's context
* and later used via `coroutineContext[ReactorContext]`.
+ * @suppress
+ */
+@Deprecated("The more general version for ContextView should be used instead", level = DeprecationLevel.HIDDEN)
+public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext() // `readOnly()` is zero-cost.
+
+/**
+ * Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values.
*/
-@ExperimentalCoroutinesApi
-public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)
+internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext =
+ (this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext()