aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt')
-rw-r--r--kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt18
1 files changed, 3 insertions, 15 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
index bf82cf9a..0efe5f86 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@@ -37,7 +37,7 @@ public interface FusibleFlow<T> : Flow<T> {
/**
* Operators that use channels as their "output" extend this `ChannelFlow` and are always fused with each other.
* This class servers as a skeleton implementation of [FusibleFlow] and provides other cross-cutting
- * methods like ability to [produceIn] and [broadcastIn] the corresponding flow, thus making it
+ * methods like ability to [produceIn] the corresponding flow, thus making it
* possible to directly use the backing channel if it exists (hence the `ChannelFlow` name).
*
* @suppress **This an internal API and should not be used from general code.**
@@ -59,7 +59,7 @@ public abstract class ChannelFlow<T>(
internal val collectToFun: suspend (ProducerScope<T>) -> Unit
get() = { collectTo(it) }
- private val produceCapacity: Int
+ internal val produceCapacity: Int
get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED else capacity
/**
@@ -107,18 +107,6 @@ public abstract class ChannelFlow<T>(
protected abstract suspend fun collectTo(scope: ProducerScope<T>)
- // broadcastImpl is used in broadcastIn operator which is obsolete and replaced by SharedFlow.
- // BroadcastChannel does not support onBufferOverflow beyond simple conflation
- public open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
- val broadcastCapacity = when (onBufferOverflow) {
- BufferOverflow.SUSPEND -> produceCapacity
- BufferOverflow.DROP_OLDEST -> Channel.CONFLATED
- BufferOverflow.DROP_LATEST ->
- throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST")
- }
- return scope.broadcast(context, broadcastCapacity, start, block = collectToFun)
- }
-
/**
* Here we use ATOMIC start for a reason (#1825).
* NB: [produceImpl] is used for [flowOn].
@@ -201,7 +189,7 @@ internal class ChannelFlowOperatorImpl<T>(
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
- override fun dropChannelOperators(): Flow<T>? = flow
+ override fun dropChannelOperators(): Flow<T> = flow
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)