aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/jvm/src/Executors.kt
blob: 394304f231e815f2397d419351396e69a119cddf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
import kotlin.coroutines.*

/**
 * [CoroutineDispatcher] that has underlying [Executor] for dispatching tasks.
 * Instances of [ExecutorCoroutineDispatcher] should be closed by the owner of the dispatcher.
 *
 * This class is generally used as a bridge between coroutine-based API and
 * asynchronous API that requires an instance of the [Executor].
 */
public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
    /** @suppress */
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>(
        CoroutineDispatcher,
        { it as? ExecutorCoroutineDispatcher })

    /**
     * Underlying executor of current [CoroutineDispatcher].
     */
    public abstract val executor: Executor

    /**
     * Closes this coroutine dispatcher and shuts down its executor.
     *
     * It may throw an exception if this dispatcher is global and cannot be closed.
     */
    public abstract override fun close()
}

/**
 * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
 *
 * If the underlying executor throws [RejectedExecutionException] on
 * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
 * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
 * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
 */
@JvmName("from") // this is for a nice Java API, see issue #255
public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
    ExecutorCoroutineDispatcherImpl(this)

/**
 * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
 *
 * If the underlying executor throws [RejectedExecutionException] on
 * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
 * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
 * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
 */
@JvmName("from") // this is for a nice Java API, see issue #255
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
    (this as? DispatcherExecutor)?.dispatcher ?: ExecutorCoroutineDispatcherImpl(this)

/**
 * Converts an instance of [CoroutineDispatcher] to an implementation of [Executor].
 *
 * It returns the original executor when used on the result of [Executor.asCoroutineDispatcher] extensions.
 */
public fun CoroutineDispatcher.asExecutor(): Executor =
    (this as? ExecutorCoroutineDispatcher)?.executor ?: DispatcherExecutor(this)

private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor {
    override fun execute(block: Runnable) = dispatcher.dispatch(EmptyCoroutineContext, block)
    override fun toString(): String = dispatcher.toString()
}

private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
    init {
        initFutureCancellation()
    }
}

internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {

    private var removesFutureOnCancellation: Boolean = false

    internal fun initFutureCancellation() {
        removesFutureOnCancellation = removeFutureOnCancel(executor)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            executor.execute(wrapTask(block))
        } catch (e: RejectedExecutionException) {
            unTrackTask()
            cancelJobOnRejection(context, e)
            Dispatchers.IO.dispatch(context, block)
        }
    }

    /*
     * removesFutureOnCancellation is required to avoid memory leak.
     * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
     * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val future = if (removesFutureOnCancellation) {
            scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
        } else {
            null
        }
        // If everything went fine and the scheduling attempt was not rejected -- use it
        if (future != null) {
            continuation.cancelFutureOnCancellation(future)
            return
        }
        // Otherwise fallback to default executor
        DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
    }

    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val future = if (removesFutureOnCancellation) {
            scheduleBlock(block, context, timeMillis)
        } else {
            null
        }
        return when {
            future != null -> DisposableFutureHandle(future)
            else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
        }
    }

    private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
        return try {
            (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
        } catch (e: RejectedExecutionException) {
            cancelJobOnRejection(context, e)
            null
        }
    }

    private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
        context.cancel(CancellationException("The task was rejected", exception))
    }

    override fun close() {
        (executor as? ExecutorService)?.shutdown()
    }

    override fun toString(): String = executor.toString()
    override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
    override fun hashCode(): Int = System.identityHashCode(executor)
}

private class ResumeUndispatchedRunnable(
    private val dispatcher: CoroutineDispatcher,
    private val continuation: CancellableContinuation<Unit>
) : Runnable {
    override fun run() {
        with(continuation) { dispatcher.resumeUndispatched(Unit) }
    }
}

/**
 * An implementation of [DisposableHandle] that cancels the specified future on dispose.
 * @suppress **This is unstable API and it is subject to change.**
 */
private class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
    override fun dispose() {
        future.cancel(false)
    }
    override fun toString(): String = "DisposableFutureHandle[$future]"
}