diff options
Diffstat (limited to 'kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt')
-rw-r--r-- | kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt | 18 |
1 files changed, 3 insertions, 15 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index bf82cf9a..0efe5f86 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -37,7 +37,7 @@ public interface FusibleFlow<T> : Flow<T> { /** * Operators that use channels as their "output" extend this `ChannelFlow` and are always fused with each other. * This class servers as a skeleton implementation of [FusibleFlow] and provides other cross-cutting - * methods like ability to [produceIn] and [broadcastIn] the corresponding flow, thus making it + * methods like ability to [produceIn] the corresponding flow, thus making it * possible to directly use the backing channel if it exists (hence the `ChannelFlow` name). * * @suppress **This an internal API and should not be used from general code.** @@ -59,7 +59,7 @@ public abstract class ChannelFlow<T>( internal val collectToFun: suspend (ProducerScope<T>) -> Unit get() = { collectTo(it) } - private val produceCapacity: Int + internal val produceCapacity: Int get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED else capacity /** @@ -107,18 +107,6 @@ public abstract class ChannelFlow<T>( protected abstract suspend fun collectTo(scope: ProducerScope<T>) - // broadcastImpl is used in broadcastIn operator which is obsolete and replaced by SharedFlow. - // BroadcastChannel does not support onBufferOverflow beyond simple conflation - public open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> { - val broadcastCapacity = when (onBufferOverflow) { - BufferOverflow.SUSPEND -> produceCapacity - BufferOverflow.DROP_OLDEST -> Channel.CONFLATED - BufferOverflow.DROP_LATEST -> - throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST") - } - return scope.broadcast(context, broadcastCapacity, start, block = collectToFun) - } - /** * Here we use ATOMIC start for a reason (#1825). * NB: [produceImpl] is used for [flowOn]. @@ -201,7 +189,7 @@ internal class ChannelFlowOperatorImpl<T>( override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> = ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow) - override fun dropChannelOperators(): Flow<T>? = flow + override fun dropChannelOperators(): Flow<T> = flow override suspend fun flowCollect(collector: FlowCollector<T>) = flow.collect(collector) |