aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/flow/operators/Share.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/src/flow/operators/Share.kt')
-rw-r--r--kotlinx-coroutines-core/common/src/flow/operators/Share.kt27
1 files changed, 15 insertions, 12 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt
index fe1d7216..4fa74d8e 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt
@@ -68,7 +68,7 @@ import kotlin.jvm.*
* ### Upstream completion and error handling
*
* **Normal completion of the upstream flow has no effect on subscribers**, and the sharing coroutine continues to run. If a
- * a strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special
+ * strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special
* action on upstream completion is needed, then an [onCompletion] operator can be used before the
* `shareIn` operator to emit a special value in this case, like this:
*
@@ -144,8 +144,8 @@ public fun <T> Flow<T>.shareIn(
onBufferOverflow = config.onBufferOverflow
)
@Suppress("UNCHECKED_CAST")
- scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
- return shared.asSharedFlow()
+ val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
+ return ReadonlySharedFlow(shared, job)
}
private class SharingConfig<T>(
@@ -197,7 +197,7 @@ private fun <T> CoroutineScope.launchSharing(
shared: MutableSharedFlow<T>,
started: SharingStarted,
initialValue: T
-) {
+): Job =
launch(context) { // the single coroutine to rule the sharing
// Optimize common built-in started strategies
when {
@@ -230,7 +230,6 @@ private fun <T> CoroutineScope.launchSharing(
}
}
}
-}
// -------------------------------- stateIn --------------------------------
@@ -303,8 +302,8 @@ public fun <T> Flow<T>.stateIn(
): StateFlow<T> {
val config = configureSharing(1)
val state = MutableStateFlow(initialValue)
- scope.launchSharing(config.context, config.upstream, state, started, initialValue)
- return state.asStateFlow()
+ val job = scope.launchSharing(config.context, config.upstream, state, started, initialValue)
+ return ReadonlyStateFlow(state, job)
}
/**
@@ -332,7 +331,7 @@ private fun <T> CoroutineScope.launchSharingDeferred(
upstream.collect { value ->
state?.let { it.value = value } ?: run {
state = MutableStateFlow(value).also {
- result.complete(it.asStateFlow())
+ result.complete(ReadonlyStateFlow(it, coroutineContext.job))
}
}
}
@@ -351,23 +350,27 @@ private fun <T> CoroutineScope.launchSharingDeferred(
* Represents this mutable shared flow as a read-only shared flow.
*/
public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> =
- ReadonlySharedFlow(this)
+ ReadonlySharedFlow(this, null)
/**
* Represents this mutable state flow as a read-only state flow.
*/
public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> =
- ReadonlyStateFlow(this)
+ ReadonlyStateFlow(this, null)
private class ReadonlySharedFlow<T>(
- flow: SharedFlow<T>
+ flow: SharedFlow<T>,
+ @Suppress("unused")
+ private val job: Job? // keeps a strong reference to the job (if present)
) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
fuseSharedFlow(context, capacity, onBufferOverflow)
}
private class ReadonlyStateFlow<T>(
- flow: StateFlow<T>
+ flow: StateFlow<T>,
+ @Suppress("unused")
+ private val job: Job? // keeps a strong reference to the job (if present)
) : StateFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
fuseStateFlow(context, capacity, onBufferOverflow)