diff options
Diffstat (limited to 'kotlinx-coroutines-core/common/src/flow/operators/Delay.kt')
-rw-r--r-- | kotlinx-coroutines-core/common/src/flow/operators/Delay.kt | 38 |
1 files changed, 19 insertions, 19 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 6381c467..fed5962b 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -209,8 +209,7 @@ public fun <T> Flow<T>.debounce(timeout: (T) -> Duration): Flow<T> = private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> = scopedFlow { downstream -> // Produce the values using the default (rendezvous) channel - // Note: the actual type is Any, KT-30796 - val values = produce<Any?> { + val values = produce { collect { value -> send(value ?: NULL) } } // Now consume the values @@ -237,14 +236,15 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : F lastValue = null // Consume the value } } - // Should be receiveOrClosed when boxing issues are fixed - values.onReceiveOrNull { value -> - if (value == null) { - if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) - lastValue = DONE - } else { - lastValue = value - } + values.onReceiveCatching { value -> + value + .onSuccess { lastValue = it } + .onFailure { + it?.let { throw it } + // If closed normally, emit the latest value + if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) + lastValue = DONE + } } } } @@ -278,21 +278,21 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : F public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> { require(periodMillis > 0) { "Sample period should be positive" } return scopedFlow { downstream -> - val values = produce<Any?>(capacity = Channel.CONFLATED) { - // Actually Any, KT-30796 + val values = produce(capacity = Channel.CONFLATED) { collect { value -> send(value ?: NULL) } } var lastValue: Any? = null val ticker = fixedPeriodTicker(periodMillis) while (lastValue !== DONE) { select<Unit> { - values.onReceiveOrNull { - if (it == null) { - ticker.cancel(ChildCancelledException()) - lastValue = DONE - } else { - lastValue = it - } + values.onReceiveCatching { result -> + result + .onSuccess { lastValue = it } + .onFailure { + it?.let { throw it } + ticker.cancel(ChildCancelledException()) + lastValue = DONE + } } // todo: shall be start sampling only when an element arrives or sample aways as here? |