diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactor/src')
-rw-r--r-- | reactive/kotlinx-coroutines-reactor/src/Convert.kt | 11 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-reactor/src/Flux.kt | 86 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-reactor/src/Migration.kt | 5 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-reactor/src/Mono.kt | 221 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt | 50 |
5 files changed, 289 insertions, 84 deletions
diff --git a/reactive/kotlinx-coroutines-reactor/src/Convert.kt b/reactive/kotlinx-coroutines-reactor/src/Convert.kt index 7807549c..3063d1dd 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Convert.kt @@ -21,7 +21,6 @@ import kotlin.coroutines.* * * @param context -- the coroutine context from which the resulting mono is going to be signalled */ -@ExperimentalCoroutinesApi public fun Job.asMono(context: CoroutineContext): Mono<Unit> = mono(context) { this@asMono.join() } /** * Converts this deferred value to the hot reactive mono that signals @@ -35,19 +34,19 @@ public fun Job.asMono(context: CoroutineContext): Mono<Unit> = mono(context) { t * * @param context -- the coroutine context from which the resulting mono is going to be signalled */ -@ExperimentalCoroutinesApi public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(context) { this@asMono.await() } /** * Converts a stream of elements received from the channel to the hot reactive flux. * - * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers, - * they'll receive values in round-robin way. + * Every subscriber receives values from this channel in a **fan-out** fashion. If the are multiple subscribers, + * they'll receive values in a round-robin way. * @param context -- the coroutine context from which the resulting flux is going to be signalled + * @suppress */ @Deprecated(message = "Deprecated in the favour of consumeAsFlow()", - level = DeprecationLevel.WARNING, - replaceWith = ReplaceWith("this.consumeAsFlow().asFlux()")) + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("this.consumeAsFlow().asFlux(context)", imports = ["kotlinx.coroutines.flow.consumeAsFlow"])) public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> = flux(context) { for (t in this@asFlux) send(t) diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 8d4f9cc9..1e408d83 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -1,10 +1,7 @@ - /* * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.reactor import kotlinx.coroutines.* @@ -15,25 +12,23 @@ import reactor.core.* import reactor.core.publisher.* import reactor.util.context.* import kotlin.coroutines.* -import kotlin.internal.* /** - * Creates cold reactive [Flux] that runs a given [block] in a coroutine. + * Creates a cold reactive [Flux] that runs the given [block] in a coroutine. * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. - * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) - * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) - * if coroutine throws an exception or closes channel with a cause. - * Unsubscribing cancels running coroutine. + * The coroutine emits ([Subscriber.onNext]) values with [send][ProducerScope.send], completes ([Subscriber.onComplete]) + * when the coroutine completes, or, in case the coroutine throws an exception or the channel is closed, + * emits the error ([Subscriber.onError]) and closes the channel with the cause. + * Unsubscribing cancels the running coroutine. * - * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that - * `onNext` is not invoked concurrently. - * - * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. + * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to + * ensure that [onNext][Subscriber.onNext] is not invoked concurrently. * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect * to cancellation and error handling may change in the future. + * + * @throws IllegalArgumentException if the provided [context] contains a [Job] instance. */ -@ExperimentalCoroutinesApi public fun <T> flux( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit @@ -43,40 +38,61 @@ public fun <T> flux( return Flux.from(reactorPublish(GlobalScope, context, block)) } -@Deprecated( - message = "CoroutineScope.flux is deprecated in favour of top-level flux", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("flux(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring -@LowPriorityInOverloadResolution -public fun <T> CoroutineScope.flux( - context: CoroutineContext = EmptyCoroutineContext, - @BuilderInference block: suspend ProducerScope<T>.() -> Unit -): Flux<T> = - Flux.from(reactorPublish(this, context, block)) - private fun <T> reactorPublish( scope: CoroutineScope, context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit -): Publisher<T> = Publisher { subscriber -> - // specification requires NPE on null subscriber - if (subscriber == null) throw NullPointerException("Subscriber cannot be null") - require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." } +): Publisher<T> = Publisher onSubscribe@{ subscriber: Subscriber<in T>? -> + if (subscriber !is CoreSubscriber) { + subscriber.reject(IllegalArgumentException("Subscriber is not an instance of CoreSubscriber, context can not be extracted.")) + return@onSubscribe + } val currentContext = subscriber.currentContext() - val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext() + val reactorContext = context.extendReactorContext(currentContext) val newContext = scope.newCoroutineContext(context + reactorContext) val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER) subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions coroutine.start(CoroutineStart.DEFAULT, coroutine, block) } -private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { e, ctx -> - if (e !is CancellationException) { +private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ctx -> + if (cause !is CancellationException) { try { - Operators.onOperatorError(e, ctx[ReactorContext]?.context ?: Context.empty()) + Operators.onOperatorError(cause, ctx[ReactorContext]?.context ?: Context.empty()) } catch (e: Throwable) { - handleCoroutineException(ctx, e) + cause.addSuppressed(e) + handleCoroutineException(ctx, cause) } } } + +/** The proper way to reject the subscriber, according to + * [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9) + */ +private fun <T> Subscriber<T>?.reject(t: Throwable) { + if (this == null) + throw NullPointerException("The subscriber can not be null") + onSubscribe(object: Subscription { + override fun request(n: Long) { + // intentionally left blank + } + override fun cancel() { + // intentionally left blank + } + }) + onError(t) +} + +/** + * @suppress + */ +@Deprecated( + message = "CoroutineScope.flux is deprecated in favour of top-level flux", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("flux(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring +public fun <T> CoroutineScope.flux( + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope<T>.() -> Unit +): Flux<T> = + Flux.from(reactorPublish(this, context, block)) diff --git a/reactive/kotlinx-coroutines-reactor/src/Migration.kt b/reactive/kotlinx-coroutines-reactor/src/Migration.kt index ec5674d9..8da0db2d 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Migration.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Migration.kt @@ -9,9 +9,10 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.flow.* import reactor.core.publisher.* +/** @suppress **/ @Deprecated( message = "Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactor.* instead of kotlinx.coroutines.reactor.FlowKt", - level = DeprecationLevel.ERROR -) + level = DeprecationLevel.HIDDEN +) // Compatibility with Spring 5.2-RC @JvmName("asFlux") public fun <T : Any> Flow<T>.asFluxDeprecated(): Flux<T> = asFlux() diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 2d595c96..e86d51c6 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -7,21 +7,23 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* +import kotlinx.coroutines.reactive.* +import org.reactivestreams.* import reactor.core.* import reactor.core.publisher.* import kotlin.coroutines.* -import kotlin.internal.* +import kotlinx.coroutines.internal.* /** - * Creates cold [mono][Mono] that will run a given [block] in a coroutine and emits its result. + * Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result. * Every time the returned mono is subscribed, it starts a new coroutine. - * If [block] result is `null`, [MonoSink.success] is invoked without a value. - * Unsubscribing cancels running coroutine. + * If the result of [block] is `null`, [MonoSink.success] is invoked without a value. + * Unsubscribing cancels the running coroutine. * * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * - * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. + * @throws IllegalArgumentException if the provided [context] contains a [Job] instance. */ public fun <T> mono( context: CoroutineContext = EmptyCoroutineContext, @@ -32,23 +34,56 @@ public fun <T> mono( return monoInternal(GlobalScope, context, block) } -@Deprecated( - message = "CoroutineScope.mono is deprecated in favour of top-level mono", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("mono(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 -@LowPriorityInOverloadResolution -public fun <T> CoroutineScope.mono( - context: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.() -> T? -): Mono<T> = monoInternal(this, context, block) +/** + * Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or, if + * this publisher has produced an error, throws the corresponding exception. If the Mono completed without a value, + * `null` is returned. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + */ +public suspend fun <T> Mono<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> + injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> { + private var seenValue = false + + override fun onSubscribe(s: Subscription) { + cont.invokeOnCancellation { s.cancel() } + s.request(Long.MAX_VALUE) + } + + override fun onComplete() { + if (!seenValue) cont.resume(null) + } + + override fun onNext(t: T) { + seenValue = true + cont.resume(t) + } + + override fun onError(error: Throwable) { cont.resumeWithException(error) } + }) +} + +/** + * Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or, + * if this Mono has produced an error, throws the corresponding exception. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * + * @throws NoSuchElementException if the Mono does not emit any value + */ +// TODO: consider using https://github.com/Kotlin/kotlinx.coroutines/issues/2607 once that lands +public suspend fun <T> Mono<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() private fun <T> monoInternal( scope: CoroutineScope, // support for legacy mono in scope context: CoroutineContext, block: suspend CoroutineScope.() -> T? ): Mono<T> = Mono.create { sink -> - val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext() + val reactorContext = context.extendReactorContext(sink.currentContext()) val newContext = scope.newCoroutineContext(context + reactorContext) val coroutine = MonoCoroutine(newContext, sink) sink.onDispose(coroutine) @@ -58,7 +93,7 @@ private fun <T> monoInternal( private class MonoCoroutine<in T>( parentContext: CoroutineContext, private val sink: MonoSink<T> -) : AbstractCoroutine<T>(parentContext, true), Disposable { +) : AbstractCoroutine<T>(parentContext, false, true), Disposable { @Volatile private var disposed = false @@ -67,17 +102,18 @@ private class MonoCoroutine<in T>( } override fun onCancelled(cause: Throwable, handled: Boolean) { - try { - /* - * sink.error handles exceptions on its own and, by default, handling of undeliverable exceptions is a no-op. - * Guard potentially non-empty handlers against meaningless cancellation exceptions - */ - if (getCancellationException() !== cause) { + /** Cancellation exceptions that were caused by [dispose], that is, came from downstream, are not errors. */ + val unwrappedCause = unwrap(cause) + if (getCancellationException() !== unwrappedCause || !disposed) { + try { + /** If [sink] turns out to already be in a terminal state, this exception will be passed through the + * [Hooks.onOperatorError] hook, which is the way to signal undeliverable exceptions in Reactor. */ sink.error(cause) + } catch (e: Throwable) { + // In case of improper error implementation or fatal exceptions + cause.addSuppressed(e) + handleCoroutineException(context, cause) } - } catch (e: Throwable) { - // In case of improper error implementation or fatal exceptions - handleCoroutineException(context, cause) } } @@ -88,3 +124,136 @@ private class MonoCoroutine<in T>( override fun isDisposed(): Boolean = disposed } + +/** + * @suppress + */ +@Deprecated( + message = "CoroutineScope.mono is deprecated in favour of top-level mono", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("mono(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +public fun <T> CoroutineScope.mono( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T? +): Mono<T> = monoInternal(this, context, block) + +/** + * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono]. + * On [Publisher] instances other than [Mono], this function is not deprecated. + * + * Both [awaitFirst] and [awaitSingle] await the first value, or throw [NoSuchElementException] if there is none, but + * the name [Mono.awaitSingle] better reflects the semantics of [Mono]. + * + * For example, consider this code: + * ``` + * myDbClient.findById(uniqueId).awaitFirst() // findById returns a `Mono` + * ``` + * It looks like more than one value could be returned from `findById` and [awaitFirst] discards the extra elements, + * when in fact, at most a single value can be present. + * + * @suppress + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingle() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingle()") +) // Warning since 1.5, error in 1.6 +public suspend fun <T> Mono<T>.awaitFirst(): T = awaitSingle() + +/** + * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono]. + * On [Publisher] instances other than [Mono], this function is not deprecated. + * + * Both [awaitFirstOrDefault] and [awaitSingleOrNull] await the first value, or return some special value if there + * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono]. + * + * For example, consider this code: + * ``` + * myDbClient.findById(uniqueId).awaitFirstOrDefault(default) // findById returns a `Mono` + * ``` + * It looks like more than one value could be returned from `findById` and [awaitFirstOrDefault] discards the extra + * elements, when in fact, at most a single value can be present. + * + * @suppress + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingleOrNull() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") +) // Warning since 1.5, error in 1.6 +public suspend fun <T> Mono<T>.awaitFirstOrDefault(default: T): T = awaitSingleOrNull() ?: default + +/** + * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono]. + * On [Publisher] instances other than [Mono], this function is not deprecated. + * + * Both [awaitFirstOrNull] and [awaitSingleOrNull] await the first value, or return some special value if there + * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono]. + * + * For example, consider this code: + * ``` + * myDbClient.findById(uniqueId).awaitFirstOrNull() // findById returns a `Mono` + * ``` + * It looks like more than one value could be returned from `findById` and [awaitFirstOrNull] discards the extra + * elements, when in fact, at most a single value can be present. + * + * @suppress + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingleOrNull() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull()") +) // Warning since 1.5, error in 1.6 +public suspend fun <T> Mono<T>.awaitFirstOrNull(): T? = awaitSingleOrNull() + +/** + * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono]. + * On [Publisher] instances other than [Mono], this function is not deprecated. + * + * Both [awaitFirstOrElse] and [awaitSingleOrNull] await the first value, or return some special value if there + * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono]. + * + * For example, consider this code: + * ``` + * myDbClient.findById(uniqueId).awaitFirstOrElse(defaultValue) // findById returns a `Mono` + * ``` + * It looks like more than one value could be returned from `findById` and [awaitFirstOrElse] discards the extra + * elements, when in fact, at most a single value can be present. + * + * @suppress + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingleOrNull() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: defaultValue()") +) // Warning since 1.5, error in 1.6 +public suspend fun <T> Mono<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitSingleOrNull() ?: defaultValue() + +/** + * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono]. + * On [Publisher] instances other than [Mono], this function is not deprecated. + * + * Both [awaitLast] and [awaitSingle] await the single value, or throw [NoSuchElementException] if there is none, but + * the name [Mono.awaitSingle] better reflects the semantics of [Mono]. + * + * For example, consider this code: + * ``` + * myDbClient.findById(uniqueId).awaitLast() // findById returns a `Mono` + * ``` + * It looks like more than one value could be returned from `findById` and [awaitLast] discards the initial elements, + * when in fact, at most a single value can be present. + * + * @suppress + */ +@Deprecated( + message = "Mono produces at most one value, so the last element is the same as the first. " + + "Please use awaitSingle() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingle()") +) // Warning since 1.5, error in 1.6 +public suspend fun <T> Mono<T>.awaitLast(): T = awaitSingle() diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index be4b2c7d..d9228409 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -5,19 +5,21 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.ExperimentalCoroutinesApi -import reactor.util.context.Context import kotlin.coroutines.* import kotlinx.coroutines.reactive.* +import reactor.util.context.* /** - * Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines. - * [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext]. - * Coroutine context element that propagates information about Reactor's [Context] through coroutines. + * Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between + * Reactor and kotlinx.coroutines. + * [Context.asCoroutineContext] puts Reactor's [Context] elements into a [CoroutineContext], + * which can be used to propagate the information about Reactor's [Context] through coroutines. * - * This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux], - * [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux]. - * Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][awaitFirst]) also propagate [ReactorContext] to the - * subscriber's [Context]. + * This context element is implicitly propagated through subscribers' context by all Reactive integrations, + * such as [mono], [flux], [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux]. + * Functions that subscribe to a reactive stream + * (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]), too, propagate [ReactorContext] + * to the subscriber's [Context]. ** * ### Examples of Reactive context integration. * @@ -25,18 +27,18 @@ import kotlinx.coroutines.reactive.* * ``` * val flux = myDatabaseService.getUsers() * .contextWrite { ctx -> println(ctx); ctx } - * flux.await() // Will print "null" + * flux.awaitFirst() // Will print "null" * * // Now add ReactorContext * withContext(Context.of("answer", "42").asCoroutineContext()) { - * flux.await() // Will print "Context{'key'='value'}" + * flux.awaitFirst() // Will print "Context{'key'='value'}" * } * ``` * * #### Propagating subscriber's Context to ReactorContext: * ``` * val flow = flow { - * println("Reactor context in Flow: " + coroutineContext[ReactorContext]) + * println("Reactor context in Flow: " + currentCoroutineContext()[ReactorContext]) * } * // No context * flow.asFlux() @@ -47,14 +49,32 @@ import kotlinx.coroutines.reactive.* * .subscribe() // Will print "Reactor context in Flow: Context{'answer'=42}" * ``` */ -@ExperimentalCoroutinesApi public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) { + + // `Context.of` is zero-cost if the argument is a `Context` + public constructor(contextView: ContextView): this(Context.of(contextView)) + public companion object Key : CoroutineContext.Key<ReactorContext> + + override fun toString(): String = context.toString() } /** - * Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context + * Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context + * and later used via `coroutineContext[ReactorContext]`. + */ +public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this) + +/** + * Wraps the given [Context] into [ReactorContext], so it can be added to the coroutine's context * and later used via `coroutineContext[ReactorContext]`. + * @suppress + */ +@Deprecated("The more general version for ContextView should be used instead", level = DeprecationLevel.HIDDEN) +public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext() // `readOnly()` is zero-cost. + +/** + * Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values. */ -@ExperimentalCoroutinesApi -public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this) +internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext = + (this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext() |