aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/flow/Channels.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/src/flow/Channels.kt')
-rw-r--r--kotlinx-coroutines-core/common/src/flow/Channels.kt53
1 files changed, 28 insertions, 25 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt
index e883c3b4..5cc8ad8b 100644
--- a/kotlinx-coroutines-core/common/src/flow/Channels.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt
@@ -30,7 +30,8 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Uni
emitAllImpl(channel, consume = true)
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
- // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveOrClosed".
+ ensureActive()
+ // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveCatching".
// It has smaller and more efficient spilled state which also allows to implement a manual kludge to
// fix retention of the last emitted value.
// See https://youtrack.jetbrains.com/issue/KT-16222
@@ -47,9 +48,9 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
// L$1 <- channel
// L$2 <- cause
// L$3 <- this$run (actually equal to this)
- val result = run { channel.receiveOrClosed() }
+ val result = run { channel.receiveCatching() }
if (result.isClosed) {
- result.closeCause?.let { throw it }
+ result.exceptionOrNull()?.let { throw it }
break // returns normally when result.closeCause == null
}
// result is spilled here to the coroutine state and retained after the call, even though
@@ -58,7 +59,7 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
// L$1 <- channel
// L$2 <- cause
// L$3 <- result
- emit(result.value)
+ emit(result.getOrThrow())
}
} catch (e: Throwable) {
cause = e
@@ -133,17 +134,12 @@ private class ChannelAsFlow<T>(
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelAsFlow(channel, consume, context, capacity, onBufferOverflow)
- override fun dropChannelOperators(): Flow<T>? =
+ override fun dropChannelOperators(): Flow<T> =
ChannelAsFlow(channel, consume)
override suspend fun collectTo(scope: ProducerScope<T>) =
SendingCollector(scope).emitAllImpl(channel, consume) // use efficient channel receiving code from emitAll
- override fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
- markConsumed() // fail fast on repeated attempt to collect it
- return super.broadcastImpl(scope, start)
- }
-
override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
markConsumed() // fail fast on repeated attempt to collect it
return if (capacity == Channel.OPTIONAL_CHANNEL) {
@@ -173,22 +169,16 @@ private class ChannelAsFlow<T>(
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
* 3) If the flow consumer fails with an exception, subscription is cancelled.
*/
-@FlowPreview
+@Deprecated(
+ level = DeprecationLevel.WARNING,
+ message = "'BroadcastChannel' is obsolete and all coreresponding operators are deprecated " +
+ "in the favour of StateFlow and SharedFlow"
+) // Since 1.5.0, was @FlowPreview, safe to remove in 1.7.0
public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
emitAll(openSubscription())
}
/**
- * Creates a [broadcast] coroutine that collects the given flow.
- *
- * This transformation is **stateful**, it launches a [broadcast] coroutine
- * that collects the given flow and thus resulting channel should be properly closed or cancelled.
- *
- * A channel with [default][Channel.Factory.BUFFERED] buffer size is created.
- * Use [buffer] operator on the flow before calling `broadcastIn` to specify a value other than
- * default and to control what happens when data is produced faster than it is consumed,
- * that is to control backpressure behavior.
- *
* ### Deprecated
*
* **This API is deprecated.** The [BroadcastChannel] provides a complex channel-like API for hot flows.
@@ -202,13 +192,26 @@ public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
@Deprecated(
message = "Use shareIn operator and the resulting SharedFlow as a replacement for BroadcastChannel",
replaceWith = ReplaceWith("this.shareIn(scope, SharingStarted.Lazily, 0)"),
- level = DeprecationLevel.WARNING
-)
+ level = DeprecationLevel.ERROR
+) // WARNING in 1.4.0, error in 1.5.0, removed in 1.6.0 (was @FlowPreview)
public fun <T> Flow<T>.broadcastIn(
scope: CoroutineScope,
start: CoroutineStart = CoroutineStart.LAZY
-): BroadcastChannel<T> =
- asChannelFlow().broadcastImpl(scope, start)
+): BroadcastChannel<T> {
+ // Backwards compatibility with operator fusing
+ val channelFlow = asChannelFlow()
+ val capacity = when (channelFlow.onBufferOverflow) {
+ BufferOverflow.SUSPEND -> channelFlow.produceCapacity
+ BufferOverflow.DROP_OLDEST -> Channel.CONFLATED
+ BufferOverflow.DROP_LATEST ->
+ throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST")
+ }
+ return scope.broadcast(channelFlow.context, capacity = capacity, start = start) {
+ collect { value ->
+ send(value)
+ }
+ }
+}
/**
* Creates a [produce] coroutine that collects the given flow.