aboutsummaryrefslogtreecommitdiffstats
path: root/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/ConcurrentStatefulActorBenchmark.kt
blob: 6ac97ad3e77845b915bfd832d6b03e1b963969fa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package benchmarks.scheduler.actors

import benchmarks.*
import benchmarks.akka.*
import benchmarks.scheduler.actors.StatefulActorBenchmark.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*

/*
 * Noisy benchmarks useful to measure scheduling fairness and migration of affinity-sensitive tasks.
 *
 * Benchmark: a single requestor actor fans out one request to every computation actor (one per core) and then
 * ping-pongs with each of them in a loop.
 * The fair benchmark expects every computation actor to receive exactly N messages; the unfair benchmark only expects
 * N * cores messages to be received in total.
 *
 * Benchmark                                                    (dispatcher)  (stateSize)  Mode  Cnt      Score      Error  Units
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair             fjp         1024  avgt    5    215.439 ±   29.685  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair           ftp_1         1024  avgt    5     85.374 ±    4.477  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair           ftp_8         1024  avgt    5    418.510 ±   46.906  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair    experimental         1024  avgt    5    165.250 ±   20.309  ms/op
 *
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair             fjp         8192  avgt    5    220.576 ±   35.596  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair           ftp_1         8192  avgt    5    298.276 ±   22.256  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair           ftp_8         8192  avgt    5    426.105 ±   29.870  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair    experimental         8192  avgt    5    288.546 ±   20.280  ms/op
 *
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair             fjp       262144  avgt    5   4146.057 ±  284.377  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair           ftp_1       262144  avgt    5  10250.107 ± 1421.253  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair           ftp_8       262144  avgt    5   6761.283 ± 4091.452  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsFair    experimental       262144  avgt    5   6521.436 ±  346.726  ms/op
 *
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair           fjp         1024  avgt    5    289.875 ±   14.241  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair         ftp_1         1024  avgt    5     87.336 ±    5.160  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair         ftp_8         1024  avgt    5    430.718 ±   23.497  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair  experimental         1024  avgt    5    153.704 ±   13.869  ms/op
 *
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair           fjp         8192  avgt    5    289.836 ±    9.719  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair         ftp_1         8192  avgt    5    299.523 ±   17.357  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair         ftp_8         8192  avgt    5    433.959 ±   27.669  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair  experimental         8192  avgt    5    283.441 ±   22.740  ms/op
 *
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair           fjp       262144  avgt    5   7804.066 ± 1386.595  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair         ftp_1       262144  avgt    5  11142.530 ±  381.401  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair         ftp_8       262144  avgt    5   7739.136 ± 1317.885  ms/op
 * ConcurrentStatefulActorBenchmark.multipleComputationsUnfair  experimental       262144  avgt    5   7076.911 ± 1971.615  ms/op
 *
 */
