aboutsummaryrefslogtreecommitdiffstats
path: root/integration
diff options
context:
space:
mode:
Diffstat (limited to 'integration')
-rw-r--r--integration/kotlinx-coroutines-guava/README.md11
-rw-r--r--integration/kotlinx-coroutines-guava/src/ListenableFuture.kt8
-rw-r--r--integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt46
-rw-r--r--integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt30
-rw-r--r--integration/kotlinx-coroutines-jdk8/README.md8
-rw-r--r--integration/kotlinx-coroutines-jdk8/src/future/Future.kt21
-rw-r--r--integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt38
-rw-r--r--integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt29
-rw-r--r--integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt5
-rw-r--r--integration/kotlinx-coroutines-play-services/README.md10
-rw-r--r--integration/kotlinx-coroutines-play-services/api/kotlinx-coroutines-play-services.api2
-rw-r--r--integration/kotlinx-coroutines-play-services/src/Tasks.kt99
-rw-r--r--integration/kotlinx-coroutines-play-services/test/TaskTest.kt265
-rw-r--r--integration/kotlinx-coroutines-slf4j/api/kotlinx-coroutines-slf4j.api4
14 files changed, 521 insertions, 55 deletions
diff --git a/integration/kotlinx-coroutines-guava/README.md b/integration/kotlinx-coroutines-guava/README.md
index 130cf0a0..34b8e581 100644
--- a/integration/kotlinx-coroutines-guava/README.md
+++ b/integration/kotlinx-coroutines-guava/README.md
@@ -56,9 +56,12 @@ Integration with Guava [ListenableFuture](https://github.com/google/guava/wiki/L
<!--- MODULE kotlinx-coroutines-guava -->
<!--- INDEX kotlinx.coroutines.guava -->
-[future]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/kotlinx.coroutines.-coroutine-scope/future.html
-[com.google.common.util.concurrent.ListenableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/com.google.common.util.concurrent.-listenable-future/index.html
-[com.google.common.util.concurrent.ListenableFuture.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/com.google.common.util.concurrent.-listenable-future/await.html
-[kotlinx.coroutines.Deferred.asListenableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/kotlinx.coroutines.-deferred/as-listenable-future.html
+[future]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/future.html
+[com.google.common.util.concurrent.ListenableFuture.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/await.html
+[kotlinx.coroutines.Deferred.asListenableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/kotlinx.coroutines.guava/as-listenable-future.html
+
+<!--- INDEX com.google.common.util.concurrent -->
+
+[com.google.common.util.concurrent.ListenableFuture]: https://kotlin.github.io/kotlinx.coroutines/https://google.github.io/guava/releases/28.0-jre/api/docs/com/google/common/util/concurrent/ListenableFuture.html
<!--- END -->
diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
index 53019c4b..8f11e0a9 100644
--- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
+++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
@@ -136,11 +136,13 @@ public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
override fun onSuccess(result: T?) {
// Here we work with flexible types, so we unchecked cast to trick the type system
@Suppress("UNCHECKED_CAST")
- deferred.complete(result as T)
+ runCatching { deferred.complete(result as T) }
+ .onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
}
override fun onFailure(t: Throwable) {
- deferred.completeExceptionally(t)
+ runCatching { deferred.completeExceptionally(t) }
+ .onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
}
}, MoreExecutors.directExecutor())
@@ -299,7 +301,7 @@ private class ToContinuation<T>(
*/
private class ListenableFutureCoroutine<T>(
context: CoroutineContext
-) : AbstractCoroutine<T>(context) {
+) : AbstractCoroutine<T>(context, initParentJob = true, active = true) {
// JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture.
@JvmField
diff --git a/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt b/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt
new file mode 100644
index 00000000..d6469a94
--- /dev/null
+++ b/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.guava
+
+import com.google.common.util.concurrent.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import kotlin.test.*
+
+class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() {
+
+ // This is a separate test in order to avoid interference with uncaught exception handlers in other tests
+ private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler()
+ private lateinit var caughtException: Throwable
+
+ @Before
+ fun setUp() {
+ Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e }
+ }
+
+ @After
+ fun tearDown() {
+ Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
+ }
+
+ @Test
+ fun testLostExceptionOnSuccess() = runTest {
+ val future = SettableFuture.create<Int>()
+ val deferred = future.asDeferred()
+ deferred.invokeOnCompletion { throw TestException() }
+ future.set(1)
+ assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
+ }
+
+ @Test
+ fun testLostExceptionOnFailure() = runTest {
+ val future = SettableFuture.create<Int>()
+ val deferred = future.asDeferred()
+ deferred.invokeOnCompletion { throw TestException() }
+ future.setException(TestException2())
+ assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
+ }
+}
diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
index 9dca9e9b..69ba1930 100644
--- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
+++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.guava
@@ -11,6 +11,7 @@ import org.junit.Ignore
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.CancellationException
+import java.util.concurrent.atomic.*
import kotlin.test.*
class ListenableFutureTest : TestBase() {
@@ -747,4 +748,31 @@ class ListenableFutureTest : TestBase() {
latch.countDown()
return future
}
+
+ @Test
+ fun testCancelledParent() = runTest({ it is CancellationException }) {
+ cancel()
+ future { expectUnreached() }
+ future(start = CoroutineStart.ATOMIC) { }
+ future(start = CoroutineStart.UNDISPATCHED) { }
+ }
+
+ @Test
+ fun testStackOverflow() = runTest {
+ val future = SettableFuture.create<Int>()
+ val completed = AtomicLong()
+ val count = 10000L
+ val children = ArrayList<Job>()
+ for (i in 0 until count) {
+ children += launch(Dispatchers.Default) {
+ future.asDeferred().await()
+ completed.incrementAndGet()
+ }
+ }
+ future.set(1)
+ withTimeout(60_000) {
+ children.forEach { it.join() }
+ assertEquals(count, completed.get())
+ }
+ }
}
diff --git a/integration/kotlinx-coroutines-jdk8/README.md b/integration/kotlinx-coroutines-jdk8/README.md
index aebd90f0..35808c6f 100644
--- a/integration/kotlinx-coroutines-jdk8/README.md
+++ b/integration/kotlinx-coroutines-jdk8/README.md
@@ -60,9 +60,9 @@ Integration with JDK8 [CompletableFuture] (Android API level 24).
<!--- MODULE kotlinx-coroutines-jdk8 -->
<!--- INDEX kotlinx.coroutines.future -->
-[future]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/kotlinx.coroutines.-coroutine-scope/future.html
-[java.util.concurrent.CompletionStage.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/java.util.concurrent.-completion-stage/await.html
-[java.util.concurrent.CompletionStage.asDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/java.util.concurrent.-completion-stage/as-deferred.html
-[kotlinx.coroutines.Deferred.asCompletableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/kotlinx.coroutines.-deferred/as-completable-future.html
+[future]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/future.html
+[java.util.concurrent.CompletionStage.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/await.html
+[java.util.concurrent.CompletionStage.asDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/as-deferred.html
+[kotlinx.coroutines.Deferred.asCompletableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.future/as-completable-future.html
<!--- END -->
diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
index b3b45e9d..caf2a3c3 100644
--- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
+++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
@@ -48,7 +48,7 @@ public fun <T> CoroutineScope.future(
private class CompletableFutureCoroutine<T>(
context: CoroutineContext,
private val future: CompletableFuture<T>
-) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {
+) : AbstractCoroutine<T>(context, initParentJob = true, active = true), BiConsumer<T?, Throwable?> {
override fun accept(value: T?, exception: Throwable?) {
cancel()
}
@@ -126,13 +126,18 @@ public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
}
val result = CompletableDeferred<T>()
whenComplete { value, exception ->
- if (exception == null) {
- // the future has completed normally
- result.complete(value)
- } else {
- // the future has completed with an exception, unwrap it consistently with fast path
- // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping
- result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
+ try {
+ if (exception == null) {
+ // the future has completed normally
+ result.complete(value)
+ } else {
+ // the future has completed with an exception, unwrap it consistently with fast path
+ // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping
+ result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
+ }
+ } catch (e: Throwable) {
+ // We come here iff the internals of Deferred threw an exception during its completion
+ handleCoroutineException(EmptyCoroutineContext, e)
}
}
result.cancelFutureOnCompletion(future)
diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt
new file mode 100644
index 00000000..bf810af7
--- /dev/null
+++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package future
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.future.*
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import kotlin.test.*
+
+class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() {
+
+ // This is a separate test in order to avoid interference with uncaught exception handlers in other tests
+ private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler()
+ private lateinit var caughtException: Throwable
+
+ @Before
+ fun setUp() {
+ Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e }
+ }
+
+ @After
+ fun tearDown() {
+ Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
+ }
+
+ @Test
+ fun testLostException() = runTest {
+ val future = CompletableFuture<Int>()
+ val deferred = future.asDeferred()
+ deferred.invokeOnCompletion { throw TestException() }
+ future.complete(1)
+ assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
+ }
+}
diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
index 998aaa08..372e79ef 100644
--- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
+++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.future
@@ -567,4 +567,31 @@ class FutureTest : TestBase() {
assertFailsWith<CancellationException> { stage.await() }
finish(4)
}
+
+ @Test
+ fun testCancelledParent() = runTest({ it is java.util.concurrent.CancellationException }) {
+ cancel()
+ future { expectUnreached() }
+ future(start = CoroutineStart.ATOMIC) { }
+ future(start = CoroutineStart.UNDISPATCHED) { }
+ }
+
+ @Test
+ fun testStackOverflow() = runTest {
+ val future = CompletableFuture<Int>()
+ val completed = AtomicLong()
+ val count = 10000L
+ val children = ArrayList<Job>()
+ for (i in 0 until count) {
+ children += launch(Dispatchers.Default) {
+ future.asDeferred().await()
+ completed.incrementAndGet()
+ }
+ }
+ future.complete(1)
+ withTimeout(60_000) {
+ children.forEach { it.join() }
+ assertEquals(count, completed.get())
+ }
+ }
}
diff --git a/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt b/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt
index 11ceb1a8..d35ee72d 100644
--- a/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt
+++ b/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.time
@@ -12,8 +12,7 @@ import org.junit.Test
import java.time.Duration
import kotlin.test.assertEquals
-
-class SampleTest : TestBase() {
+class FlowSampleTest : TestBase() {
@Test
public fun testBasic() = withVirtualTime {
expect(1)
diff --git a/integration/kotlinx-coroutines-play-services/README.md b/integration/kotlinx-coroutines-play-services/README.md
index 4ee6bf42..e5e0e613 100644
--- a/integration/kotlinx-coroutines-play-services/README.md
+++ b/integration/kotlinx-coroutines-play-services/README.md
@@ -6,6 +6,7 @@ Extension functions:
| **Name** | **Description**
| -------- | ---------------
+| [Task.asDeferred][asDeferred] | Converts a Task into a Deferred
| [Task.await][await] | Awaits for completion of the Task (cancellable)
| [Deferred.asTask][asTask] | Converts a deferred value to a Task
@@ -25,5 +26,14 @@ val snapshot = try {
// Do stuff
```
+If the `Task` supports cancellation via passing a `CancellationToken`, pass the corresponding `CancellationTokenSource` to `asDeferred` or `await` to support bi-directional cancellation:
+
+```kotlin
+val cancellationTokenSource = CancellationTokenSource()
+val currentLocationTask = fusedLocationProviderClient.getCurrentLocation(PRIORITY_HIGH_ACCURACY, cancellationTokenSource.token)
+val currentLocation = currentLocationTask.await(cancellationTokenSource) // cancelling `await` also cancels `currentLocationTask`, and vice versa
+```
+
+[asDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-play-services/kotlinx.coroutines.tasks/com.google.android.gms.tasks.-task/as-deferred.html
[await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-play-services/kotlinx.coroutines.tasks/com.google.android.gms.tasks.-task/await.html
[asTask]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-play-services/kotlinx.coroutines.tasks/kotlinx.coroutines.-deferred/as-task.html
diff --git a/integration/kotlinx-coroutines-play-services/api/kotlinx-coroutines-play-services.api b/integration/kotlinx-coroutines-play-services/api/kotlinx-coroutines-play-services.api
index 9b2c4dd3..cc23e8db 100644
--- a/integration/kotlinx-coroutines-play-services/api/kotlinx-coroutines-play-services.api
+++ b/integration/kotlinx-coroutines-play-services/api/kotlinx-coroutines-play-services.api
@@ -1,6 +1,8 @@
public final class kotlinx/coroutines/tasks/TasksKt {
public static final fun asDeferred (Lcom/google/android/gms/tasks/Task;)Lkotlinx/coroutines/Deferred;
+ public static final fun asDeferred (Lcom/google/android/gms/tasks/Task;Lcom/google/android/gms/tasks/CancellationTokenSource;)Lkotlinx/coroutines/Deferred;
public static final fun asTask (Lkotlinx/coroutines/Deferred;)Lcom/google/android/gms/tasks/Task;
+ public static final fun await (Lcom/google/android/gms/tasks/Task;Lcom/google/android/gms/tasks/CancellationTokenSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun await (Lcom/google/android/gms/tasks/Task;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
diff --git a/integration/kotlinx-coroutines-play-services/src/Tasks.kt b/integration/kotlinx-coroutines-play-services/src/Tasks.kt
index d89d1aec..c37ac7a0 100644
--- a/integration/kotlinx-coroutines-play-services/src/Tasks.kt
+++ b/integration/kotlinx-coroutines-play-services/src/Tasks.kt
@@ -6,15 +6,8 @@
package kotlinx.coroutines.tasks
-import com.google.android.gms.tasks.CancellationTokenSource
-import com.google.android.gms.tasks.RuntimeExecutionException
-import com.google.android.gms.tasks.Task
-import com.google.android.gms.tasks.TaskCompletionSource
-import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.CompletableDeferred
-import kotlinx.coroutines.Deferred
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.suspendCancellableCoroutine
+import com.google.android.gms.tasks.*
+import kotlinx.coroutines.*
import kotlin.coroutines.*
/**
@@ -45,39 +38,85 @@ public fun <T> Deferred<T>.asTask(): Task<T> {
/**
* Converts this task to an instance of [Deferred].
* If task is cancelled then resulting deferred will be cancelled as well.
+ * However, the opposite is not true: if the deferred is cancelled, the [Task] will not be cancelled.
+ * For bi-directional cancellation, an overload that accepts [CancellationTokenSource] can be used.
*/
-public fun <T> Task<T>.asDeferred(): Deferred<T> {
+public fun <T> Task<T>.asDeferred(): Deferred<T> = asDeferredImpl(null)
+
+/**
+ * Converts this task to an instance of [Deferred] with a [CancellationTokenSource] to control cancellation.
+ * The cancellation of this function is bi-directional:
+ * * If the given task is cancelled, the resulting deferred will be cancelled.
+ * * If the resulting deferred is cancelled, the provided [cancellationTokenSource] will be cancelled.
+ *
+ * Providing a [CancellationTokenSource] that is unrelated to the receiving [Task] is not supported and
+ * leads to an unspecified behaviour.
+ */
+@ExperimentalCoroutinesApi // Since 1.5.1, tentatively until 1.6.0
+public fun <T> Task<T>.asDeferred(cancellationTokenSource: CancellationTokenSource): Deferred<T> =
+ asDeferredImpl(cancellationTokenSource)
+
+private fun <T> Task<T>.asDeferredImpl(cancellationTokenSource: CancellationTokenSource?): Deferred<T> {
+ val deferred = CompletableDeferred<T>()
if (isComplete) {
val e = exception
- return if (e == null) {
- @Suppress("UNCHECKED_CAST")
- CompletableDeferred<T>().apply { if (isCanceled) cancel() else complete(result as T) }
+ if (e == null) {
+ if (isCanceled) {
+ deferred.cancel()
+ } else {
+ @Suppress("UNCHECKED_CAST")
+ deferred.complete(result as T)
+ }
} else {
- CompletableDeferred<T>().apply { completeExceptionally(e) }
+ deferred.completeExceptionally(e)
+ }
+ } else {
+ addOnCompleteListener {
+ val e = it.exception
+ if (e == null) {
+ @Suppress("UNCHECKED_CAST")
+ if (it.isCanceled) deferred.cancel() else deferred.complete(it.result as T)
+ } else {
+ deferred.completeExceptionally(e)
+ }
}
}
- val result = CompletableDeferred<T>()
- addOnCompleteListener {
- val e = it.exception
- if (e == null) {
- @Suppress("UNCHECKED_CAST")
- if (isCanceled) result.cancel() else result.complete(it.result as T)
- } else {
- result.completeExceptionally(e)
+ if (cancellationTokenSource != null) {
+ deferred.invokeOnCompletion {
+ cancellationTokenSource.cancel()
}
}
- return result
+ // Prevent casting to CompletableDeferred and manual completion.
+ return object : Deferred<T> by deferred {}
}
/**
- * Awaits for completion of the task without blocking a thread.
+ * Awaits the completion of the task without blocking a thread.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* stops waiting for the completion stage and immediately resumes with [CancellationException].
+ *
+ * For bi-directional cancellation, an overload that accepts [CancellationTokenSource] can be used.
+ */
+public suspend fun <T> Task<T>.await(): T = awaitImpl(null)
+
+/**
+ * Awaits the completion of the task that is linked to the given [CancellationTokenSource] to control cancellation.
+ *
+ * This suspending function is cancellable and cancellation is bi-directional:
+ * * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * cancels the [cancellationTokenSource] and throws a [CancellationException].
+ * * If the task is cancelled, then this function will throw a [CancellationException].
+ *
+ * Providing a [CancellationTokenSource] that is unrelated to the receiving [Task] is not supported and
+ * leads to an unspecified behaviour.
*/
-public suspend fun <T> Task<T>.await(): T {
+@ExperimentalCoroutinesApi // Since 1.5.1, tentatively until 1.6.0
+public suspend fun <T> Task<T>.await(cancellationTokenSource: CancellationTokenSource): T = awaitImpl(cancellationTokenSource)
+
+private suspend fun <T> Task<T>.awaitImpl(cancellationTokenSource: CancellationTokenSource?): T {
// fast path
if (isComplete) {
val e = exception
@@ -95,13 +134,19 @@ public suspend fun <T> Task<T>.await(): T {
return suspendCancellableCoroutine { cont ->
addOnCompleteListener {
- val e = exception
+ val e = it.exception
if (e == null) {
@Suppress("UNCHECKED_CAST")
- if (isCanceled) cont.cancel() else cont.resume(result as T)
+ if (it.isCanceled) cont.cancel() else cont.resume(it.result as T)
} else {
cont.resumeWithException(e)
}
}
+
+ if (cancellationTokenSource != null) {
+ cont.invokeOnCancellation {
+ cancellationTokenSource.cancel()
+ }
+ }
}
}
diff --git a/integration/kotlinx-coroutines-play-services/test/TaskTest.kt b/integration/kotlinx-coroutines-play-services/test/TaskTest.kt
index 0f125ac9..b125192e 100644
--- a/integration/kotlinx-coroutines-play-services/test/TaskTest.kt
+++ b/integration/kotlinx-coroutines-play-services/test/TaskTest.kt
@@ -149,5 +149,270 @@ class TaskTest : TestBase() {
}
}
+ @Test
+ fun testCancellableTaskAsDeferred() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ val deferred = Tasks.forResult(42).asDeferred(cancellationTokenSource)
+ assertEquals(42, deferred.await())
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ }
+
+ @Test
+ fun testNullResultCancellableTaskAsDeferred() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ assertNull(Tasks.forResult(null).asDeferred(cancellationTokenSource).await())
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ }
+
+ @Test
+ fun testCancelledCancellableTaskAsDeferred() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ val deferred = Tasks.forCanceled<Int>().asDeferred(cancellationTokenSource)
+
+ assertTrue(deferred.isCancelled)
+ try {
+ deferred.await()
+ fail("deferred.await() should be cancelled")
+ } catch (e: Exception) {
+ assertTrue(e is CancellationException)
+ }
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ }
+
+ @Test
+ fun testCancellingCancellableTaskAsDeferred() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ val task = TaskCompletionSource<Int>(cancellationTokenSource.token).task
+ val deferred = task.asDeferred(cancellationTokenSource)
+
+ deferred.cancel()
+ try {
+ deferred.await()
+ fail("deferred.await() should be cancelled")
+ } catch (e: Exception) {
+ assertTrue(e is CancellationException)
+ }
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ }
+
+ @Test
+ fun testExternallyCancelledCancellableTaskAsDeferred() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ val task = TaskCompletionSource<Int>(cancellationTokenSource.token).task
+ val deferred = task.asDeferred(cancellationTokenSource)
+
+ cancellationTokenSource.cancel()
+
+ try {
+ deferred.await()
+ fail("deferred.await() should be cancelled")
+ } catch (e: Exception) {
+ assertTrue(e is CancellationException)
+ }
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ }
+
+ @Test
+ fun testSeparatelyCancelledCancellableTaskAsDeferred() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ val task = TaskCompletionSource<Int>().task
+ task.asDeferred(cancellationTokenSource)
+
+ cancellationTokenSource.cancel()
+
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ }
+
+ @Test
+ fun testFailedCancellableTaskAsDeferred() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ val deferred = Tasks.forException<Int>(TestException("something went wrong")).asDeferred(cancellationTokenSource)
+
+ assertTrue(deferred.isCancelled && deferred.isCompleted)
+ val completionException = deferred.getCompletionExceptionOrNull()!!
+ assertTrue(completionException is TestException)
+ assertEquals("something went wrong", completionException.message)
+
+ try {
+ deferred.await()
+ fail("deferred.await() should throw an exception")
+ } catch (e: Exception) {
+ assertTrue(e is TestException)
+ assertEquals("something went wrong", e.message)
+ }
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ }
+
+ @Test
+ fun testFailingCancellableTaskAsDeferred() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ val lock = ReentrantLock().apply { lock() }
+
+ val deferred: Deferred<Int> = Tasks.call {
+ lock.withLock { throw TestException("something went wrong") }
+ }.asDeferred(cancellationTokenSource)
+
+ assertFalse(deferred.isCompleted)
+ lock.unlock()
+
+ try {
+ deferred.await()
+ fail("deferred.await() should throw an exception")
+ } catch (e: Exception) {
+ assertTrue(e is TestException)
+ assertEquals("something went wrong", e.message)
+ assertSame(e.cause, deferred.getCompletionExceptionOrNull()) // debug mode stack augmentation
+ }
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ }
+
+ @Test
+ fun testFastPathCompletedTaskWithCancelledTokenSourceAsDeferred() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ val deferred = Tasks.forResult(42).asDeferred(cancellationTokenSource)
+ cancellationTokenSource.cancel()
+ assertEquals(42, deferred.await())
+ }
+
+ @Test
+ fun testAwaitCancellableTask() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ val taskCompletionSource = TaskCompletionSource<Int>(cancellationTokenSource.token)
+
+ val deferred: Deferred<Int> = async(start = CoroutineStart.UNDISPATCHED) {
+ taskCompletionSource.task.await(cancellationTokenSource)
+ }
+
+ assertFalse(deferred.isCompleted)
+ taskCompletionSource.setResult(42)
+
+ assertEquals(42, deferred.await())
+ assertTrue(deferred.isCompleted)
+ }
+
+ @Test
+ fun testFailedAwaitTask() = runTest(expected = { it is TestException }) {
+ val cancellationTokenSource = CancellationTokenSource()
+ val taskCompletionSource = TaskCompletionSource<Int>(cancellationTokenSource.token)
+
+ val deferred: Deferred<Int> = async(start = CoroutineStart.UNDISPATCHED) {
+ taskCompletionSource.task.await(cancellationTokenSource)
+ }
+
+ assertFalse(deferred.isCompleted)
+ taskCompletionSource.setException(TestException("something went wrong"))
+
+ deferred.await()
+ }
+
+ @Test
+ fun testCancelledAwaitCancellableTask() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ val taskCompletionSource = TaskCompletionSource<Int>(cancellationTokenSource.token)
+
+ val deferred: Deferred<Int> = async(start = CoroutineStart.UNDISPATCHED) {
+ taskCompletionSource.task.await(cancellationTokenSource)
+ }
+
+ assertFalse(deferred.isCompleted)
+ // Cancel the deferred
+ deferred.cancel()
+
+ try {
+ deferred.await()
+ fail("deferred.await() should be cancelled")
+ } catch (e: Exception) {
+ assertTrue(e is CancellationException)
+ }
+
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ }
+
+ @Test
+ fun testExternallyCancelledAwaitCancellableTask() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ val taskCompletionSource = TaskCompletionSource<Int>(cancellationTokenSource.token)
+
+ val deferred: Deferred<Int> = async(start = CoroutineStart.UNDISPATCHED) {
+ taskCompletionSource.task.await(cancellationTokenSource)
+ }
+
+ assertFalse(deferred.isCompleted)
+ // Cancel the cancellation token source
+ cancellationTokenSource.cancel()
+
+ try {
+ deferred.await()
+ fail("deferred.await() should be cancelled")
+ } catch (e: Exception) {
+ assertTrue(e is CancellationException)
+ }
+
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ }
+
+ @Test
+ fun testFastPathCancellationTokenSourceCancelledAwaitCancellableTask() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ // Construct a task without the cancellation token source
+ val taskCompletionSource = TaskCompletionSource<Int>()
+
+ val deferred: Deferred<Int> = async(start = CoroutineStart.LAZY) {
+ taskCompletionSource.task.await(cancellationTokenSource)
+ }
+
+ assertFalse(deferred.isCompleted)
+ cancellationTokenSource.cancel()
+
+ // Cancelling the token doesn't cancel the deferred
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ assertFalse(deferred.isCompleted)
+
+ // Cleanup
+ deferred.cancel()
+ }
+
+ @Test
+ fun testSlowPathCancellationTokenSourceCancelledAwaitCancellableTask() = runTest {
+ val cancellationTokenSource = CancellationTokenSource()
+ // Construct a task without the cancellation token source
+ val taskCompletionSource = TaskCompletionSource<Int>()
+
+ val deferred: Deferred<Int> = async(start = CoroutineStart.UNDISPATCHED) {
+ taskCompletionSource.task.await(cancellationTokenSource)
+ }
+
+ assertFalse(deferred.isCompleted)
+ cancellationTokenSource.cancel()
+
+ // Cancelling the token doesn't cancel the deferred
+ assertTrue(cancellationTokenSource.token.isCancellationRequested)
+ assertFalse(deferred.isCompleted)
+
+ // Cleanup
+ deferred.cancel()
+ }
+
+ @Test
+ fun testFastPathWithCompletedTaskAndCanceledTokenSourceAwaitTask() = runTest {
+ val firstCancellationTokenSource = CancellationTokenSource()
+ val secondCancellationTokenSource = CancellationTokenSource()
+ // Construct a task with a different cancellation token source
+ val taskCompletionSource = TaskCompletionSource<Int>(firstCancellationTokenSource.token)
+
+ val deferred: Deferred<Int> = async(start = CoroutineStart.LAZY) {
+ taskCompletionSource.task.await(secondCancellationTokenSource)
+ }
+
+ assertFalse(deferred.isCompleted)
+ secondCancellationTokenSource.cancel()
+
+ assertFalse(deferred.isCompleted)
+ taskCompletionSource.setResult(42)
+
+ assertEquals(42, deferred.await())
+ assertTrue(deferred.isCompleted)
+ }
+
class TestException(message: String) : Exception(message)
}
diff --git a/integration/kotlinx-coroutines-slf4j/api/kotlinx-coroutines-slf4j.api b/integration/kotlinx-coroutines-slf4j/api/kotlinx-coroutines-slf4j.api
index a8bf271b..6b565d4c 100644
--- a/integration/kotlinx-coroutines-slf4j/api/kotlinx-coroutines-slf4j.api
+++ b/integration/kotlinx-coroutines-slf4j/api/kotlinx-coroutines-slf4j.api
@@ -3,11 +3,7 @@ public final class kotlinx/coroutines/slf4j/MDCContext : kotlin/coroutines/Abstr
public fun <init> ()V
public fun <init> (Ljava/util/Map;)V
public synthetic fun <init> (Ljava/util/Map;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
- public fun fold (Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
- public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
public final fun getContextMap ()Ljava/util/Map;
- public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
- public fun plus (Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext;
public synthetic fun restoreThreadContext (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;)V
public fun restoreThreadContext (Lkotlin/coroutines/CoroutineContext;Ljava/util/Map;)V
public synthetic fun updateThreadContext (Lkotlin/coroutines/CoroutineContext;)Ljava/lang/Object;