aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src/RxAwait.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxAwait.kt221
1 files changed, 221 insertions, 0 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
new file mode 100644
index 00000000..ce1040e9
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import io.reactivex.*
+import io.reactivex.disposables.Disposable
+import kotlinx.coroutines.CancellableContinuation
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlin.coroutines.*
+
+// ------------------------ CompletableSource ------------------------
+
+/**
+ * Awaits for completion of this completable without blocking a thread.
+ * Returns `Unit` or throws the corresponding exception if this completable had produced error.
+ *
+ * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
+ * suspending function is suspended, this function immediately resumes with [CancellationException].
+ */
+public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
+ subscribe(object : CompletableObserver {
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+ override fun onComplete() { cont.resume(Unit) }
+ override fun onError(e: Throwable) { cont.resumeWithException(e) }
+ })
+}
+
+// ------------------------ MaybeSource ------------------------
+
+/**
+ * Awaits for completion of the maybe without blocking a thread.
+ * Returns the resulting value, null if no value was produced or throws the corresponding exception if this
+ * maybe had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+@Suppress("UNCHECKED_CAST")
+public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
+
+/**
+ * Awaits for completion of the maybe without blocking a thread.
+ * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
+ * maybe had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
+ subscribe(object : MaybeObserver<T> {
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+ override fun onComplete() { cont.resume(default) }
+ override fun onSuccess(t: T) { cont.resume(t) }
+ override fun onError(error: Throwable) { cont.resumeWithException(error) }
+ })
+}
+
+// ------------------------ SingleSource ------------------------
+
+/**
+ * Awaits for completion of the single value without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this single had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
+ subscribe(object : SingleObserver<T> {
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+ override fun onSuccess(t: T) { cont.resume(t) }
+ override fun onError(error: Throwable) { cont.resumeWithException(error) }
+ })
+}
+
+// ------------------------ ObservableSource ------------------------
+
+/**
+ * Awaits for the first value from the given observable without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ *
+ * @throws NoSuchElementException if observable does not emit any value
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
+
+/**
+ * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
+ * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
+
+/**
+ * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
+ * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
+
+/**
+ * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
+ * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
+
+/**
+ * Awaits for the last value from the given observable without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ *
+ * @throws NoSuchElementException if observable does not emit any value
+ */
+public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
+
+/**
+ * Awaits for the single value from the given observable without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ *
+ * @throws NoSuchElementException if observable does not emit any value
+ * @throws IllegalArgumentException if observable emits more than one value
+ */
+public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
+
+// ------------------------ private ------------------------
+
+internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
+ invokeOnCancellation { d.dispose() }
+
+private enum class Mode(val s: String) {
+ FIRST("awaitFirst"),
+ FIRST_OR_DEFAULT("awaitFirstOrDefault"),
+ LAST("awaitLast"),
+ SINGLE("awaitSingle");
+ override fun toString(): String = s
+}
+
+private suspend fun <T> ObservableSource<T>.awaitOne(
+ mode: Mode,
+ default: T? = null
+): T = suspendCancellableCoroutine { cont ->
+ subscribe(object : Observer<T> {
+ private lateinit var subscription: Disposable
+ private var value: T? = null
+ private var seenValue = false
+
+ override fun onSubscribe(sub: Disposable) {
+ subscription = sub
+ cont.invokeOnCancellation { sub.dispose() }
+ }
+
+ override fun onNext(t: T) {
+ when (mode) {
+ Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
+ if (!seenValue) {
+ seenValue = true
+ cont.resume(t)
+ subscription.dispose()
+ }
+ }
+ Mode.LAST, Mode.SINGLE -> {
+ if (mode == Mode.SINGLE && seenValue) {
+ if (cont.isActive)
+ cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
+ subscription.dispose()
+ } else {
+ value = t
+ seenValue = true
+ }
+ }
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ override fun onComplete() {
+ if (seenValue) {
+ if (cont.isActive) cont.resume(value as T)
+ return
+ }
+ when {
+ mode == Mode.FIRST_OR_DEFAULT -> {
+ cont.resume(default as T)
+ }
+ cont.isActive -> {
+ cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
+ }
+ }
+ }
+
+ override fun onError(e: Throwable) {
+ cont.resumeWithException(e)
+ }
+ })
+}
+