aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/src/flow/operators/Delay.kt')
-rw-r--r--kotlinx-coroutines-core/common/src/flow/operators/Delay.kt38
1 files changed, 19 insertions, 19 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
index 6381c467..fed5962b 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
@@ -209,8 +209,7 @@ public fun <T> Flow<T>.debounce(timeout: (T) -> Duration): Flow<T> =
private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> =
scopedFlow { downstream ->
// Produce the values using the default (rendezvous) channel
- // Note: the actual type is Any, KT-30796
- val values = produce<Any?> {
+ val values = produce {
collect { value -> send(value ?: NULL) }
}
// Now consume the values
@@ -237,14 +236,15 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : F
lastValue = null // Consume the value
}
}
- // Should be receiveOrClosed when boxing issues are fixed
- values.onReceiveOrNull { value ->
- if (value == null) {
- if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
- lastValue = DONE
- } else {
- lastValue = value
- }
+ values.onReceiveCatching { value ->
+ value
+ .onSuccess { lastValue = it }
+ .onFailure {
+ it?.let { throw it }
+ // If closed normally, emit the latest value
+ if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
+ lastValue = DONE
+ }
}
}
}
@@ -278,21 +278,21 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : F
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
require(periodMillis > 0) { "Sample period should be positive" }
return scopedFlow { downstream ->
- val values = produce<Any?>(capacity = Channel.CONFLATED) {
- // Actually Any, KT-30796
+ val values = produce(capacity = Channel.CONFLATED) {
collect { value -> send(value ?: NULL) }
}
var lastValue: Any? = null
val ticker = fixedPeriodTicker(periodMillis)
while (lastValue !== DONE) {
select<Unit> {
- values.onReceiveOrNull {
- if (it == null) {
- ticker.cancel(ChildCancelledException())
- lastValue = DONE
- } else {
- lastValue = it
- }
+ values.onReceiveCatching { result ->
+ result
+ .onSuccess { lastValue = it }
+ .onFailure {
+ it?.let { throw it }
+ ticker.cancel(ChildCancelledException())
+ lastValue = DONE
+ }
}
// todo: shall be start sampling only when an element arrives or sample aways as here?