diff options
author | Roman Elizarov <elizarov@gmail.com> | 2020-08-10 18:12:22 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-10 18:12:22 +0300 |
commit | 1a6beba52fa5027452bb14b53b04e0b6a4fbed99 (patch) | |
tree | c8f333bcdbdc8a6e5f9bb17e8aa42e4a35c46a07 /reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt | |
parent | 3cc9b94f8f9ce68d99d46120ee48e6f6ff61c160 (diff) | |
download | platform_external_kotlinx.coroutines-1a6beba52fa5027452bb14b53b04e0b6a4fbed99.tar.gz platform_external_kotlinx.coroutines-1a6beba52fa5027452bb14b53b04e0b6a4fbed99.tar.bz2 platform_external_kotlinx.coroutines-1a6beba52fa5027452bb14b53b04e0b6a4fbed99.zip |
Support context in Flow.asPublisher and similar methods (#2156)
Fixes #2155
Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt | 23 |
1 files changed, 17 insertions, 6 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index efa9c9c9..a51f583b 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -34,8 +34,15 @@ public fun <T : Any> Publisher<T>.asFlow(): Flow<T> = * * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module, * see its documentation for additional details. + * + * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. + * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher + * is used, so calls are performed from an arbitrary thread. */ -public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this) +@JvmOverloads // binary compatibility +public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = + FlowAsPublisher(this, Dispatchers.Unconfined + context) private class PublisherAsFlow<T : Any>( private val publisher: Publisher<T>, @@ -153,11 +160,14 @@ internal fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: Coroutine * Adapter that transforms [Flow] into TCK-complaint [Publisher]. * [cancel] invocation cancels the original flow. */ -@Suppress("PublisherImplementation") -private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> { +@Suppress("ReactiveStreamsPublisherImplementation") +private class FlowAsPublisher<T : Any>( + private val flow: Flow<T>, + private val context: CoroutineContext +) : Publisher<T> { override fun subscribe(subscriber: Subscriber<in T>?) { if (subscriber == null) throw NullPointerException() - subscriber.onSubscribe(FlowSubscription(flow, subscriber)) + subscriber.onSubscribe(FlowSubscription(flow, subscriber, context)) } } @@ -165,8 +175,9 @@ private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> @InternalCoroutinesApi public class FlowSubscription<T>( @JvmField public val flow: Flow<T>, - @JvmField public val subscriber: Subscriber<in T> -) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, true) { + @JvmField public val subscriber: Subscriber<in T>, + context: CoroutineContext +) : Subscription, AbstractCoroutine<Unit>(context, true) { private val requested = atomic(0L) private val producer = atomic<Continuation<Unit>?>(createInitialContinuation()) |