aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src/RxAwait.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxAwait.kt142
1 files changed, 96 insertions, 46 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
index 6e162c9a..0e0b47eb 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
@@ -15,11 +15,12 @@ import kotlin.coroutines.*
// ------------------------ CompletableSource ------------------------
/**
- * Awaits for completion of this completable without blocking a thread.
- * Returns `Unit` or throws the corresponding exception if this completable had produced error.
+ * Awaits for completion of this completable without blocking the thread.
+ * Returns `Unit`, or throws the corresponding exception if this completable produces an error.
*
* This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
- * suspending function is suspended, this function immediately resumes with [CancellationException].
+ * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its
+ * subscription.
*/
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
subscribe(object : CompletableObserver {
@@ -32,6 +33,37 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
// ------------------------ MaybeSource ------------------------
/**
+ * Awaits for completion of the [MaybeSource] without blocking the thread.
+ * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
+ * [MaybeSource] produces an error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
+ * function immediately resumes with [CancellationException] and disposes of its subscription.
+ */
+@Suppress("UNCHECKED_CAST")
+public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
+ subscribe(object : MaybeObserver<T> {
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+ override fun onComplete() { cont.resume(null) }
+ override fun onSuccess(t: T) { cont.resume(t) }
+ override fun onError(error: Throwable) { cont.resumeWithException(error) }
+ })
+}
+
+/**
+ * Awaits for completion of the [MaybeSource] without blocking the thread.
+ * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
+ * function immediately resumes with [CancellationException] and disposes of its subscription.
+ *
+ * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
+ */
+public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
+
+/**
* Awaits for completion of the maybe without blocking a thread.
* Returns the resulting value, null if no value was produced or throws the corresponding exception if this
* maybe had produced error.
@@ -39,9 +71,19 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ *
+ * ### Deprecation
+ *
+ * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
+ * a value, as opposed to throwing in such case.
+ * @suppress
*/
-@Suppress("UNCHECKED_CAST")
-public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
+@Deprecated(
+ message = "Deprecated in favor of awaitSingleOrNull()",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("this.awaitSingleOrNull()")
+) // Warning since 1.5, error in 1.6, hidden in 1.7
+public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
/**
* Awaits for completion of the maybe without blocking a thread.
@@ -51,25 +93,29 @@ public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).aw
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ *
+ * ### Deprecation
+ *
+ * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
+ * details).
+ * @suppress
*/
-public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
- subscribe(object : MaybeObserver<T> {
- override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
- override fun onComplete() { cont.resume(default) }
- override fun onSuccess(t: T) { cont.resume(t) }
- override fun onError(error: Throwable) { cont.resumeWithException(error) }
- })
-}
+@Deprecated(
+ message = "Deprecated in favor of awaitSingleOrNull()",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
+) // Warning since 1.5, error in 1.6, hidden in 1.7
+public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
// ------------------------ SingleSource ------------------------
/**
- * Awaits for completion of the single value without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this single had produced error.
+ * Awaits for completion of the single value response without blocking the thread.
+ * Returns the resulting value, or throws the corresponding exception if this response produces an error.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
@@ -82,69 +128,73 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine
// ------------------------ ObservableSource ------------------------
/**
- * Awaits for the first value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or,
+ * if the observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
+ * @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
/**
- * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
+ * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
+ * corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
/**
- * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
+ * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
+ * exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
/**
- * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
+ * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws
+ * the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
-public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
+public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
+ awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
/**
- * Awaits for the last value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the last value from the given [Observable] without blocking the thread and
+ * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
+ * @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
/**
- * Awaits for the single value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
+ * if this observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
- * @throws IllegalArgumentException if observable emits more than one value
+ * @throws NoSuchElementException if the observable does not emit any value
+ * @throws IllegalArgumentException if the observable emits more than one value
*/
public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)