aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/jvm/src/Executors.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/jvm/src/Executors.kt')
-rw-r--r--kotlinx-coroutines-core/jvm/src/Executors.kt80
1 files changed, 51 insertions, 29 deletions
diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt
index 394304f2..7ea3cc68 100644
--- a/kotlinx-coroutines-core/jvm/src/Executors.kt
+++ b/kotlinx-coroutines-core/jvm/src/Executors.kt
@@ -4,6 +4,7 @@
package kotlinx.coroutines
+import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
@@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea
/**
* Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
*
+ * ## Interaction with [delay] and time-based coroutines.
+ *
+ * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related
+ * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
+ * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
+ * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
+ *
+ * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
+ * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
+ * to reduce the memory pressure of cancelled coroutines.
+ *
+ * If the executor service is neither of this types, the separate internal thread will be used to
+ * _track_ the delay and time-related executions, but the coroutine itself will still be executed
+ * on top of the given executor.
+ *
+ * ## Rejected execution
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
@@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*
+ * ## Interaction with [delay] and time-based coroutines.
+ *
+ * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
+ * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
+ * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
+ * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
+ *
+ * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
+ * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
+ * to reduce the memory pressure of cancelled coroutines.
+ *
+ * If the executor is neither of this types, the separate internal thread will be used to
+ * _track_ the delay and time-related executions, but the coroutine itself will still be executed
+ * on top of the given executor.
+ *
+ * ## Rejected execution
+ *
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
@@ -75,18 +109,15 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher)
override fun toString(): String = dispatcher.toString()
}
-private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
- init {
- initFutureCancellation()
- }
-}
+internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {
-internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
-
- private var removesFutureOnCancellation: Boolean = false
-
- internal fun initFutureCancellation() {
- removesFutureOnCancellation = removeFutureOnCancel(executor)
+ /*
+ * Attempts to reflectively (to be Java 6 compatible) invoke
+ * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup
+ * internal scheduler queue on cancellation.
+ */
+ init {
+ removeFutureOnCancel(executor)
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
@@ -99,17 +130,12 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}
}
- /*
- * removesFutureOnCancellation is required to avoid memory leak.
- * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
- * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
- */
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- val future = if (removesFutureOnCancellation) {
- scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
- } else {
- null
- }
+ val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
+ ResumeUndispatchedRunnable(this, continuation),
+ continuation.context,
+ timeMillis
+ )
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
continuation.cancelFutureOnCancellation(future)
@@ -120,20 +146,16 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
- val future = if (removesFutureOnCancellation) {
- scheduleBlock(block, context, timeMillis)
- } else {
- null
- }
+ val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
return when {
future != null -> DisposableFutureHandle(future)
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
}
}
- private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
+ private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
- (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
+ schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
@@ -149,7 +171,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}
override fun toString(): String = executor.toString()
- override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
+ override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}