aboutsummaryrefslogtreecommitdiffstats
path: root/benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt
diff options
context:
space:
mode:
Diffstat (limited to 'benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt')
-rw-r--r--benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt146
1 files changed, 146 insertions, 0 deletions
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt
new file mode 100644
index 00000000..77b907f6
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt
@@ -0,0 +1,146 @@
+package benchmarks
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.selects.select
+import org.openjdk.jmh.annotations.*
+import org.openjdk.jmh.infra.Blackhole
+import java.lang.Integer.max
+import java.util.concurrent.ForkJoinPool
+import java.util.concurrent.Phaser
+import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.TimeUnit
+
+
+/**
+ * Benchmark to measure channel algorithm performance in terms of average time per `send-receive` pair;
+ * actually, it measures the time for a batch of such operations separated into the specified number of consumers/producers.
+ * It uses different channels (rendezvous, buffered, unlimited; see [ChannelCreator]) and different dispatchers
+ * (see [DispatcherCreator]). If the [_3_withSelect] property is set, it invokes `send` and
+ * `receive` via [select], waiting on a local dummy channel simultaneously, simulating a "cancellation" channel.
+ *
+ * Please, be patient, this benchmark takes quite a lot of time to complete.
+ */
+@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
+@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
+@Fork(value = 3)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+open class ChannelProducerConsumerBenchmark {
+ @Param
+ private var _0_dispatcher: DispatcherCreator = DispatcherCreator.FORK_JOIN
+
+ @Param
+ private var _1_channel: ChannelCreator = ChannelCreator.RENDEZVOUS
+
+ @Param("0", "1000")
+ private var _2_coroutines: Int = 0
+
+ @Param("false", "true")
+ private var _3_withSelect: Boolean = false
+
+ @Param("1", "2", "4") // local machine
+// @Param("1", "2", "4", "8", "12") // local machine
+// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
+// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
+ private var _4_parallelism: Int = 0
+
+ private lateinit var dispatcher: CoroutineDispatcher
+ private lateinit var channel: Channel<Int>
+
+ @InternalCoroutinesApi
+ @Setup
+ fun setup() {
+ dispatcher = _0_dispatcher.create(_4_parallelism)
+ channel = _1_channel.create()
+ }
+
+ @Benchmark
+ fun spmc() {
+ if (_2_coroutines != 0) return
+ val producers = max(1, _4_parallelism - 1)
+ val consumers = 1
+ run(producers, consumers)
+ }
+
+ @Benchmark
+ fun mpmc() {
+ val producers = if (_2_coroutines == 0) (_4_parallelism + 1) / 2 else _2_coroutines / 2
+ val consumers = producers
+ run(producers, consumers)
+ }
+
+ private fun run(producers: Int, consumers: Int) {
+ val n = APPROX_BATCH_SIZE / producers * producers
+ val phaser = Phaser(producers + consumers + 1)
+ // Run producers
+ repeat(producers) {
+ GlobalScope.launch(dispatcher) {
+ val dummy = if (_3_withSelect) _1_channel.create() else null
+ repeat(n / producers) {
+ produce(it, dummy)
+ }
+ phaser.arrive()
+ }
+ }
+ // Run consumers
+ repeat(consumers) {
+ GlobalScope.launch(dispatcher) {
+ val dummy = if (_3_withSelect) _1_channel.create() else null
+ repeat(n / consumers) {
+ consume(dummy)
+ }
+ phaser.arrive()
+ }
+ }
+ // Wait until work is done
+ phaser.arriveAndAwaitAdvance()
+ }
+
+ private suspend fun produce(element: Int, dummy: Channel<Int>?) {
+ if (_3_withSelect) {
+ select<Unit> {
+ channel.onSend(element) {}
+ dummy!!.onReceive {}
+ }
+ } else {
+ channel.send(element)
+ }
+ doWork()
+ }
+
+ private suspend fun consume(dummy: Channel<Int>?) {
+ if (_3_withSelect) {
+ select<Unit> {
+ channel.onReceive {}
+ dummy!!.onReceive {}
+ }
+ } else {
+ channel.receive()
+ }
+ doWork()
+ }
+}
+
+enum class DispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
+ FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() })
+}
+
+enum class ChannelCreator(private val capacity: Int) {
+ RENDEZVOUS(Channel.RENDEZVOUS),
+// BUFFERED_1(1),
+ BUFFERED_2(2),
+// BUFFERED_4(4),
+ BUFFERED_32(32),
+ BUFFERED_128(128),
+ BUFFERED_UNLIMITED(Channel.UNLIMITED);
+
+ fun create(): Channel<Int> = Channel(capacity)
+}
+
+private fun doWork(): Unit = Blackhole.consumeCPU(ThreadLocalRandom.current().nextLong(WORK_MIN, WORK_MAX))
+
+private const val WORK_MIN = 50L
+private const val WORK_MAX = 100L
+private const val APPROX_BATCH_SIZE = 100000 \ No newline at end of file