path: root/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Diffstat (limited to 'kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt')
-rw-r--r--  kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt  228
1 file changed, 228 insertions, 0 deletions
diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
new file mode 100644
index 00000000..bd1ba95d
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.scheduling
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.internal.*
+import java.util.concurrent.*
+import kotlin.coroutines.*
+
+/**
+ * Default instance of coroutine dispatcher.
+ */
+internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
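+ // The limited-parallelism blocking view used for IO-bound work; its parallelism defaults to
+ // max(64, available processors) and can be overridden via the system property named by IO_PARALLELISM_PROPERTY_NAME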
+ val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
+
+ override fun close() {
+ throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
+ }
+
+ override fun toString(): String = DEFAULT_SCHEDULER_NAME
+
+ @InternalCoroutinesApi
+ @Suppress("UNUSED")
+ public fun toDebugString(): String = super.toString()
+}
+
+/**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+// TODO make internal (and rename) after complete integration
+@InternalCoroutinesApi
+open class ExperimentalCoroutineDispatcher(
+ private val corePoolSize: Int,
+ private val maxPoolSize: Int,
+ private val idleWorkerKeepAliveNs: Long,
+ private val schedulerName: String = "CoroutineScheduler"
+) : ExecutorCoroutineDispatcher() {
+ constructor(
+ corePoolSize: Int = CORE_POOL_SIZE,
+ maxPoolSize: Int = MAX_POOL_SIZE,
+ schedulerName: String = DEFAULT_SCHEDULER_NAME
+ ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
+
+ @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
+ constructor(
+ corePoolSize: Int = CORE_POOL_SIZE,
+ maxPoolSize: Int = MAX_POOL_SIZE
+ ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
+
+ override val executor: Executor
+ get() = coroutineScheduler
+
+ // This is a variable for test purposes, so that we can reinitialize it from a clean state
+ private var coroutineScheduler = createScheduler()
+
+ override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
+ try {
+ coroutineScheduler.dispatch(block)
+ } catch (e: RejectedExecutionException) {
+ DefaultExecutor.dispatch(context, block)
+ }
+
+ override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
+ try {
+ coroutineScheduler.dispatch(block, fair = true)
+ } catch (e: RejectedExecutionException) {
+ DefaultExecutor.dispatchYield(context, block)
+ }
+
+ override fun close() = coroutineScheduler.close()
+
+ override fun toString(): String {
+ return "${super.toString()}[scheduler = $coroutineScheduler]"
+ }
+
+ /**
+ * Creates a coroutine execution context with limited parallelism to execute tasks which may potentially block.
+ * The resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher],
+ * giving it additional hints to adjust its behaviour.
+ *
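+ * A usage sketch (the `dispatcher` instance below is illustrative, not part of this file):
+ * ```
+ * val dispatcher = ExperimentalCoroutineDispatcher()
+ * val io = dispatcher.blocking(64) // at most 64 tasks of this view execute concurrently
+ * ```
+ *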
+ * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher in parallel.
+ */
+ public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
+ require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
+ return LimitingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
+ }
+
+ /**
+ * Creates a coroutine execution context with limited parallelism to execute CPU-intensive tasks.
+ * The resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher],
+ * giving it additional hints to adjust its behaviour.
+ *
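+ * A usage sketch (again, `dispatcher` is an illustrative instance):
+ * ```
+ * val cpuBound = dispatcher.limited(2) // at most 2 tasks at once; requires 2 <= corePoolSize
+ * ```
+ *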
+ * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher in parallel.
+ */
+ public fun limited(parallelism: Int): CoroutineDispatcher {
+ require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
+ require(parallelism <= corePoolSize) { "Expected parallelism level not greater than core pool size ($corePoolSize), but have $parallelism" }
+ return LimitingDispatcher(this, parallelism, TaskMode.NON_BLOCKING)
+ }
+
+ internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) {
+ try {
+ coroutineScheduler.dispatch(block, context, fair)
+ } catch (e: RejectedExecutionException) {
+ // Context shouldn't be lost here to properly invoke before/after task
+ DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
+ }
+ }
+
+ private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
+
+ // for tests only
+ @Synchronized
+ internal fun usePrivateScheduler() {
+ coroutineScheduler.shutdown(1_000L)
+ coroutineScheduler = createScheduler()
+ }
+
+ // for tests only
+ @Synchronized
+ internal fun shutdown(timeout: Long) {
+ coroutineScheduler.shutdown(timeout)
+ }
+
+ // for tests only
+ internal fun restore() = usePrivateScheduler() // recreate scheduler
+}
+
+private class LimitingDispatcher(
+ val dispatcher: ExperimentalCoroutineDispatcher,
+ val parallelism: Int,
+ override val taskMode: TaskMode
+) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
+
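+ // Tasks that exceeded the parallelism limit wait in `queue`; `inFlightTasks` counts tasks currently submitted to the underlying dispatcher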
+ private val queue = ConcurrentLinkedQueue<Runnable>()
+ private val inFlightTasks = atomic(0)
+
+ override val executor: Executor
+ get() = this
+
+ override fun execute(command: Runnable) = dispatch(command, false)
+
+ override fun close(): Unit = error("Close cannot be invoked on LimitingDispatcher")
+
+ override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
+
+ private fun dispatch(block: Runnable, fair: Boolean) {
+ var taskToSchedule = block
+ while (true) {
+ // Commit an in-flight task slot
+ val inFlight = inFlightTasks.incrementAndGet()
+
+ // Fast path, if parallelism limit is not reached, dispatch task and return
+ if (inFlight <= parallelism) {
+ dispatcher.dispatchWithContext(taskToSchedule, this, fair)
+ return
+ }
+
+ // Parallelism limit is reached, add task to the queue
+ queue.add(taskToSchedule)
+
+ /*
+ * We haven't actually scheduled anything, so roll back the committed in-flight task slot:
+ * If the number of in-flight tasks is still above the limit, do nothing.
+ * If the number of in-flight tasks is less than the parallelism limit, then we raced with a thread
+ * that finished a task from the current context, and we should resubmit the first task from the queue
+ * to avoid starvation.
+ *
+ * Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
+ *
+ * T1: submit task, start execution, R == 1
+ * T2: commit slot for next task, R == 2
+ * T1: finish T1, R == 1
+ * T2: submit next task to local queue, decrement R, R == 0
+ * Without retries, the task from T2 would be stuck in the local queue
+ */
+ if (inFlightTasks.decrementAndGet() >= parallelism) {
+ return
+ }
+
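+ // The limit is no longer reached: take a task back from the queue (possibly the one just added) and retry, so it is not stranded there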
+ taskToSchedule = queue.poll() ?: return
+ }
+ }
+
+ override fun toString(): String {
+ return "${super.toString()}[dispatcher = $dispatcher]"
+ }
+
+ /**
+ * Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any.
+ *
+ * Implementation note: blocking tasks are scheduled in a fair manner (to the local queue tail) to avoid
+ * starvation of non-blocking continuations.
+ * E.g. for
+ * ```
+ * foo()
+ * blocking()
+ * bar()
+ * ```
+ * it's more profitable to execute `bar` at the end of `blocking` rather than a pending blocking task
+ */
+ override fun afterTask() {
+ var next = queue.poll()
+ // If we have a pending task in the current blocking context, dispatch it first
+ if (next != null) {
+ dispatcher.dispatchWithContext(next, this, true)
+ return
+ }
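+ // No queued work was found: release the in-flight slot held by the task that just completed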
+ inFlightTasks.decrementAndGet()
+
+ /*
+ * Re-poll the queue and try to submit a task if required, otherwise tasks may be stuck in the local queue.
+ * Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
+ * T1: submit task, start execution, R == 1
+ * T2: commit slot for next task, R == 2
+ * T1: finish T1, poll queue (it's still empty), R == 2
+ * T2: submit next task to the local queue, decrement R, R == 1
+ * T1: decrement R, finish. R == 0
+ *
+ * The task from T2 would be stuck in the local queue
+ */
+ next = queue.poll() ?: return
+ dispatch(next, true)
+ }
+}