diff options
Diffstat (limited to 'integration')
14 files changed, 521 insertions, 55 deletions
diff --git a/integration/kotlinx-coroutines-guava/README.md b/integration/kotlinx-coroutines-guava/README.md index 130cf0a0..34b8e581 100644 --- a/integration/kotlinx-coroutines-guava/README.md +++ b/integration/kotlinx-coroutines-guava/README.md @@ -56,9 +56,12 @@ Integration with Guava [ListenableFuture](https://github.com/google/guava/wiki/L <!--- MODULE kotlinx-coroutines-guava --> <!--- INDEX kotlinx.coroutines.guava --> -[future]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/kotlinx.coroutines.-coroutine-scope/future.html -[com.google.common.util.concurrent.ListenableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/com.google.common.util.concurrent.-listenable-future/index.html -[com.google.common.util.concurrent.ListenableFuture.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/com.google.common.util.concurrent.-listenable-future/await.html -[kotlinx.coroutines.Deferred.asListenableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/kotlinx.coroutines.-deferred/as-listenable-future.html +[future]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/future.html +[com.google.common.util.concurrent.ListenableFuture.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/await.html +[kotlinx.coroutines.Deferred.asListenableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/as-listenable-future.html + +<!--- INDEX com.google.common.util.concurrent --> + +[com.google.common.util.concurrent.ListenableFuture]: https://kotlin.github.io/kotlinx.coroutines/https://google.github.io/guava/releases/28.0-jre/api/docs/com/google/common/util/concurrent/ListenableFuture.html <!--- END --> diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index 53019c4b..8f11e0a9 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -136,11 +136,13 @@ public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> { override fun onSuccess(result: T?) { // Here we work with flexible types, so we unchecked cast to trick the type system @Suppress("UNCHECKED_CAST") - deferred.complete(result as T) + runCatching { deferred.complete(result as T) } + .onFailure { handleCoroutineException(EmptyCoroutineContext, it) } } override fun onFailure(t: Throwable) { - deferred.completeExceptionally(t) + runCatching { deferred.completeExceptionally(t) } + .onFailure { handleCoroutineException(EmptyCoroutineContext, it) } } }, MoreExecutors.directExecutor()) @@ -299,7 +301,7 @@ private class ToContinuation<T>( */ private class ListenableFutureCoroutine<T>( context: CoroutineContext -) : AbstractCoroutine<T>(context) { +) : AbstractCoroutine<T>(context, initParentJob = true, active = true) { // JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture. @JvmField diff --git a/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt b/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt new file mode 100644 index 00000000..d6469a94 --- /dev/null +++ b/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt @@ -0,0 +1,46 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.guava + +import com.google.common.util.concurrent.* +import kotlinx.coroutines.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() { + + // This is a separate test in order to avoid interference with uncaught exception handlers in other tests + private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler() + private lateinit var caughtException: Throwable + + @Before + fun setUp() { + Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e } + } + + @After + fun tearDown() { + Thread.setDefaultUncaughtExceptionHandler(exceptionHandler) + } + + @Test + fun testLostExceptionOnSuccess() = runTest { + val future = SettableFuture.create<Int>() + val deferred = future.asDeferred() + deferred.invokeOnCompletion { throw TestException() } + future.set(1) + assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException } + } + + @Test + fun testLostExceptionOnFailure() = runTest { + val future = SettableFuture.create<Int>() + val deferred = future.asDeferred() + deferred.invokeOnCompletion { throw TestException() } + future.setException(TestException2()) + assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException } + } +} diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index 9dca9e9b..69ba1930 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.guava @@ -11,6 +11,7 @@ import org.junit.Ignore import org.junit.Test import java.util.concurrent.* import java.util.concurrent.CancellationException +import java.util.concurrent.atomic.* import kotlin.test.* class ListenableFutureTest : TestBase() { @@ -747,4 +748,31 @@ class ListenableFutureTest : TestBase() { latch.countDown() return future } + + @Test + fun testCancelledParent() = runTest({ it is CancellationException }) { + cancel() + future { expectUnreached() } + future(start = CoroutineStart.ATOMIC) { } + future(start = CoroutineStart.UNDISPATCHED) { } + } + + @Test + fun testStackOverflow() = runTest { + val future = SettableFuture.create<Int>() + val completed = AtomicLong() + val count = 10000L + val children = ArrayList<Job>() + for (i in 0 until count) { + children += launch(Dispatchers.Default) { + future.asDeferred().await() + completed.incrementAndGet() + } + } + future.set(1) + withTimeout(60_000) { + children.forEach { it.join() } + assertEquals(count, completed.get()) + } + } } diff --git a/integration/kotlinx-coroutines-jdk8/README.md b/integration/kotlinx-coroutines-jdk8/README.md index aebd90f0..35808c6f 100644 --- a/integration/kotlinx-coroutines-jdk8/README.md +++ b/integration/kotlinx-coroutines-jdk8/README.md @@ -60,9 +60,9 @@ Integration with JDK8 [CompletableFuture] (Android API level 24). <!--- MODULE kotlinx-coroutines-jdk8 --> <!--- INDEX kotlinx.coroutines.future --> -[future]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/kotlinx.coroutines.-coroutine-scope/future.html -[java.util.concurrent.CompletionStage.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/java.util.concurrent.-completion-stage/await.html -[java.util.concurrent.CompletionStage.asDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/java.util.concurrent.-completion-stage/as-deferred.html -[kotlinx.coroutines.Deferred.asCompletableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/kotlinx.coroutines.-deferred/as-completable-future.html +[future]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/future.html +[java.util.concurrent.CompletionStage.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/await.html +[java.util.concurrent.CompletionStage.asDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/as-deferred.html +[kotlinx.coroutines.Deferred.asCompletableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/as-completable-future.html <!--- END --> diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt index b3b45e9d..caf2a3c3 100644 --- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt +++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt @@ -48,7 +48,7 @@ public fun <T> CoroutineScope.future( private class CompletableFutureCoroutine<T>( context: CoroutineContext, private val future: CompletableFuture<T> -) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> { +) : AbstractCoroutine<T>(context, initParentJob = true, active = true), BiConsumer<T?, Throwable?> { override fun accept(value: T?, exception: Throwable?) { cancel() } @@ -126,13 +126,18 @@ public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> { } val result = CompletableDeferred<T>() whenComplete { value, exception -> - if (exception == null) { - // the future has completed normally - result.complete(value) - } else { - // the future has completed with an exception, unwrap it consistently with fast path - // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping - result.completeExceptionally((exception as? CompletionException)?.cause ?: exception) + try { + if (exception == null) { + // the future has completed normally + result.complete(value) + } else { + // the future has completed with an exception, unwrap it consistently with fast path + // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping + result.completeExceptionally((exception as? CompletionException)?.cause ?: exception) + } + } catch (e: Throwable) { + // We come here iff the internals of Deferred threw an exception during its completion + handleCoroutineException(EmptyCoroutineContext, e) } } result.cancelFutureOnCompletion(future) diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt new file mode 100644 index 00000000..bf810af7 --- /dev/null +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package future + +import kotlinx.coroutines.* +import kotlinx.coroutines.future.* +import org.junit.* +import org.junit.Test +import java.util.concurrent.* +import kotlin.test.* + +class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() { + + // This is a separate test in order to avoid interference with uncaught exception handlers in other tests + private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler() + private lateinit var caughtException: Throwable + + @Before + fun setUp() { + Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e } + } + + @After + fun tearDown() { + Thread.setDefaultUncaughtExceptionHandler(exceptionHandler) + } + + @Test + fun testLostException() = runTest { + val future = CompletableFuture<Int>() + val deferred = future.asDeferred() + deferred.invokeOnCompletion { throw TestException() } + future.complete(1) + assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException } + } +} diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt index 998aaa08..372e79ef 100644 --- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.future @@ -567,4 +567,31 @@ class FutureTest : TestBase() { assertFailsWith<CancellationException> { stage.await() } finish(4) } + + @Test + fun testCancelledParent() = runTest({ it is java.util.concurrent.CancellationException }) { + cancel() + future { expectUnreached() } + future(start = CoroutineStart.ATOMIC) { } + future(start = CoroutineStart.UNDISPATCHED) { } + } + + @Test + fun testStackOverflow() = runTest { + val future = CompletableFuture<Int>() + val completed = AtomicLong() + val count = 10000L + val children = ArrayList<Job>() + for (i in 0 until count) { + children += launch(Dispatchers.Default) { + future.asDeferred().await() + completed.incrementAndGet() + } + } + future.complete(1) + withTimeout(60_000) { + children.forEach { it.join() } + assertEquals(count, completed.get()) + } + } } diff --git a/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt b/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt index 11ceb1a8..d35ee72d 100644 --- a/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt +++ b/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.time @@ -12,8 +12,7 @@ import org.junit.Test import java.time.Duration import kotlin.test.assertEquals - -class SampleTest : TestBase() { +class FlowSampleTest : TestBase() { @Test public fun testBasic() = withVirtualTime { expect(1) diff --git a/integration/kotlinx-coroutines-play-services/README.md b/integration/kotlinx-coroutines-play-services/README.md index 4ee6bf42..e5e0e613 100644 --- a/integration/kotlinx-coroutines-play-services/README.md +++ b/integration/kotlinx-coroutines-play-services/README.md @@ -6,6 +6,7 @@ Extension functions: | **Name** | **Description** | -------- | --------------- +| [Task.asDeferred][asDeferred] | Converts a Task into a Deferred | [Task.await][await] | Awaits for completion of the Task (cancellable) | [Deferred.asTask][asTask] | Converts a deferred value to a Task @@ -25,5 +26,14 @@ val snapshot = try { // Do stuff ``` +If the `Task` supports cancellation via passing a `CancellationToken`, pass the corresponding `CancellationTokenSource` to `asDeferred` or `await` to support bi-directional cancellation: + +```kotlin +val cancellationTokenSource = CancellationTokenSource() +val currentLocationTask = fusedLocationProviderClient.getCurrentLocation(PRIORITY_HIGH_ACCURACY, cancellationTokenSource.token) +val currentLocation = currentLocationTask.await(cancellationTokenSource) // cancelling `await` also cancels `currentLocationTask`, and vice versa +``` + +[asDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-play-services/kotlinx.coroutines.tasks/com.google.android.gms.tasks.-task/as-deferred.html [await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-play-services/kotlinx.coroutines.tasks/com.google.android.gms.tasks.-task/await.html [asTask]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-play-services/kotlinx.coroutines.tasks/kotlinx.coroutines.-deferred/as-task.html diff --git a/integration/kotlinx-coroutines-play-services/api/kotlinx-coroutines-play-services.api b/integration/kotlinx-coroutines-play-services/api/kotlinx-coroutines-play-services.api index 9b2c4dd3..cc23e8db 100644 --- a/integration/kotlinx-coroutines-play-services/api/kotlinx-coroutines-play-services.api +++ b/integration/kotlinx-coroutines-play-services/api/kotlinx-coroutines-play-services.api @@ -1,6 +1,8 @@ public final class kotlinx/coroutines/tasks/TasksKt { public static final fun asDeferred (Lcom/google/android/gms/tasks/Task;)Lkotlinx/coroutines/Deferred; + public static final fun asDeferred (Lcom/google/android/gms/tasks/Task;Lcom/google/android/gms/tasks/CancellationTokenSource;)Lkotlinx/coroutines/Deferred; public static final fun asTask (Lkotlinx/coroutines/Deferred;)Lcom/google/android/gms/tasks/Task; + public static final fun await (Lcom/google/android/gms/tasks/Task;Lcom/google/android/gms/tasks/CancellationTokenSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun await (Lcom/google/android/gms/tasks/Task;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } diff --git a/integration/kotlinx-coroutines-play-services/src/Tasks.kt b/integration/kotlinx-coroutines-play-services/src/Tasks.kt index d89d1aec..c37ac7a0 100644 --- a/integration/kotlinx-coroutines-play-services/src/Tasks.kt +++ b/integration/kotlinx-coroutines-play-services/src/Tasks.kt @@ -6,15 +6,8 @@ package kotlinx.coroutines.tasks -import com.google.android.gms.tasks.CancellationTokenSource -import com.google.android.gms.tasks.RuntimeExecutionException -import com.google.android.gms.tasks.Task -import com.google.android.gms.tasks.TaskCompletionSource -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.Job -import kotlinx.coroutines.suspendCancellableCoroutine +import com.google.android.gms.tasks.* +import kotlinx.coroutines.* import kotlin.coroutines.* /** @@ -45,39 +38,85 @@ public fun <T> Deferred<T>.asTask(): Task<T> { /** * Converts this task to an instance of [Deferred]. * If task is cancelled then resulting deferred will be cancelled as well. + * However, the opposite is not true: if the deferred is cancelled, the [Task] will not be cancelled. + * For bi-directional cancellation, an overload that accepts [CancellationTokenSource] can be used. */ -public fun <T> Task<T>.asDeferred(): Deferred<T> { +public fun <T> Task<T>.asDeferred(): Deferred<T> = asDeferredImpl(null) + +/** + * Converts this task to an instance of [Deferred] with a [CancellationTokenSource] to control cancellation. + * The cancellation of this function is bi-directional: + * * If the given task is cancelled, the resulting deferred will be cancelled. + * * If the resulting deferred is cancelled, the provided [cancellationTokenSource] will be cancelled. + * + * Providing a [CancellationTokenSource] that is unrelated to the receiving [Task] is not supported and + * leads to an unspecified behaviour. + */ +@ExperimentalCoroutinesApi // Since 1.5.1, tentatively until 1.6.0 +public fun <T> Task<T>.asDeferred(cancellationTokenSource: CancellationTokenSource): Deferred<T> = + asDeferredImpl(cancellationTokenSource) + +private fun <T> Task<T>.asDeferredImpl(cancellationTokenSource: CancellationTokenSource?): Deferred<T> { + val deferred = CompletableDeferred<T>() if (isComplete) { val e = exception - return if (e == null) { - @Suppress("UNCHECKED_CAST") - CompletableDeferred<T>().apply { if (isCanceled) cancel() else complete(result as T) } + if (e == null) { + if (isCanceled) { + deferred.cancel() + } else { + @Suppress("UNCHECKED_CAST") + deferred.complete(result as T) + } } else { - CompletableDeferred<T>().apply { completeExceptionally(e) } + deferred.completeExceptionally(e) + } + } else { + addOnCompleteListener { + val e = it.exception + if (e == null) { + @Suppress("UNCHECKED_CAST") + if (it.isCanceled) deferred.cancel() else deferred.complete(it.result as T) + } else { + deferred.completeExceptionally(e) + } } } - val result = CompletableDeferred<T>() - addOnCompleteListener { - val e = it.exception - if (e == null) { - @Suppress("UNCHECKED_CAST") - if (isCanceled) result.cancel() else result.complete(it.result as T) - } else { - result.completeExceptionally(e) + if (cancellationTokenSource != null) { + deferred.invokeOnCompletion { + cancellationTokenSource.cancel() } } - return result + // Prevent casting to CompletableDeferred and manual completion. + return object : Deferred<T> by deferred {} } /** - * Awaits for completion of the task without blocking a thread. + * Awaits the completion of the task without blocking a thread. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * stops waiting for the completion stage and immediately resumes with [CancellationException]. + * + * For bi-directional cancellation, an overload that accepts [CancellationTokenSource] can be used. + */ +public suspend fun <T> Task<T>.await(): T = awaitImpl(null) + +/** + * Awaits the completion of the task that is linked to the given [CancellationTokenSource] to control cancellation. + * + * This suspending function is cancellable and cancellation is bi-directional: + * * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * cancels the [cancellationTokenSource] and throws a [CancellationException]. + * * If the task is cancelled, then this function will throw a [CancellationException]. + * + * Providing a [CancellationTokenSource] that is unrelated to the receiving [Task] is not supported and + * leads to an unspecified behaviour. */ -public suspend fun <T> Task<T>.await(): T { +@ExperimentalCoroutinesApi // Since 1.5.1, tentatively until 1.6.0 +public suspend fun <T> Task<T>.await(cancellationTokenSource: CancellationTokenSource): T = awaitImpl(cancellationTokenSource) + +private suspend fun <T> Task<T>.awaitImpl(cancellationTokenSource: CancellationTokenSource?): T { // fast path if (isComplete) { val e = exception @@ -95,13 +134,19 @@ public suspend fun <T> Task<T>.await(): T { return suspendCancellableCoroutine { cont -> addOnCompleteListener { - val e = exception + val e = it.exception if (e == null) { @Suppress("UNCHECKED_CAST") - if (isCanceled) cont.cancel() else cont.resume(result as T) + if (it.isCanceled) cont.cancel() else cont.resume(it.result as T) } else { cont.resumeWithException(e) } } + + if (cancellationTokenSource != null) { + cont.invokeOnCancellation { + cancellationTokenSource.cancel() + } + } } } diff --git a/integration/kotlinx-coroutines-play-services/test/TaskTest.kt b/integration/kotlinx-coroutines-play-services/test/TaskTest.kt index 0f125ac9..b125192e 100644 --- a/integration/kotlinx-coroutines-play-services/test/TaskTest.kt +++ b/integration/kotlinx-coroutines-play-services/test/TaskTest.kt @@ -149,5 +149,270 @@ class TaskTest : TestBase() { } } + @Test + fun testCancellableTaskAsDeferred() = runTest { + val cancellationTokenSource = CancellationTokenSource() + val deferred = Tasks.forResult(42).asDeferred(cancellationTokenSource) + assertEquals(42, deferred.await()) + assertTrue(cancellationTokenSource.token.isCancellationRequested) + } + + @Test + fun testNullResultCancellableTaskAsDeferred() = runTest { + val cancellationTokenSource = CancellationTokenSource() + assertNull(Tasks.forResult(null).asDeferred(cancellationTokenSource).await()) + assertTrue(cancellationTokenSource.token.isCancellationRequested) + } + + @Test + fun testCancelledCancellableTaskAsDeferred() = runTest { + val cancellationTokenSource = CancellationTokenSource() + val deferred = Tasks.forCanceled<Int>().asDeferred(cancellationTokenSource) + + assertTrue(deferred.isCancelled) + try { + deferred.await() + fail("deferred.await() should be cancelled") + } catch (e: Exception) { + assertTrue(e is CancellationException) + } + assertTrue(cancellationTokenSource.token.isCancellationRequested) + } + + @Test + fun testCancellingCancellableTaskAsDeferred() = runTest { + val cancellationTokenSource = CancellationTokenSource() + val task = TaskCompletionSource<Int>(cancellationTokenSource.token).task + val deferred = task.asDeferred(cancellationTokenSource) + + deferred.cancel() + try { + deferred.await() + fail("deferred.await() should be cancelled") + } catch (e: Exception) { + assertTrue(e is CancellationException) + } + assertTrue(cancellationTokenSource.token.isCancellationRequested) + } + + @Test + fun testExternallyCancelledCancellableTaskAsDeferred() = runTest { + val cancellationTokenSource = CancellationTokenSource() + val task = TaskCompletionSource<Int>(cancellationTokenSource.token).task + val deferred = task.asDeferred(cancellationTokenSource) + + cancellationTokenSource.cancel() + + try { + deferred.await() + fail("deferred.await() should be cancelled") + } catch (e: Exception) { + assertTrue(e is CancellationException) + } + assertTrue(cancellationTokenSource.token.isCancellationRequested) + } + + @Test + fun testSeparatelyCancelledCancellableTaskAsDeferred() = runTest { + val cancellationTokenSource = CancellationTokenSource() + val task = TaskCompletionSource<Int>().task + task.asDeferred(cancellationTokenSource) + + cancellationTokenSource.cancel() + + assertTrue(cancellationTokenSource.token.isCancellationRequested) + } + + @Test + fun testFailedCancellableTaskAsDeferred() = runTest { + val cancellationTokenSource = CancellationTokenSource() + val deferred = Tasks.forException<Int>(TestException("something went wrong")).asDeferred(cancellationTokenSource) + + assertTrue(deferred.isCancelled && deferred.isCompleted) + val completionException = deferred.getCompletionExceptionOrNull()!! + assertTrue(completionException is TestException) + assertEquals("something went wrong", completionException.message) + + try { + deferred.await() + fail("deferred.await() should throw an exception") + } catch (e: Exception) { + assertTrue(e is TestException) + assertEquals("something went wrong", e.message) + } + assertTrue(cancellationTokenSource.token.isCancellationRequested) + } + + @Test + fun testFailingCancellableTaskAsDeferred() = runTest { + val cancellationTokenSource = CancellationTokenSource() + val lock = ReentrantLock().apply { lock() } + + val deferred: Deferred<Int> = Tasks.call { + lock.withLock { throw TestException("something went wrong") } + }.asDeferred(cancellationTokenSource) + + assertFalse(deferred.isCompleted) + lock.unlock() + + try { + deferred.await() + fail("deferred.await() should throw an exception") + } catch (e: Exception) { + assertTrue(e is TestException) + assertEquals("something went wrong", e.message) + assertSame(e.cause, deferred.getCompletionExceptionOrNull()) // debug mode stack augmentation + } + assertTrue(cancellationTokenSource.token.isCancellationRequested) + } + + @Test + fun testFastPathCompletedTaskWithCancelledTokenSourceAsDeferred() = runTest { + val cancellationTokenSource = CancellationTokenSource() + val deferred = Tasks.forResult(42).asDeferred(cancellationTokenSource) + cancellationTokenSource.cancel() + assertEquals(42, deferred.await()) + } + + @Test + fun testAwaitCancellableTask() = runTest { + val cancellationTokenSource = CancellationTokenSource() + val taskCompletionSource = TaskCompletionSource<Int>(cancellationTokenSource.token) + + val deferred: Deferred<Int> = async(start = CoroutineStart.UNDISPATCHED) { + taskCompletionSource.task.await(cancellationTokenSource) + } + + assertFalse(deferred.isCompleted) + taskCompletionSource.setResult(42) + + assertEquals(42, deferred.await()) + assertTrue(deferred.isCompleted) + } + + @Test + fun testFailedAwaitTask() = runTest(expected = { it is TestException }) { + val cancellationTokenSource = CancellationTokenSource() + val taskCompletionSource = TaskCompletionSource<Int>(cancellationTokenSource.token) + + val deferred: Deferred<Int> = async(start = CoroutineStart.UNDISPATCHED) { + taskCompletionSource.task.await(cancellationTokenSource) + } + + assertFalse(deferred.isCompleted) + taskCompletionSource.setException(TestException("something went wrong")) + + deferred.await() + } + + @Test + fun testCancelledAwaitCancellableTask() = runTest { + val cancellationTokenSource = CancellationTokenSource() + val taskCompletionSource = TaskCompletionSource<Int>(cancellationTokenSource.token) + + val deferred: Deferred<Int> = async(start = CoroutineStart.UNDISPATCHED) { + taskCompletionSource.task.await(cancellationTokenSource) + } + + assertFalse(deferred.isCompleted) + // Cancel the deferred + deferred.cancel() + + try { + deferred.await() + fail("deferred.await() should be cancelled") + } catch (e: Exception) { + assertTrue(e is CancellationException) + } + + assertTrue(cancellationTokenSource.token.isCancellationRequested) + } + + @Test + fun testExternallyCancelledAwaitCancellableTask() = runTest { + val cancellationTokenSource = CancellationTokenSource() + val taskCompletionSource = TaskCompletionSource<Int>(cancellationTokenSource.token) + + val deferred: Deferred<Int> = async(start = CoroutineStart.UNDISPATCHED) { + taskCompletionSource.task.await(cancellationTokenSource) + } + + assertFalse(deferred.isCompleted) + // Cancel the cancellation token source + cancellationTokenSource.cancel() + + try { + deferred.await() + fail("deferred.await() should be cancelled") + } catch (e: Exception) { + assertTrue(e is CancellationException) + } + + assertTrue(cancellationTokenSource.token.isCancellationRequested) + } + + @Test + fun testFastPathCancellationTokenSourceCancelledAwaitCancellableTask() = runTest { + val cancellationTokenSource = CancellationTokenSource() + // Construct a task without the cancellation token source + val taskCompletionSource = TaskCompletionSource<Int>() + + val deferred: Deferred<Int> = async(start = CoroutineStart.LAZY) { + taskCompletionSource.task.await(cancellationTokenSource) + } + + assertFalse(deferred.isCompleted) + cancellationTokenSource.cancel() + + // Cancelling the token doesn't cancel the deferred + assertTrue(cancellationTokenSource.token.isCancellationRequested) + assertFalse(deferred.isCompleted) + + // Cleanup + deferred.cancel() + } + + @Test + fun testSlowPathCancellationTokenSourceCancelledAwaitCancellableTask() = runTest { + val cancellationTokenSource = CancellationTokenSource() + // Construct a task without the cancellation token source + val taskCompletionSource = TaskCompletionSource<Int>() + + val deferred: Deferred<Int> = async(start = CoroutineStart.UNDISPATCHED) { + taskCompletionSource.task.await(cancellationTokenSource) + } + + assertFalse(deferred.isCompleted) + cancellationTokenSource.cancel() + + // Cancelling the token doesn't cancel the deferred + assertTrue(cancellationTokenSource.token.isCancellationRequested) + assertFalse(deferred.isCompleted) + + // Cleanup + deferred.cancel() + } + + @Test + fun testFastPathWithCompletedTaskAndCanceledTokenSourceAwaitTask() = runTest { + val firstCancellationTokenSource = CancellationTokenSource() + val secondCancellationTokenSource = CancellationTokenSource() + // Construct a task with a different cancellation token source + val taskCompletionSource = TaskCompletionSource<Int>(firstCancellationTokenSource.token) + + val deferred: Deferred<Int> = async(start = CoroutineStart.LAZY) { + taskCompletionSource.task.await(secondCancellationTokenSource) + } + + assertFalse(deferred.isCompleted) + secondCancellationTokenSource.cancel() + + assertFalse(deferred.isCompleted) + taskCompletionSource.setResult(42) + + assertEquals(42, deferred.await()) + assertTrue(deferred.isCompleted) + } + class TestException(message: String) : Exception(message) } diff --git a/integration/kotlinx-coroutines-slf4j/api/kotlinx-coroutines-slf4j.api b/integration/kotlinx-coroutines-slf4j/api/kotlinx-coroutines-slf4j.api index a8bf271b..6b565d4c 100644 --- a/integration/kotlinx-coroutines-slf4j/api/kotlinx-coroutines-slf4j.api +++ b/integration/kotlinx-coroutines-slf4j/api/kotlinx-coroutines-slf4j.api @@ -3,11 +3,7 @@ public final class kotlinx/coroutines/slf4j/MDCContext : kotlin/coroutines/Abstr public fun <init> ()V public fun <init> (Ljava/util/Map;)V public synthetic fun <init> (Ljava/util/Map;ILkotlin/jvm/internal/DefaultConstructorMarker;)V - public fun fold (Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object; - public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element; public final fun getContextMap ()Ljava/util/Map; - public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext; - public fun plus (Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext; public synthetic fun restoreThreadContext (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;)V public fun restoreThreadContext (Lkotlin/coroutines/CoroutineContext;Ljava/util/Map;)V public synthetic fun updateThreadContext (Lkotlin/coroutines/CoroutineContext;)Ljava/lang/Object; |