aboutsummaryrefslogtreecommitdiffstats
path: root/benchmarks/src/jmh/kotlin/benchmarks/actors/StatefulActorBenchmark.kt
blob: 6968c8952d75b2eea0b06859cb996545e94b287e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package benchmarks.actors

import benchmarks.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*

/*
 * kotlinx-based counterpart of [StatefulActorAkkaBenchmark]
 *
 * Benchmark                                                      (dispatcher)  Mode  Cnt    Score    Error  Units
 * StatefulActorBenchmark.multipleComputationsMultipleRequestors           fjp  avgt   10   81.649 ±  9.671  ms/op
 * StatefulActorBenchmark.multipleComputationsMultipleRequestors         ftp_1  avgt   10  160.590 ± 50.342  ms/op
 * StatefulActorBenchmark.multipleComputationsMultipleRequestors         ftp_8  avgt   10  275.798 ± 32.795  ms/op
 *
 * StatefulActorBenchmark.multipleComputationsSingleRequestor              fjp  avgt   10   67.206 ±  4.023  ms/op
 * StatefulActorBenchmark.multipleComputationsSingleRequestor            ftp_1  avgt   10   17.883 ±  1.314  ms/op
 * StatefulActorBenchmark.multipleComputationsSingleRequestor            ftp_8  avgt   10   77.052 ± 10.132  ms/op
 *
 * StatefulActorBenchmark.singleComputationMultipleRequestors              fjp  avgt   10  488.003 ± 53.014  ms/op
 * StatefulActorBenchmark.singleComputationMultipleRequestors            ftp_1  avgt   10  120.445 ± 24.659  ms/op
 * StatefulActorBenchmark.singleComputationMultipleRequestors            ftp_8  avgt   10  527.118 ± 51.139  ms/op
 *
 * StatefulActorBenchmark.singleComputationSingleRequestor                 fjp  avgt   10   95.030 ± 23.850  ms/op
 * StatefulActorBenchmark.singleComputationSingleRequestor               ftp_1  avgt   10   16.005 ±  0.629  ms/op
 * StatefulActorBenchmark.singleComputationSingleRequestor               ftp_8  avgt   10   76.435 ±  5.076  ms/op
 */
/**
 * JMH benchmark in which "requestor" actors bounce [Letter]s off stateful "computation" actors.
 *
 * Every benchmark method starts a number of computation actors and a number of requestor actors,
 * kicks each requestor off with a single letter and blocks until every requestor has reported
 * completion. `CORES_COUNT` and `ROUNDS` are declared outside this file.
 */
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class StatefulActorBenchmark : ParametrizedDispatcherBase() {

    /**
     * Message exchanged between actors.
     *
     * @property message payload; both actor kinds in this file only accept a [Long] (computation
     * actors additionally accept `Stop`).
     * @property sender channel a computation actor replies to; requestor actors do not read it.
     */
    data class Letter(val message: Any, val sender: Channel<Letter>)

    // JMH parameter; presumably interpreted by ParametrizedDispatcherBase to pick the dispatcher
    // under test (that class is not visible from this file).
    @Param("fjp", "ftp_1", "ftp_8", "experimental")
    override var dispatcher: String = "fjp"

    /** One computation actor served by one requestor. */
    @Benchmark
    fun singleComputationSingleRequestor() = runBlocking {
        run(1, 1)
    }

    /** One computation actor shared by `CORES_COUNT` requestors. */
    @Benchmark
    fun singleComputationMultipleRequestors() = runBlocking {
        run(1, CORES_COUNT)
    }

    /** `CORES_COUNT` computation actors used by a single requestor. */
    @Benchmark
    fun multipleComputationsSingleRequestor() = runBlocking {
        run(CORES_COUNT, 1)
    }

    /** `CORES_COUNT` computation actors used by `CORES_COUNT` requestors. */
    @Benchmark
    fun multipleComputationsMultipleRequestors() = runBlocking {
        run(CORES_COUNT, CORES_COUNT)
    }

    /**
     * Runs one full exchange: starts the actors, sends every requestor its initial letter and
     * suspends until each requestor has signalled completion, then shuts the computation actors down.
     *
     * @param computationActorsCount number of computation actors to start
     * @param requestorActorsCount number of requestor actors to start
     */
    private suspend fun run(computationActorsCount: Int, requestorActorsCount: Int) {
        // Buffered so that every requestor can post its completion signal without suspending.
        val resultChannel: Channel<Unit> = Channel(requestorActorsCount)
        val computations = (0 until computationActorsCount).map { computationActor() }
        val requestors = (0 until requestorActorsCount).map { requestorActor(computations, resultChannel) }

        for (requestor in requestors) {
            // The sender channel of the initial letter is a throwaway: requestors never reply to it.
            requestor.send(Letter(1L, Channel()))
        }

        repeat(requestorActorsCount) {
            resultChannel.receive()
        }

        // The computation actors are started in this benchmark's own scope rather than in the
        // runBlocking scope, so nothing terminates them when the invocation returns. Without this
        // they would stay suspended on their mailbox (holding their state) after every invocation.
        // Closing is safe here: a requestor finishes upon *receiving* its last reply and does not
        // send a new request for it, so no letter is in flight once all requestors have reported.
        computations.forEach { it.close() }
    }

    /**
     * Starts a requestor actor.
     *
     * For every [Long] letter it receives it either forwards a random [Long] to a randomly chosen
     * computation actor (passing its own mailbox as the reply address), or, once `ROUNDS` letters
     * have been received, posts [Unit] to [stopChannel] and terminates.
     *
     * @param computations computation actors to pick from
     * @param stopChannel channel that receives one [Unit] when this requestor is done
     */
    private fun CoroutineScope.requestorActor(computations: List<SendChannel<Letter>>, stopChannel: Channel<Unit>) =
        actor<Letter>(capacity = 1024) {
            var received = 0
            for (letter in channel) with(letter) {
                when (message) {
                    is Long -> {
                        if (++received >= ROUNDS) {
                            stopChannel.send(Unit)
                            return@actor
                        } else {
                            // `channel` is this actor's own mailbox: the computation actor replies to it.
                            computations[ThreadLocalRandom.current().nextInt(0, computations.size)]
                                    .send(Letter(ThreadLocalRandom.current().nextLong(), channel))
                        }
                    }
                    else -> error("Cannot happen: $letter")
                }
            }
        }
}

/**
 * Starts a computation actor with a mailbox capacity of 1024.
 *
 * The actor owns an array of [stateSize] random coefficients in `[0, 100)`. For every letter
 * carrying a [Long] it sums `message * coefficient` over all coefficients and sends that sum back
 * to the letter's sender, passing its own mailbox as the new sender. A `Stop` message terminates
 * the actor; any other payload is reported through [error].
 *
 * @param stateSize number of coefficients held by the actor
 */
fun CoroutineScope.computationActor(stateSize: Int = STATE_SIZE) =
    actor<StatefulActorBenchmark.Letter>(capacity = 1024) {
        // Actor-local state, filled once before the first letter is processed.
        val random = ThreadLocalRandom.current()
        val coefficients = LongArray(stateSize) { random.nextLong(0, 100) }

        for (letter in channel) {
            val payload = letter.message
            when (payload) {
                is Long -> {
                    val result = coefficients.fold(0L) { sum, coefficient -> sum + payload * coefficient }
                    // Reply to whoever sent the letter; `channel` is this actor's own mailbox.
                    letter.sender.send(StatefulActorBenchmark.Letter(result, channel))
                }
                is Stop -> return@actor
                else -> error("Cannot happen: $letter")
            }
        }
    }