diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt | 103 |
1 files changed, 0 insertions, 103 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt deleted file mode 100644 index 05f2391e..00000000 --- a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.reactive.flow - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import org.reactivestreams.* -import java.util.concurrent.atomic.* -import kotlin.coroutines.* - -/** - * Transforms the given flow to a spec-compliant [Publisher]. - */ -@JvmName("from") -@ExperimentalCoroutinesApi -public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this) - -/** - * Adapter that transforms [Flow] into TCK-complaint [Publisher]. - * [cancel] invocation cancels the original flow. - */ -@Suppress("PublisherImplementation") -private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> { - - override fun subscribe(subscriber: Subscriber<in T>?) { - if (subscriber == null) throw NullPointerException() - subscriber.onSubscribe(FlowSubscription(flow, subscriber)) - } - - private class FlowSubscription<T>(val flow: Flow<T>, val subscriber: Subscriber<in T>) : Subscription { - @Volatile - internal var canceled: Boolean = false - private val requested = AtomicLong(0L) - private val producer: AtomicReference<CancellableContinuation<Unit>?> = AtomicReference() - - // This is actually optimizable - private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) { - try { - consumeFlow() - subscriber.onComplete() - } catch (e: Throwable) { - // Failed with real exception, not due to cancellation - if (!coroutineContext[Job]!!.isCancelled) { - subscriber.onError(e) - } - } - } - - private suspend fun consumeFlow() { - flow.collect { value -> - if (!coroutineContext.isActive) { - subscriber.onComplete() - coroutineContext.ensureActive() - } - - if (requested.get() == 0L) { - suspendCancellableCoroutine<Unit> { - producer.set(it) - if (requested.get() != 0L) it.resumeSafely() - } - } - - requested.decrementAndGet() - subscriber.onNext(value) - } - } - - override fun cancel() { - canceled = true - job.cancel() - } - - override fun request(n: Long) { - if (n <= 0) { - return - } - - if (canceled) return - - job.start() - var snapshot: Long - var newValue: Long - do { - snapshot = requested.get() - newValue = snapshot + n - if (newValue <= 0L) newValue = Long.MAX_VALUE - } while (!requested.compareAndSet(snapshot, newValue)) - - val prev = producer.get() - if (prev == null || !producer.compareAndSet(prev, null)) return - prev.resumeSafely() - } - - private fun CancellableContinuation<Unit>.resumeSafely() { - val token = tryResume(Unit) - if (token != null) { - completeResume(token) - } - } - } -} |