diff options
Diffstat (limited to 'kotlinx-coroutines-core/jvm/src/Executors.kt')
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/Executors.kt | 80 |
1 files changed, 51 insertions, 29 deletions
diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 394304f2..7ea3cc68 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* import java.io.* import java.util.concurrent.* @@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea /** * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor service is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher /** * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution + * * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -75,18 +109,15 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) override fun toString(): String = dispatcher.toString() } -private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() { - init { - initFutureCancellation() - } -} +internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay { -internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay { - - private var removesFutureOnCancellation: Boolean = false - - internal fun initFutureCancellation() { - removesFutureOnCancellation = removeFutureOnCancel(executor) + /* + * Attempts to reflectively (to be Java 6 compatible) invoke + * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup + * internal scheduler queue on cancellation. + */ + init { + removeFutureOnCancel(executor) } override fun dispatch(context: CoroutineContext, block: Runnable) { @@ -99,17 +130,12 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } } - /* - * removesFutureOnCancellation is required to avoid memory leak. - * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine. - * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation. - */ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { - val future = if (removesFutureOnCancellation) { - scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock( + ResumeUndispatchedRunnable(this, continuation), + continuation.context, + timeMillis + ) // If everything went fine and the scheduling attempt was not rejected -- use it if (future != null) { continuation.cancelFutureOnCancellation(future) @@ -120,20 +146,16 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val future = if (removesFutureOnCancellation) { - scheduleBlock(block, context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) return when { future != null -> DisposableFutureHandle(future) else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) } } - private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { + private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { return try { - (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS) + schedule(block, timeMillis, TimeUnit.MILLISECONDS) } catch (e: RejectedExecutionException) { cancelJobOnRejection(context, e) null @@ -149,7 +171,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } override fun toString(): String = executor.toString() - override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor + override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor override fun hashCode(): Int = System.identityHashCode(executor) } |