diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/Channel.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-reactive/src/Channel.kt | 32 |
1 files changed, 18 insertions, 14 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt index 352a505a..b7fbf134 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt @@ -11,10 +11,10 @@ import kotlinx.coroutines.internal.* import org.reactivestreams.* /** - * Subscribes to this [Publisher] and returns a channel to receive elements emitted by it. - * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher. + * Subscribes to this [Publisher] and returns a channel to receive the elements emitted by it. + * The resulting channel needs to be [cancelled][ReceiveChannel.cancel] in order to unsubscribe from this publisher. - * @param request how many items to request from publisher in advance (optional, one by default). + * @param request how many items to request from the publisher in advance (optional, a single element by default). * * This method is deprecated in the favor of [Flow]. * Instead of iterating over the resulting channel please use [collect][Flow.collect]: @@ -26,24 +26,28 @@ import org.reactivestreams.* */ @Deprecated( message = "Transforming publisher to channel is deprecated, use asFlow() instead", - level = DeprecationLevel.WARNING) // Will be error in 1.4 + level = DeprecationLevel.ERROR) // Will be error in 1.4 public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> { val channel = SubscriptionChannel<T>(request) subscribe(channel) return channel } -// Will be promoted to error in 1.3.0, removed in 1.4.0 -@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)")) -public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit): Unit = - openSubscription().consumeEach(action) - /** * Subscribes to this [Publisher] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from + * [collect]. Also, if the publisher signals an error, that error is rethrown from [collect]. */ public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit = - openSubscription().consumeEach(action) + toChannel().consumeEach(action) + +@PublishedApi +internal fun <T> Publisher<T>.toChannel(request: Int = 1): ReceiveChannel<T> { + val channel = SubscriptionChannel<T>(request) + subscribe(channel) + return channel +} @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation") private class SubscriptionChannel<T>( @@ -59,7 +63,7 @@ private class SubscriptionChannel<T>( // can be negative if we have receivers, but no subscription yet private val _requested = atomic(0) - // AbstractChannel overrides + // --------------------- AbstractChannel overrides ------------------------------- @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER") override fun onReceiveEnqueued() { _requested.loop { wasRequested -> @@ -87,7 +91,7 @@ private class SubscriptionChannel<T>( _subscription.getAndSet(null)?.cancel() // cancel exactly once } - // Subscriber overrides + // --------------------- Subscriber overrides ------------------------------- override fun onSubscribe(s: Subscription) { _subscription.value = s while (true) { // lock-free loop on _requested @@ -107,7 +111,7 @@ private class SubscriptionChannel<T>( override fun onNext(t: T) { _requested.decrementAndGet() - offer(t) + trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } override fun onComplete() { |