diff options
Diffstat (limited to 'kotlinx-coroutines-core/common/src/flow/operators/Share.kt')
-rw-r--r-- | kotlinx-coroutines-core/common/src/flow/operators/Share.kt | 27 |
1 files changed, 15 insertions, 12 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt index fe1d7216..4fa74d8e 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt @@ -68,7 +68,7 @@ import kotlin.jvm.* * ### Upstream completion and error handling * * **Normal completion of the upstream flow has no effect on subscribers**, and the sharing coroutine continues to run. If a - * a strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special + * strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special * action on upstream completion is needed, then an [onCompletion] operator can be used before the * `shareIn` operator to emit a special value in this case, like this: * @@ -144,8 +144,8 @@ public fun <T> Flow<T>.shareIn( onBufferOverflow = config.onBufferOverflow ) @Suppress("UNCHECKED_CAST") - scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T) - return shared.asSharedFlow() + val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T) + return ReadonlySharedFlow(shared, job) } private class SharingConfig<T>( @@ -197,7 +197,7 @@ private fun <T> CoroutineScope.launchSharing( shared: MutableSharedFlow<T>, started: SharingStarted, initialValue: T -) { +): Job = launch(context) { // the single coroutine to rule the sharing // Optimize common built-in started strategies when { @@ -230,7 +230,6 @@ private fun <T> CoroutineScope.launchSharing( } } } -} // -------------------------------- stateIn -------------------------------- @@ -303,8 +302,8 @@ public fun <T> Flow<T>.stateIn( ): StateFlow<T> { val config = configureSharing(1) val state = MutableStateFlow(initialValue) - scope.launchSharing(config.context, config.upstream, state, started, initialValue) - return state.asStateFlow() + val job = scope.launchSharing(config.context, config.upstream, state, started, initialValue) + return ReadonlyStateFlow(state, job) } /** @@ -332,7 +331,7 @@ private fun <T> CoroutineScope.launchSharingDeferred( upstream.collect { value -> state?.let { it.value = value } ?: run { state = MutableStateFlow(value).also { - result.complete(it.asStateFlow()) + result.complete(ReadonlyStateFlow(it, coroutineContext.job)) } } } @@ -351,23 +350,27 @@ private fun <T> CoroutineScope.launchSharingDeferred( * Represents this mutable shared flow as a read-only shared flow. */ public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> = - ReadonlySharedFlow(this) + ReadonlySharedFlow(this, null) /** * Represents this mutable state flow as a read-only state flow. */ public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> = - ReadonlyStateFlow(this) + ReadonlyStateFlow(this, null) private class ReadonlySharedFlow<T>( - flow: SharedFlow<T> + flow: SharedFlow<T>, + @Suppress("unused") + private val job: Job? // keeps a strong reference to the job (if present) ) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> { override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) = fuseSharedFlow(context, capacity, onBufferOverflow) } private class ReadonlyStateFlow<T>( - flow: StateFlow<T> + flow: StateFlow<T>, + @Suppress("unused") + private val job: Job? // keeps a strong reference to the job (if present) ) : StateFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> { override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) = fuseStateFlow(context, capacity, onBufferOverflow) |