aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt109
1 files changed, 109 insertions, 0 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt
new file mode 100644
index 00000000..387c8e77
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:JvmMultifileClass
+@file:JvmName("FlowKt")
+
+package kotlinx.coroutines.reactive
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.reactivestreams.*
+import kotlinx.coroutines.intrinsics.*
+
+/**
+ * Transforms the given flow to a spec-compliant [Publisher].
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
+
+/**
+ * Adapter that transforms [Flow] into TCK-complaint [Publisher].
+ * [cancel] invocation cancels the original flow.
+ */
+@Suppress("PublisherImplementation")
+private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
+ override fun subscribe(subscriber: Subscriber<in T>?) {
+ if (subscriber == null) throw NullPointerException()
+ subscriber.onSubscribe(FlowSubscription(flow, subscriber))
+ }
+}
+
+/** @suppress */
+@InternalCoroutinesApi
+public class FlowSubscription<T>(
+ @JvmField val flow: Flow<T>,
+ @JvmField val subscriber: Subscriber<in T>
+) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, false) {
+ private val requested = atomic(0L)
+ private val producer = atomic<CancellableContinuation<Unit>?>(null)
+
+ override fun onStart() {
+ ::flowProcessing.startCoroutineCancellable(this)
+ }
+
+ private suspend fun flowProcessing() {
+ try {
+ consumeFlow()
+ subscriber.onComplete()
+ } catch (e: Throwable) {
+ try {
+ if (e is CancellationException) {
+ subscriber.onComplete()
+ } else {
+ subscriber.onError(e)
+ }
+ } catch (e: Throwable) {
+ // Last ditch report
+ handleCoroutineException(coroutineContext, e)
+ }
+ }
+ }
+
+ /*
+ * This method has at most one caller at any time (triggered from the `request` method)
+ */
+ private suspend fun consumeFlow() {
+ flow.collect { value ->
+ /*
+ * Flow is scopeless, thus if it's not active, its subscription was cancelled.
+ * No intermediate "child failed, but flow coroutine is not" states are allowed.
+ */
+ coroutineContext.ensureActive()
+ if (requested.value <= 0L) {
+ suspendCancellableCoroutine<Unit> {
+ producer.value = it
+ if (requested.value != 0L) it.resumeSafely()
+ }
+ }
+ requested.decrementAndGet()
+ subscriber.onNext(value)
+ }
+ }
+
+ override fun cancel() {
+ cancel(null)
+ }
+
+ override fun request(n: Long) {
+ if (n <= 0) {
+ return
+ }
+ start()
+ requested.update { value ->
+ val newValue = value + n
+ if (newValue <= 0L) Long.MAX_VALUE else newValue
+ }
+ val producer = producer.getAndSet(null) ?: return
+ producer.resumeSafely()
+ }
+
+ private fun CancellableContinuation<Unit>.resumeSafely() {
+ val token = tryResume(Unit)
+ if (token != null) {
+ completeResume(token)
+ }
+ }
+}