/* * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:JvmMultifileClass @file:JvmName("FlowKt") @file:Suppress("unused", "DeprecatedCallableAddReplaceWith", "UNUSED_PARAMETER", "NO_EXPLICIT_RETURN_TYPE_IN_API_MODE") package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlin.coroutines.* import kotlin.jvm.* /** * **GENERAL NOTE** * * These deprecations are added to improve user experience when they will start to * search for their favourite operators and/or patterns that are missing or renamed in Flow. * Deprecated functions also are moved here when they renamed. The difference is that they have * a body with their implementation while pure stubs have [noImpl]. */ internal fun noImpl(): Nothing = throw UnsupportedOperationException("Not implemented, should not be called") /** * `observeOn` has no direct match in [Flow] API because all terminal flow operators are suspending and * thus use the context of the caller. * * For example, the following code: * ``` * flowable * .observeOn(Schedulers.io()) * .doOnEach { value -> println("Received $value") } * .subscribe() * ``` * * has the following Flow equivalent: * ``` * withContext(Dispatchers.IO) { * flow.collect { value -> println("Received $value") } * } * * ``` * @suppress */ @Deprecated(message = "Collect flow in the desired context instead", level = DeprecationLevel.ERROR) public fun Flow.observeOn(context: CoroutineContext): Flow = noImpl() /** * `publishOn` has no direct match in [Flow] API because all terminal flow operators are suspending and * thus use the context of the caller. * * For example, the following code: * ``` * flux * .publishOn(Schedulers.io()) * .doOnEach { value -> println("Received $value") } * .subscribe() * ``` * * has the following Flow equivalent: * ``` * withContext(Dispatchers.IO) { * flow.collect { value -> println("Received $value") } * } * * ``` * @suppress */ @Deprecated(message = "Collect flow in the desired context instead", level = DeprecationLevel.ERROR) public fun Flow.publishOn(context: CoroutineContext): Flow = noImpl() /** * `subscribeOn` has no direct match in [Flow] API because [Flow] preserves its context and does not leak it. * * For example, the following code: * ``` * flowable * .map { value -> println("Doing map in IO"); value } * .subscribeOn(Schedulers.io()) * .observeOn(Schedulers.computation()) * .doOnEach { value -> println("Processing $value in computation") * .subscribe() * ``` * has the following Flow equivalent: * ``` * withContext(Dispatchers.Default) { * flow * .map { value -> println("Doing map in IO"); value } * .flowOn(Dispatchers.IO) // Works upstream, doesn't change downstream * .collect { value -> * println("Processing $value in computation") * } * } * ``` * Opposed to subscribeOn, it it **possible** to use multiple `flowOn` operators in the one flow * @suppress */ @Deprecated(message = "Use 'flowOn' instead", level = DeprecationLevel.ERROR) public fun Flow.subscribeOn(context: CoroutineContext): Flow = noImpl() /** * Flow analogue of `onErrorXxx` is [catch]. * Use `catch { emitAll(fallback) }`. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'", replaceWith = ReplaceWith("catch { emitAll(fallback) }") ) public fun Flow.onErrorResume(fallback: Flow): Flow = noImpl() /** * Flow analogue of `onErrorXxx` is [catch]. * Use `catch { emitAll(fallback) }`. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'", replaceWith = ReplaceWith("catch { emitAll(fallback) }") ) public fun Flow.onErrorResumeNext(fallback: Flow): Flow = noImpl() /** * `subscribe` is Rx-specific API that has no direct match in flows. * One can use [launchIn] instead, for example the following: * ``` * flowable * .observeOn(Schedulers.io()) * .subscribe({ println("Received $it") }, { println("Exception $it happened") }, { println("Flowable is completed successfully") } * ``` * * has the following Flow equivalent: * ``` * flow * .onEach { value -> println("Received $value") } * .onCompletion { cause -> if (cause == null) println("Flow is completed successfully") } * .catch { cause -> println("Exception $cause happened") } * .flowOn(Dispatchers.IO) * .launchIn(myScope) * ``` * * Note that resulting value of [launchIn] is not used because the provided scope takes care of cancellation. * * Or terminal operators like [single] can be used from suspend functions. * @suppress */ @Deprecated( message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead", level = DeprecationLevel.ERROR ) public fun Flow.subscribe(): Unit = noImpl() /** * Use [launchIn] with [onEach], [onCompletion] and [catch] operators instead. * @suppress */ @Deprecated( message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead", level = DeprecationLevel.ERROR )public fun Flow.subscribe(onEach: suspend (T) -> Unit): Unit = noImpl() /** * Use [launchIn] with [onEach], [onCompletion] and [catch] operators instead. * @suppress */ @Deprecated( message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead", level = DeprecationLevel.ERROR )public fun Flow.subscribe(onEach: suspend (T) -> Unit, onError: suspend (Throwable) -> Unit): Unit = noImpl() /** * Note that this replacement is sequential (`concat`) by default. * For concurrent flatMap [flatMapMerge] can be used instead. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue is 'flatMapConcat'", replaceWith = ReplaceWith("flatMapConcat(mapper)") ) public fun Flow.flatMap(mapper: suspend (T) -> Flow): Flow = noImpl() /** * Flow analogue of `concatMap` is [flatMapConcat]. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'concatMap' is 'flatMapConcat'", replaceWith = ReplaceWith("flatMapConcat(mapper)") ) public fun Flow.concatMap(mapper: (T) -> Flow): Flow = noImpl() /** * Note that this replacement is sequential (`concat`) by default. * For concurrent flatMap [flattenMerge] can be used instead. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'merge' is 'flattenConcat'", replaceWith = ReplaceWith("flattenConcat()") ) public fun Flow>.merge(): Flow = noImpl() /** * Flow analogue of `flatten` is [flattenConcat]. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'flatten' is 'flattenConcat'", replaceWith = ReplaceWith("flattenConcat()") ) public fun Flow>.flatten(): Flow = noImpl() /** * Kotlin has a built-in generic mechanism for making chained calls. * If you wish to write something like * ``` * myFlow.compose(MyFlowExtensions.ignoreErrors()).collect { ... } * ``` * you can replace it with * * ``` * myFlow.let(MyFlowExtensions.ignoreErrors()).collect { ... } * ``` * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'compose' is 'let'", replaceWith = ReplaceWith("let(transformer)") ) public fun Flow.compose(transformer: Flow.() -> Flow): Flow = noImpl() /** * Flow analogue of `skip` is [drop]. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'skip' is 'drop'", replaceWith = ReplaceWith("drop(count)") ) public fun Flow.skip(count: Int): Flow = noImpl() /** * Flow extension to iterate over elements is [collect]. * Foreach wasn't introduced deliberately to avoid confusion. * Flow is not a collection, iteration over it may be not idempotent * and can *launch* computations with side-effects. * This behaviour is not reflected in [forEach] name. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'forEach' is 'collect'", replaceWith = ReplaceWith("collect(block)") ) public fun Flow.forEach(action: suspend (value: T) -> Unit): Unit = noImpl() /** * Flow has less verbose [scan] shortcut. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow has less verbose 'scan' shortcut", replaceWith = ReplaceWith("scan(initial, operation)") ) public fun Flow.scanFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow = noImpl() /** * Flow analogue of `onErrorXxx` is [catch]. * Use `catch { emit(fallback) }`. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emit(fallback) }'", replaceWith = ReplaceWith("catch { emit(fallback) }") ) // Note: this version without predicate gives better "replaceWith" action public fun Flow.onErrorReturn(fallback: T): Flow = noImpl() /** * Flow analogue of `onErrorXxx` is [catch]. * Use `catch { e -> if (predicate(e)) emit(fallback) else throw e }`. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { e -> if (predicate(e)) emit(fallback) else throw e }'", replaceWith = ReplaceWith("catch { e -> if (predicate(e)) emit(fallback) else throw e }") ) public fun Flow.onErrorReturn(fallback: T, predicate: (Throwable) -> Boolean = { true }): Flow = catch { e -> // Note: default value is for binary compatibility with preview version, that is why it has body if (!predicate(e)) throw e emit(fallback) } /** * Flow analogue of `startWith` is [onStart]. * Use `onStart { emit(value) }`. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'startWith' is 'onStart'. Use 'onStart { emit(value) }'", replaceWith = ReplaceWith("onStart { emit(value) }") ) public fun Flow.startWith(value: T): Flow = noImpl() /** * Flow analogue of `startWith` is [onStart]. * Use `onStart { emitAll(other) }`. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'startWith' is 'onStart'. Use 'onStart { emitAll(other) }'", replaceWith = ReplaceWith("onStart { emitAll(other) }") ) public fun Flow.startWith(other: Flow): Flow = noImpl() /** * Flow analogue of `concatWith` is [onCompletion]. * Use `onCompletion { emit(value) }`. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emit(value) }'", replaceWith = ReplaceWith("onCompletion { emit(value) }") ) public fun Flow.concatWith(value: T): Flow = noImpl() /** * Flow analogue of `concatWith` is [onCompletion]. * Use `onCompletion { emitAll(other) }`. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emitAll(other) }'", replaceWith = ReplaceWith("onCompletion { emitAll(other) }") ) public fun Flow.concatWith(other: Flow): Flow = noImpl() @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'combineLatest' is 'combine'", replaceWith = ReplaceWith("this.combine(other, transform)") ) public fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = combine(this, other, transform) @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'combineLatest' is 'combine'", replaceWith = ReplaceWith("combine(this, other, other2, transform)") ) public fun Flow.combineLatest( other: Flow, other2: Flow, transform: suspend (T1, T2, T3) -> R ) = combine(this, other, other2, transform) @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'combineLatest' is 'combine'", replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)") ) public fun Flow.combineLatest( other: Flow, other2: Flow, other3: Flow, transform: suspend (T1, T2, T3, T4) -> R ) = combine(this, other, other2, other3, transform) @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'combineLatest' is 'combine'", replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)") ) public fun Flow.combineLatest( other: Flow, other2: Flow, other3: Flow, other4: Flow, transform: suspend (T1, T2, T3, T4, T5) -> R ): Flow = combine(this, other, other2, other3, other4, transform) /** * Delays the emission of values from this flow for the given [timeMillis]. * Use `onStart { delay(timeMillis) }`. * @suppress */ @Deprecated( level = DeprecationLevel.WARNING, // since 1.3.0, error in 1.4.0 message = "Use 'onStart { delay(timeMillis) }'", replaceWith = ReplaceWith("onStart { delay(timeMillis) }") ) public fun Flow.delayFlow(timeMillis: Long): Flow = onStart { delay(timeMillis) } /** * Delays each element emitted by the given flow for the given [timeMillis]. * Use `onEach { delay(timeMillis) }`. * @suppress */ @Deprecated( level = DeprecationLevel.WARNING, // since 1.3.0, error in 1.4.0 message = "Use 'onEach { delay(timeMillis) }'", replaceWith = ReplaceWith("onEach { delay(timeMillis) }") ) public fun Flow.delayEach(timeMillis: Long): Flow = onEach { delay(timeMillis) } @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogues of 'switchMap' are 'transformLatest', 'flatMapLatest' and 'mapLatest'", replaceWith = ReplaceWith("this.flatMapLatest(transform)") ) public fun Flow.switchMap(transform: suspend (value: T) -> Flow): Flow = flatMapLatest(transform) @Deprecated( level = DeprecationLevel.WARNING, // Since 1.3.8, was experimental when deprecated message = "'scanReduce' was renamed to 'runningReduce' to be consistent with Kotlin standard library", replaceWith = ReplaceWith("runningReduce(operation)") ) public fun Flow.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow = runningReduce(operation) @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'publish()' is 'shareIn'. \n" + "publish().connect() is the default strategy (no extra call is needed), \n" + "publish().autoConnect() translates to 'started = SharingStared.Lazily' argument, \n" + "publish().refCount() translates to 'started = SharingStared.WhileSubscribed()' argument.", replaceWith = ReplaceWith("this.shareIn(scope, 0)") ) public fun Flow.publish(): Flow = noImpl() @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'publish(bufferSize)' is 'buffer' followed by 'shareIn'. \n" + "publish().connect() is the default strategy (no extra call is needed), \n" + "publish().autoConnect() translates to 'started = SharingStared.Lazily' argument, \n" + "publish().refCount() translates to 'started = SharingStared.WhileSubscribed()' argument.", replaceWith = ReplaceWith("this.buffer(bufferSize).shareIn(scope, 0)") ) public fun Flow.publish(bufferSize: Int): Flow = noImpl() @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'replay()' is 'shareIn' with unlimited replay. \n" + "replay().connect() is the default strategy (no extra call is needed), \n" + "replay().autoConnect() translates to 'started = SharingStared.Lazily' argument, \n" + "replay().refCount() translates to 'started = SharingStared.WhileSubscribed()' argument.", replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE)") ) public fun Flow.replay(): Flow = noImpl() @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'replay(bufferSize)' is 'shareIn' with the specified replay parameter. \n" + "replay().connect() is the default strategy (no extra call is needed), \n" + "replay().autoConnect() translates to 'started = SharingStared.Lazily' argument, \n" + "replay().refCount() translates to 'started = SharingStared.WhileSubscribed()' argument.", replaceWith = ReplaceWith("this.shareIn(scope, bufferSize)") ) public fun Flow.replay(bufferSize: Int): Flow = noImpl() @Deprecated( level = DeprecationLevel.ERROR, message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStared.Lazily' argument'", replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStared.Lazily)") ) public fun Flow.cache(): Flow = noImpl()