/* * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:JvmMultifileClass @file:JvmName("FlowKt") @file:Suppress("UNCHECKED_CAST") package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* import kotlin.jvm.* // ------------------ WARNING ------------------ // These emitting operators must use safe flow builder, because they allow // user code to directly emit to the underlying FlowCollector. /** * Applies [transform] function to each value of the given flow. * * The receiver of the `transform` is [FlowCollector] and thus `transform` is a * flexible function that may transform emitted element, skip it or emit it multiple times. * * This operator generalizes [filter] and [map] operators and * can be used as a building block for other operators, for example: * * ``` * fun Flow.skipOddAndDuplicateEven(): Flow = transform { value -> * if (value % 2 == 0) { // Emit only even values, but twice * emit(value) * emit(value) * } // Do nothing if odd * } * ``` */ public inline fun Flow.transform( @BuilderInference crossinline transform: suspend FlowCollector.(value: T) -> Unit ): Flow = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation collect { value -> // kludge, without it Unit will be returned and TCE won't kick in, KT-28938 return@collect transform(value) } } // For internal operator implementation @PublishedApi internal inline fun Flow.unsafeTransform( @BuilderInference crossinline transform: suspend FlowCollector.(value: T) -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use collect { value -> // kludge, without it Unit will be returned and TCE won't kick in, KT-28938 return@collect transform(value) } } /** * Returns a flow that invokes the given [action] **before** this flow starts to be collected. * * The [action] is called before the upstream flow is started, so if it is used with a [SharedFlow] * there is **no guarantee** that emissions from the upstream flow that happen inside or immediately * after this `onStart` action will be collected * (see [onSubscription] for an alternative operator on shared flows). * * The receiver of the [action] is [FlowCollector], so `onStart` can emit additional elements. * For example: * * ``` * flowOf("a", "b", "c") * .onStart { emit("Begin") } * .collect { println(it) } // prints Begin, a, b, c * ``` */ public fun Flow.onStart( action: suspend FlowCollector.() -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action val safeCollector = SafeCollector(this, currentCoroutineContext()) try { safeCollector.action() } finally { safeCollector.releaseIntercepted() } collect(this) // directly delegate } /** * Returns a flow that invokes the given [action] **after** the flow is completed or cancelled, passing * the cancellation exception or failure as cause parameter of [action]. * * Conceptually, `onCompletion` is similar to wrapping the flow collection into a `finally` block, * for example the following imperative snippet: * * ``` * try { * myFlow.collect { value -> * println(value) * } * } finally { * println("Done") * } * ``` * * can be replaced with a declarative one using `onCompletion`: * * ``` * myFlow * .onEach { println(it) } * .onCompletion { println("Done") } * .collect() * ``` * * Unlike [catch], this operator reports exception that occur both upstream and downstream * and observe exceptions that are thrown to cancel the flow. Exception is empty if and only if * the flow had fully completed successfully. Conceptually, the following code: * * ``` * myFlow.collect { value -> * println(value) * } * println("Completed successfully") * ``` * * can be replaced with: * * ``` * myFlow * .onEach { println(it) } * .onCompletion { if (it == null) println("Completed successfully") } * .collect() * ``` * * The receiver of the [action] is [FlowCollector] and this operator can be used to emit additional * elements at the end **if it completed successfully**. For example: * * ``` * flowOf("a", "b", "c") * .onCompletion { emit("Done") } * .collect { println(it) } // prints a, b, c, Done * ``` * * In case of failure or cancellation, any attempt to emit additional elements throws the corresponding exception. * Use [catch] if you need to suppress failure and replace it with emission of elements. */ public fun Flow.onCompletion( action: suspend FlowCollector.(cause: Throwable?) -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action try { collect(this) } catch (e: Throwable) { /* * Use throwing collector to prevent any emissions from the * completion sequence when downstream has failed, otherwise it may * lead to a non-sequential behaviour impossible with `finally` */ ThrowingCollector(e).invokeSafely(action, e) throw e } // Normal completion val sc = SafeCollector(this, currentCoroutineContext()) try { sc.action(null) } finally { sc.releaseIntercepted() } } /** * Invokes the given [action] when this flow completes without emitting any elements. * The receiver of the [action] is [FlowCollector], so `onEmpty` can emit additional elements. * For example: * * ``` * emptyFlow().onEmpty { * emit(1) * emit(2) * }.collect { println(it) } // prints 1, 2 * ``` */ public fun Flow.onEmpty( action: suspend FlowCollector.() -> Unit ): Flow = unsafeFlow { var isEmpty = true collect { isEmpty = false emit(it) } if (isEmpty) { val collector = SafeCollector(this, currentCoroutineContext()) try { collector.action() } finally { collector.releaseIntercepted() } } } private class ThrowingCollector(private val e: Throwable) : FlowCollector { override suspend fun emit(value: Any?) { throw e } } private suspend fun FlowCollector.invokeSafely( action: suspend FlowCollector.(cause: Throwable?) -> Unit, cause: Throwable? ) { try { action(cause) } catch (e: Throwable) { if (cause !== null && cause !== e) e.addSuppressedThrowable(cause) throw e } }