/* * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.rx3 import io.reactivex.rxjava3.core.* import io.reactivex.rxjava3.disposables.Disposable import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Job import kotlinx.coroutines.suspendCancellableCoroutine import kotlin.coroutines.* // ------------------------ CompletableSource ------------------------ /** * Awaits for completion of this completable without blocking the thread. * Returns `Unit`, or throws the corresponding exception if this completable produces an error. * * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its * subscription. */ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont -> subscribe(object : CompletableObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } override fun onComplete() { cont.resume(Unit) } override fun onError(e: Throwable) { cont.resumeWithException(e) } }) } // ------------------------ MaybeSource ------------------------ /** * Awaits for completion of the [MaybeSource] without blocking the thread. * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this * [MaybeSource] produces an error. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this * function immediately resumes with [CancellationException] and disposes of its subscription. */ @Suppress("UNCHECKED_CAST") public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> subscribe(object : MaybeObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } override fun onComplete() { cont.resume(null) } override fun onSuccess(t: T) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } }) } /** * Awaits for completion of the [MaybeSource] without blocking the thread. * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this * function immediately resumes with [CancellationException] and disposes of its subscription. * * @throws NoSuchElementException if no elements were produced by this [MaybeSource]. */ public suspend fun MaybeSource.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() /** * Awaits for completion of the maybe without blocking a thread. * Returns the resulting value, null if no value was produced or throws the corresponding exception if this * maybe had produced error. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. * * ### Deprecation * * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of * a value, as opposed to throwing in such case. * * @suppress */ @Deprecated( message = "Deprecated in favor of awaitSingleOrNull()", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingleOrNull()") ) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun MaybeSource.await(): T? = awaitSingleOrNull() /** * Awaits for completion of the maybe without blocking a thread. * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this * maybe had produced error. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. * * ### Deprecation * * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for * details). * * @suppress */ @Deprecated( message = "Deprecated in favor of awaitSingleOrNull()", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") ) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default // ------------------------ SingleSource ------------------------ /** * Awaits for completion of the single value response without blocking the thread. * Returns the resulting value, or throws the corresponding exception if this response produces an error. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> subscribe(object : SingleObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } override fun onSuccess(t: T) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } }) } // ------------------------ ObservableSource ------------------------ /** * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or, * if the observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. * * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun ObservableSource.awaitFirst(): T = awaitOne(Mode.FIRST) /** * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the * corresponding exception. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun ObservableSource.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default) /** * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding * exception. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun ObservableSource.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) /** * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted, * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws * the corresponding exception. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun ObservableSource.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() /** * Awaits the last value from the given [Observable] without blocking the thread and * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. * * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun ObservableSource.awaitLast(): T = awaitOne(Mode.LAST) /** * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, * if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. * * @throws NoSuchElementException if the observable does not emit any value * @throws IllegalArgumentException if the observable emits more than one value */ public suspend fun ObservableSource.awaitSingle(): T = awaitOne(Mode.SINGLE) // ------------------------ private ------------------------ internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) = invokeOnCancellation { d.dispose() } private enum class Mode(val s: String) { FIRST("awaitFirst"), FIRST_OR_DEFAULT("awaitFirstOrDefault"), LAST("awaitLast"), SINGLE("awaitSingle"); override fun toString(): String = s } private suspend fun ObservableSource.awaitOne( mode: Mode, default: T? = null ): T = suspendCancellableCoroutine { cont -> subscribe(object : Observer { private lateinit var subscription: Disposable private var value: T? = null private var seenValue = false override fun onSubscribe(sub: Disposable) { subscription = sub cont.invokeOnCancellation { sub.dispose() } } override fun onNext(t: T) { when (mode) { Mode.FIRST, Mode.FIRST_OR_DEFAULT -> { if (!seenValue) { seenValue = true cont.resume(t) subscription.dispose() } } Mode.LAST, Mode.SINGLE -> { if (mode == Mode.SINGLE && seenValue) { if (cont.isActive) cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode")) subscription.dispose() } else { value = t seenValue = true } } } } @Suppress("UNCHECKED_CAST") override fun onComplete() { if (seenValue) { if (cont.isActive) cont.resume(value as T) return } when { mode == Mode.FIRST_OR_DEFAULT -> { cont.resume(default as T) } cont.isActive -> { cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode")) } } } override fun onError(e: Throwable) { cont.resumeWithException(e) } }) }