aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/jvm/test/WithTimeoutOrNullThreadDispatchTest.kt
blob: 5d8c0221a7721441988d92d51750955339bed047 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlin.test.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext

/**
 * Checks which thread runs the code around a timed-out [withTimeoutOrNull] block
 * when the surrounding coroutine is confined to a single-threaded dispatcher.
 *
 * Every test builds its dispatcher on top of an [ExecutorService] that is stored in
 * [executor], so that [tearDown] can shut it down afterwards.
 */
class WithTimeoutOrNullThreadDispatchTest : TestBase() {
    /** Executor backing the dispatcher of the current test; `null` until a test creates one. */
    var executor: ExecutorService? = null

    /** Shuts down the executor created by the test, if there is one. */
    @AfterTest
    fun tearDown() {
        executor?.shutdown()
    }

    /** Runs the check on a dispatcher backed by a single-threaded scheduled pool. */
    @Test
    fun testCancellationDispatchScheduled() {
        checkCancellationDispatch { threadFactory ->
            val pool: ExecutorService = Executors.newScheduledThreadPool(1, threadFactory)
            executor = pool
            pool.asCoroutineDispatcher()
        }
    }

    /** Runs the check on a dispatcher backed by a plain single-thread executor. */
    @Test
    fun testCancellationDispatchNonScheduled() {
        checkCancellationDispatch { threadFactory ->
            val pool: ExecutorService = Executors.newSingleThreadExecutor(threadFactory)
            executor = pool
            pool.asCoroutineDispatcher()
        }
    }

    /**
     * Runs the check on a hand-written [CoroutineDispatcher] that only forwards blocks
     * to a single-thread executor.
     *
     * It also checks that there is at most one scheduled request in flight
     * (no spurious concurrency).
     */
    @Test
    fun testCancellationDispatchCustomNoDelay() {
        // Set by the dispatcher when it observes overlapping requests; inspected once the check is over.
        var failure: String? = null
        checkCancellationDispatch { threadFactory ->
            val pool: ExecutorService = Executors.newSingleThreadExecutor(threadFactory)
            executor = pool
            // Number of blocks handed to the pool that have not started running yet.
            val inFlight = AtomicInteger(0)
            object : CoroutineDispatcher() {
                override fun dispatch(context: CoroutineContext, block: Runnable) {
                    val pending = inFlight.incrementAndGet()
                    if (pending > 1) {
                        failure = "Two requests are scheduled concurrently"
                    }
                    pool.execute {
                        inFlight.decrementAndGet()
                        block.run()
                    }
                }
            }
        }
        failure?.let { message -> error(message) }
    }

    /**
     * Runs the shared scenario on the dispatcher produced by [factory].
     *
     * [factory] receives a [ThreadFactory] that remembers the thread it created most recently.
     * Inside the dispatcher, a one-second `delay` is wrapped into `withTimeoutOrNull(100)`;
     * the scenario asserts that the code before the timeout, the `CancellationException`
     * handler and the code after the timeout all run on the remembered thread, and that
     * `withTimeoutOrNull` yields `null`.
     */
    private fun checkCancellationDispatch(factory: (ThreadFactory) -> CoroutineDispatcher) = runBlocking {
        expect(1)
        // The thread most recently produced by the factory that is handed to [factory].
        var worker: Thread? = null
        val recordingFactory = ThreadFactory { task ->
            val created = Thread(task)
            worker = created
            created
        }
        // Asserts that the caller is running on the recorded thread.
        fun assertOnWorker() {
            assertEquals(worker, Thread.currentThread())
        }
        val dispatcher = factory(recordingFactory)
        withContext(dispatcher) {
            expect(2)
            assertOnWorker()
            val result = withTimeoutOrNull(100) {
                try {
                    expect(3)
                    delay(1000)
                    expectUnreached()
                } catch (e: CancellationException) {
                    expect(4)
                    assertOnWorker()
                    throw e // propagate the cancellation so that withTimeoutOrNull can complete with null
                }
            }
            assertOnWorker()
            assertEquals(null, result)
            expect(5)
        }
        finish(6)
    }
}