aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
blob: 57007bbdd44104f2db16288d2d119adc987cfea0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.exceptions.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
import kotlinx.coroutines.internal.*

/**
 * Creates cold [observable][Observable] that will run a given [block] in a coroutine.
 * Every time the returned observable is subscribed, it starts a new coroutine.
 *
 * Coroutine emits ([ObservableEmitter.onNext]) values with `send`, completes ([ObservableEmitter.onComplete])
 * when the coroutine completes or channel is explicitly closed and emits error ([ObservableEmitter.onError])
 * if coroutine throws an exception or closes channel with a cause.
 * Unsubscribing cancels running coroutine.
 *
 * Invocations of `send` are suspended appropriately to ensure that `onNext` is not invoked concurrently.
 * Note that Rx 2.x [Observable] **does not support backpressure**.
 *
 * Coroutine context can be specified with [context] argument.
 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
 * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
 */
public fun <T : Any> rxObservable(
    context: CoroutineContext = EmptyCoroutineContext,
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Observable<T> {
    require(context[Job] === null) { "Observable context cannot contain job in it." +
            "Its lifecycle should be managed via Disposable handle. Had $context" }
    return rxObservableInternal(GlobalScope, context, block)
}

private fun <T : Any> rxObservableInternal(
    scope: CoroutineScope, // support for legacy rxObservable in scope
    context: CoroutineContext,
    block: suspend ProducerScope<T>.() -> Unit
): Observable<T> = Observable.create { subscriber ->
    val newContext = scope.newCoroutineContext(context)
    val coroutine = RxObservableCoroutine(newContext, subscriber)
    subscriber.setCancellable(RxCancellable(coroutine)) // do it first (before starting coroutine), to await unnecessary suspensions
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}

private const val OPEN = 0        // open channel, still working
private const val CLOSED = -1     // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2  // already signalled subscriber onCompleted/onError

private class RxObservableCoroutine<T : Any>(
    parentContext: CoroutineContext,
    private val subscriber: ObservableEmitter<T>
) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
    override val channel: SendChannel<T> get() = this

    // Mutex is locked while subscriber.onXXX is being invoked
    private val mutex = Mutex()

    private val _signal = atomic(OPEN)

    override val isClosedForSend: Boolean get() = !isActive
    override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
    override fun invokeOnClose(handler: (Throwable?) -> Unit) =
        throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")

    override fun trySend(element: T): ChannelResult<Unit> =
        if (!mutex.tryLock()) {
            ChannelResult.failure()
        } else {
            when (val throwable = doLockedNext(element)) {
                null -> ChannelResult.success(Unit)
                else -> ChannelResult.closed(throwable)
            }
        }

    public override suspend fun send(element: T) {
        mutex.lock()
        doLockedNext(element)?.let { throw it }
    }

    override val onSend: SelectClause2<T, SendChannel<T>>
        get() = this

    // registerSelectSend
    @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
    override fun <R> registerSelectClause2(
        select: SelectInstance<R>,
        element: T,
        block: suspend (SendChannel<T>) -> R
    ) {
        mutex.onLock.registerSelectClause2(select, null) {
            doLockedNext(element)?.let { throw it }
            block(this)
        }
    }

    // assert: mutex.isLocked()
    private fun doLockedNext(elem: T): Throwable? {
        // check if already closed for send
        if (!isActive) {
            doLockedSignalCompleted(completionCause, completionCauseHandled)
            return getCancellationException()
        }
        // notify subscriber
        try {
            subscriber.onNext(elem)
        } catch (e: Throwable) {
            val cause = UndeliverableException(e)
            val causeDelivered = close(cause)
            unlockAndCheckCompleted()
            return if (causeDelivered) {
                // `cause` is the reason this channel is closed
                cause
            } else {
                // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
                handleUndeliverableException(cause, context)
                getCancellationException()
            }
        }
        /*
         * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
         * happen after this check and before `unlock` (see signalCompleted that does not do anything
         * if it fails to acquire the lock that we are still holding).
         * We have to recheck `isCompleted` after `unlock` anyway.
         */
        unlockAndCheckCompleted()
        return null
    }

    private fun unlockAndCheckCompleted() {
        mutex.unlock()
        // recheck isActive
        if (!isActive && mutex.tryLock())
            doLockedSignalCompleted(completionCause, completionCauseHandled)
    }

    // assert: mutex.isLocked()
    private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
        // cancellation failures
        try {
            if (_signal.value == SIGNALLED)
                return
            _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
            @Suppress("INVISIBLE_MEMBER")
            val unwrappedCause = cause?.let { unwrap(it) }
            if (unwrappedCause == null) {
                try {
                    subscriber.onComplete()
                } catch (e: Exception) {
                    handleUndeliverableException(e, context)
                }
            } else if (unwrappedCause is UndeliverableException && !handled) {
                /** Such exceptions are not reported to `onError`, as, according to the reactive specifications,
                 * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already
                 * cancelled. */
                handleUndeliverableException(cause, context)
            } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) {
                try {
                    /** If the subscriber is already in a terminal state, the error will be signalled to
                     * `RxJavaPlugins.onError`. */
                    subscriber.onError(cause)
                } catch (e: Exception) {
                    cause.addSuppressed(e)
                    handleUndeliverableException(cause, context)
                }
            }
        } finally {
            mutex.unlock()
        }
    }

    private fun signalCompleted(cause: Throwable?, handled: Boolean) {
        if (!_signal.compareAndSet(OPEN, CLOSED)) return // abort, other thread invoked doLockedSignalCompleted
        if (mutex.tryLock()) // if we can acquire the lock
            doLockedSignalCompleted(cause, handled)
    }

    override fun onCompleted(value: Unit) {
        signalCompleted(null, false)
    }

    override fun onCancelled(cause: Throwable, handled: Boolean) {
        signalCompleted(cause, handled)
    }
}