/**
 * JMH benchmark in which a single requestor actor exchanges messages with [CORES_COUNT] computation actors.
 *
 * Every benchmark method creates the computation actors, kicks the requestor off with a [Start] letter and then
 * blocks until the requestor reports completion through a result channel.
 */
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {

    /** State size handed to every computation actor; JMH injects one of the [Param] values before a run. */
    @Param("1024", "8192")
    var stateSize: Int = -1

    /** Name of the dispatcher under test; JMH injects one of the [Param] values before a run. */
    @Param("fjp", "scheduler")
    override var dispatcher: String = "fjp"

    /**
     * Runs one unfair exchange: the requestor only counts the total number of replies, regardless of which
     * computation actor produced them. Returns once the requestor has sent its completion signal.
     */
    @Benchmark
    fun multipleComputationsUnfair() = runBlocking {
        val resultChannel: Channel<Unit> = Channel(1)
        val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
        val requestor = requestorActorUnfair(computations, resultChannel)
        requestor.send(Letter(Start(), requestor))
        resultChannel.receive()
    }

    /**
     * Runs one fair exchange: the requestor additionally tracks replies per computation actor and stops
     * re-sending to an actor that has reached its cap. Returns once the requestor has sent its completion signal.
     */
    @Benchmark
    fun multipleComputationsFair() = runBlocking {
        val resultChannel: Channel<Unit> = Channel(1)
        val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
        val requestor = requestorActorFair(computations, resultChannel)
        requestor.send(Letter(Start(), requestor))
        resultChannel.receive()
    }

    /**
     * Creates the requestor actor for the unfair benchmark.
     *
     * On [Start] it sends one random `Long` to every computation actor (in shuffled order). Every `Long` reply is
     * answered with a new random `Long` to the replying actor until `ROUNDS * computations.size` replies have been
     * counted in total; then all computation actors are closed and [stopChannel] is signalled.
     *
     * @param computations channels of the computation actors to talk to
     * @param stopChannel channel that receives a single [Unit] when the exchange is finished
     * @return the channel of the newly created requestor actor
     */
    fun requestorActorUnfair(
        computations: List<SendChannel<Letter>>,
        stopChannel: Channel<Unit>
    ) =
        actor<Letter>(capacity = 1024) {
            // Scale the workload with the real number of computation actors instead of assuming 8 cores,
            // matching both the benchmark description ("N * cores") and the fair variant.
            val expectedReplies = ROUNDS * computations.size
            var received = 0
            for (letter in channel) with(letter) {
                when (message) {
                    is Start -> {
                        computations.shuffled()
                            .forEach { it.send(Letter(ThreadLocalRandom.current().nextLong(), channel)) }
                    }
                    is Long -> {
                        if (++received >= expectedReplies) {
                            computations.forEach { it.close() }
                            stopChannel.send(Unit)
                            return@actor
                        } else {
                            // Keep the ping-pong going with whichever actor answered.
                            sender.send(Letter(ThreadLocalRandom.current().nextLong(), channel))
                        }
                    }
                    else -> error("Cannot happen: $letter")
                }
            }
        }

    /**
     * Creates the requestor actor for the fair benchmark.
     *
     * Behaves like [requestorActorUnfair], but also keeps a per-actor counter of re-sent requests and stops
     * re-sending to an actor once that counter exceeds `ROUNDS`. The exchange still ends after
     * `ROUNDS * computations.size` replies in total.
     *
     * @param computations channels of the computation actors to talk to
     * @param stopChannel channel that receives a single [Unit] when the exchange is finished
     * @return the channel of the newly created requestor actor
     */
    fun requestorActorFair(
        computations: List<SendChannel<Letter>>,
        stopChannel: Channel<Unit>
    ) =
        actor<Letter>(capacity = 1024) {
            // Number of follow-up requests already sent to each computation actor.
            val received = computations.associateWith { 0 }.toMutableMap()
            val expectedReplies = ROUNDS * computations.size
            var receivedTotal = 0

            for (letter in channel) with(letter) {
                when (message) {
                    is Start -> {
                        computations.shuffled()
                            .forEach { it.send(Letter(ThreadLocalRandom.current().nextLong(), channel)) }
                    }
                    is Long -> {
                        if (++receivedTotal >= expectedReplies) {
                            computations.forEach { it.close() }
                            stopChannel.send(Unit)
                            return@actor
                        } else {
                            val receivedFromSender = checkNotNull(received[sender]) {
                                "Reply from an unknown computation actor: $sender"
                            }
                            // NOTE(review): `<=` permits up to ROUNDS + 1 follow-ups per actor on top of the
                            // initial request; confirm this matches the intended "exactly N messages" fairness.
                            if (receivedFromSender <= ROUNDS) {
                                received[sender] = receivedFromSender + 1
                                sender.send(Letter(ThreadLocalRandom.current().nextLong(), channel))
                            }
                        }
                    }
                    else -> error("Cannot happen: $letter")
                }
            }
        }
}