1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package benchmarks.scheduler.actors
import benchmarks.*
import benchmarks.akka.*
import benchmarks.scheduler.actors.StatefulActorBenchmark.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*
/*
* Noisy benchmarks useful to measure scheduling fairness and migration of affinity-sensitive tasks.
*
* Benchmark: single actor fans out requests to all (#cores count) computation actors and then ping pongs each in loop.
* Fair benchmark expects that every computation actor will receive exactly N messages, unfair expects N * cores messages received in total.
*
* Benchmark (dispatcher) (stateSize) Mode Cnt Score Error Units
* ConcurrentStatefulActorBenchmark.multipleComputationsFair fjp 1024 avgt 5 215.439 ± 29.685 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsFair ftp_1 1024 avgt 5 85.374 ± 4.477 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsFair ftp_8 1024 avgt 5 418.510 ± 46.906 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsFair experimental 1024 avgt 5 165.250 ± 20.309 ms/op
*
* ConcurrentStatefulActorBenchmark.multipleComputationsFair fjp 8192 avgt 5 220.576 ± 35.596 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsFair ftp_1 8192 avgt 5 298.276 ± 22.256 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsFair ftp_8 8192 avgt 5 426.105 ± 29.870 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsFair experimental 8192 avgt 5 288.546 ± 20.280 ms/op
*
* ConcurrentStatefulActorBenchmark.multipleComputationsFair fjp 262144 avgt 5 4146.057 ± 284.377 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsFair ftp_1 262144 avgt 5 10250.107 ± 1421.253 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsFair ftp_8 262144 avgt 5 6761.283 ± 4091.452 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsFair experimental 262144 avgt 5 6521.436 ± 346.726 ms/op
*
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair fjp 1024 avgt 5 289.875 ± 14.241 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair ftp_1 1024 avgt 5 87.336 ± 5.160 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair ftp_8 1024 avgt 5 430.718 ± 23.497 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair experimental 1024 avgt 5 153.704 ± 13.869 ms/op
*
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair fjp 8192 avgt 5 289.836 ± 9.719 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair ftp_1 8192 avgt 5 299.523 ± 17.357 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair ftp_8 8192 avgt 5 433.959 ± 27.669 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair experimental 8192 avgt 5 283.441 ± 22.740 ms/op
*
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair fjp 262144 avgt 5 7804.066 ± 1386.595 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair ftp_1 262144 avgt 5 11142.530 ± 381.401 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair ftp_8 262144 avgt 5 7739.136 ± 1317.885 ms/op
* ConcurrentStatefulActorBenchmark.multipleComputationsUnfair experimental 262144 avgt 5 7076.911 ± 1971.615 ms/op
*
*/
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {

    // Size of each computation actor's internal state; larger values make
    // tasks more affinity-sensitive (see header comment for measured impact).
    @Param("1024", "8192")
    var stateSize: Int = -1

    @Param("fjp", "scheduler")
    override var dispatcher: String = "fjp"

    /**
     * Unfair variant: the requestor stops as soon as ROUNDS * cores replies
     * have been received in total, no matter how they are distributed across
     * the computation actors.
     */
    @Benchmark
    fun multipleComputationsUnfair() = runBlocking {
        val resultChannel: Channel<Unit> = Channel(1)
        val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
        val requestor = requestorActorUnfair(computations, resultChannel)
        // Kick off the fan-out; the requestor replies to itself via `channel`.
        requestor.send(Letter(Start(), requestor))
        resultChannel.receive()
    }

    /**
     * Fair variant: each computation actor is pinged at most ROUNDS times,
     * so every actor receives (roughly) the same number of messages.
     */
    @Benchmark
    fun multipleComputationsFair() = runBlocking {
        val resultChannel: Channel<Unit> = Channel(1)
        val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
        val requestor = requestorActorFair(computations, resultChannel)
        requestor.send(Letter(Start(), requestor))
        resultChannel.receive()
    }

    /**
     * Requestor that fans out one random payload to every computation actor,
     * then keeps ping-ponging whichever actor answers first.
     *
     * Terminates after ROUNDS * computations.size replies in total, closes all
     * computation actors and signals [stopChannel].
     */
    fun requestorActorUnfair(
        computations: List<SendChannel<Letter>>,
        stopChannel: Channel<Unit>
    ) =
        actor<Letter>(capacity = 1024) {
            var received = 0
            for (letter in channel) with(letter) {
                when (message) {
                    is Start -> {
                        // Shuffle so the initial fan-out order does not bias scheduling.
                        computations.shuffled()
                            .forEach { it.send(Letter(ThreadLocalRandom.current().nextLong(), channel)) }
                    }
                    is Long -> {
                        // NOTE(fix): was `ROUNDS * 8`, hard-coding an 8-core machine.
                        // The header comment specifies "N * cores messages received in
                        // total" and the fair variant already uses computations.size.
                        if (++received >= ROUNDS * computations.size) {
                            computations.forEach { it.close() }
                            stopChannel.send(Unit)
                            return@actor
                        } else {
                            sender.send(Letter(ThreadLocalRandom.current().nextLong(), channel))
                        }
                    }
                    else -> error("Cannot happen: $letter")
                }
            }
        }

    /**
     * Requestor that fans out one random payload to every computation actor,
     * then ping-pongs each actor individually, capping each at ROUNDS replies
     * so the load is spread evenly.
     *
     * Terminates after ROUNDS * computations.size replies in total, closes all
     * computation actors and signals [stopChannel].
     */
    fun requestorActorFair(
        computations: List<SendChannel<Letter>>,
        stopChannel: Channel<Unit>
    ) =
        actor<Letter>(capacity = 1024) {
            // Per-actor reply counter, used to enforce the per-actor ROUNDS cap.
            val received = computations.associateWith { 0 }.toMutableMap()
            var receivedTotal = 0
            for (letter in channel) with(letter) {
                when (message) {
                    is Start -> {
                        computations.shuffled()
                            .forEach { it.send(Letter(ThreadLocalRandom.current().nextLong(), channel)) }
                    }
                    is Long -> {
                        if (++receivedTotal >= ROUNDS * computations.size) {
                            computations.forEach { it.close() }
                            stopChannel.send(Unit)
                            return@actor
                        } else {
                            // Sender is always one of `computations`, so the lookup cannot fail.
                            val receivedFromSender = received[sender]!!
                            if (receivedFromSender <= ROUNDS) {
                                received[sender] = receivedFromSender + 1
                                sender.send(Letter(ThreadLocalRandom.current().nextLong(), channel))
                            }
                        }
                    }
                    else -> error("Cannot happen: $letter")
                }
            }
        }
}
|