aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
diff options
context:
space:
mode:
authorRoman Elizarov <elizarov@gmail.com>2020-08-10 18:12:22 +0300
committerGitHub <noreply@github.com>2020-08-10 18:12:22 +0300
commit1a6beba52fa5027452bb14b53b04e0b6a4fbed99 (patch)
treec8f333bcdbdc8a6e5f9bb17e8aa42e4a35c46a07 /reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
parent3cc9b94f8f9ce68d99d46120ee48e6f6ff61c160 (diff)
downloadplatform_external_kotlinx.coroutines-1a6beba52fa5027452bb14b53b04e0b6a4fbed99.tar.gz
platform_external_kotlinx.coroutines-1a6beba52fa5027452bb14b53b04e0b6a4fbed99.tar.bz2
platform_external_kotlinx.coroutines-1a6beba52fa5027452bb14b53b04e0b6a4fbed99.zip
Support context in Flow.asPublisher and similar methods (#2156)
Fixes #2155 Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt23
1 files changed, 17 insertions, 6 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
index efa9c9c9..a51f583b 100644
--- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
@@ -34,8 +34,15 @@ public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
*
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
* see its documentation for additional details.
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
+ * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
*/
-public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
+@JvmOverloads // binary compatibility
+public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> =
+ FlowAsPublisher(this, Dispatchers.Unconfined + context)
private class PublisherAsFlow<T : Any>(
private val publisher: Publisher<T>,
@@ -153,11 +160,14 @@ internal fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: Coroutine
* Adapter that transforms [Flow] into TCK-complaint [Publisher].
* [cancel] invocation cancels the original flow.
*/
-@Suppress("PublisherImplementation")
-private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
+@Suppress("ReactiveStreamsPublisherImplementation")
+private class FlowAsPublisher<T : Any>(
+ private val flow: Flow<T>,
+ private val context: CoroutineContext
+) : Publisher<T> {
override fun subscribe(subscriber: Subscriber<in T>?) {
if (subscriber == null) throw NullPointerException()
- subscriber.onSubscribe(FlowSubscription(flow, subscriber))
+ subscriber.onSubscribe(FlowSubscription(flow, subscriber, context))
}
}
@@ -165,8 +175,9 @@ private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T>
@InternalCoroutinesApi
public class FlowSubscription<T>(
@JvmField public val flow: Flow<T>,
- @JvmField public val subscriber: Subscriber<in T>
-) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, true) {
+ @JvmField public val subscriber: Subscriber<in T>,
+ context: CoroutineContext
+) : Subscription, AbstractCoroutine<Unit>(context, true) {
private val requested = atomic(0L)
private val producer = atomic<Continuation<Unit>?>(createInitialContinuation())