aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Tolstopyatov <qwwdfsad@gmail.com>2019-08-09 21:17:54 +0300
committerGitHub <noreply@github.com>2019-08-09 21:17:54 +0300
commit0d7f3fbef9b5ccb4972bcd6b974c487581431fcd (patch)
treebde53cc348eff85ca50e714960eeb4ced5dfedaa
parent8ab2130e3d0f54a47c26095f542e645cb1935d05 (diff)
parent2e9886da39b1c344f4d0f96b5ff5a7e6bc654171 (diff)
downloadplatform_external_kotlinx.coroutines-0d7f3fbef9b5ccb4972bcd6b974c487581431fcd.tar.gz
platform_external_kotlinx.coroutines-0d7f3fbef9b5ccb4972bcd6b974c487581431fcd.tar.bz2
platform_external_kotlinx.coroutines-0d7f3fbef9b5ccb4972bcd6b974c487581431fcd.zip
Merge pull request #1431 from Kotlin/version-1.3.0-RC2
Version 1.3.0-RC2
-rw-r--r--CHANGES.md28
-rw-r--r--README.md16
-rw-r--r--binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt31
-rw-r--r--binary-compatibility-validator/reference-public-api/kotlinx-coroutines-debug.txt6
-rw-r--r--binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt27
-rw-r--r--binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt4
-rw-r--r--build.gradle2
-rw-r--r--docs/coroutine-context-and-dispatchers.md5
-rw-r--r--docs/debugging.md3
-rw-r--r--gradle.properties2
-rw-r--r--integration/kotlinx-coroutines-guava/build.gradle2
-rw-r--r--kotlinx-coroutines-bom/build.gradle10
-rw-r--r--kotlinx-coroutines-core/common/README.md3
-rw-r--r--kotlinx-coroutines-core/common/src/JobSupport.kt5
-rw-r--r--kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt40
-rw-r--r--kotlinx-coroutines-core/common/src/channels/Produce.kt2
-rw-r--r--kotlinx-coroutines-core/common/src/flow/Flow.kt56
-rw-r--r--kotlinx-coroutines-core/common/src/flow/Migration.kt102
-rw-r--r--kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt2
-rw-r--r--kotlinx-coroutines-core/common/src/flow/internal/Combine.kt142
-rw-r--r--kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt24
-rw-r--r--kotlinx-coroutines-core/common/src/flow/internal/Merge.kt86
-rw-r--r--kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt8
-rw-r--r--kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt8
-rw-r--r--kotlinx-coroutines-core/common/src/flow/operators/Context.kt2
-rw-r--r--kotlinx-coroutines-core/common/src/flow/operators/Delay.kt53
-rw-r--r--kotlinx-coroutines-core/common/src/flow/operators/Merge.kt119
-rw-r--r--kotlinx-coroutines-core/common/src/flow/operators/Zip.kt374
-rw-r--r--kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt41
-rw-r--r--kotlinx-coroutines-core/common/src/internal/StackTraceRecovery.common.kt6
-rw-r--r--kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt3
-rw-r--r--kotlinx-coroutines-core/common/src/sync/Semaphore.kt41
-rw-r--r--kotlinx-coroutines-core/common/test/TestBase.common.kt17
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt49
-rw-r--r--kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt65
-rw-r--r--kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt22
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt14
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt68
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt164
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt (renamed from kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt)37
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/FlatMapLatestTest.kt137
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt15
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/FlowContextTest.kt154
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt36
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt231
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt122
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/TransformLatestTest.kt172
-rw-r--r--kotlinx-coroutines-core/common/test/flow/terminal/CollectLatestTest.kt56
-rw-r--r--kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt36
-rw-r--r--kotlinx-coroutines-core/js/src/CoroutineContext.kt3
-rw-r--r--kotlinx-coroutines-core/js/src/JSDispatcher.kt72
-rw-r--r--kotlinx-coroutines-core/js/test/SetTimeoutDispatcherTest.kt53
-rw-r--r--kotlinx-coroutines-core/jvm/src/CoroutineContext.kt19
-rw-r--r--kotlinx-coroutines-core/jvm/src/Debug.kt28
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt33
-rw-r--r--kotlinx-coroutines-core/jvm/test/TestBase.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ArrayChannelStressTest.kt25
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt12
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt4
-rw-r--r--kotlinx-coroutines-core/native/test/WorkerTest.kt35
-rw-r--r--kotlinx-coroutines-debug/README.md4
-rw-r--r--kotlinx-coroutines-debug/src/CoroutineInfo.kt16
-rw-r--r--kotlinx-coroutines-debug/src/internal/DebugProbesImpl.kt6
-rw-r--r--kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt11
-rw-r--r--kotlinx-coroutines-test/README.md2
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/Await.kt13
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt15
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt109
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt (renamed from reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt)23
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt103
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt79
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt (renamed from reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt)2
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt (renamed from reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt)3
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt (renamed from reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt)2
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt (renamed from reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt)2
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt (renamed from reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt)2
-rw-r--r--reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector1
-rw-r--r--reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt26
-rw-r--r--reactive/kotlinx-coroutines-reactor/src/Flux.kt2
-rw-r--r--reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt12
-rw-r--r--reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt22
-rw-r--r--reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt1
-rw-r--r--reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt27
-rw-r--r--reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt87
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxConvert.kt9
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt1
-rw-r--r--ui/coroutines-guide-ui.md2
-rw-r--r--ui/kotlinx-coroutines-android/animation-app/gradle.properties2
-rw-r--r--ui/kotlinx-coroutines-android/build.gradle85
-rw-r--r--ui/kotlinx-coroutines-android/example-app/gradle.properties2
-rw-r--r--ui/kotlinx-coroutines-android/r8-test-common.pro12
-rw-r--r--ui/kotlinx-coroutines-android/r8-test-rules-no-optim.pro4
-rw-r--r--ui/kotlinx-coroutines-android/r8-test-rules.pro7
-rw-r--r--ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/proguard/coroutines.pro5
-rw-r--r--ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/r8-from-1.6.0/coroutines.pro6
-rw-r--r--ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/r8-upto-1.6.0/coroutines.pro5
-rw-r--r--ui/kotlinx-coroutines-android/resources/META-INF/proguard/coroutines.pro7
-rw-r--r--ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt1
-rw-r--r--ui/kotlinx-coroutines-android/test/R8ServiceLoaderOptimizationTest.kt61
99 files changed, 2487 insertions, 1226 deletions
diff --git a/CHANGES.md b/CHANGES.md
index cd920aa2..ecf2852c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,33 @@
# Change log for kotlinx.coroutines
+## Version 1.3.0-RC2
+
+### Flow improvements
+* Operators for UI programming are reworked for the sake of consistency, naming scheme for operator overloads is introduced:
+ * `combineLatest` is deprecated in the favor of `combine`.
+ * `combineTransform` operator for non-trivial transformations (#1224).
+ * Top-level `combine` and `combineTransform` overloads for multiple flows (#1262).
+ * `switchMap` is deprecated. `flatMapLatest`, `mapLatest` and `transformLatest` are introduced instead (#1335).
+ * `collectLatest` terminal operator (#1269).
+
+* Improved cancellation support in `flattenMerge` (#1392).
+* `channelFlow` cancellation does not leak to the parent (#1334).
+* Fixed flow invariant enforcement for `suspend fun main` (#1421).
+* `delayEach` and `delayFlow` are deprecated (#1429).
+
+### General changes
+* Integration with Reactor context
+ * Propagation of the coroutine context of `await` calls into Mono/Flux builder.
+ * Publisher.asFlow propagates coroutine context from `collect` call to the Publisher.
+ * New `Flow.asFlux ` builder.
+
+* ServiceLoader-code is adjusted to avoid I/O on the Main thread on newer (3.6.0+) Android toolchain.
+* Stacktrace recovery support for minified builds on Android (#1416).
+* Guava version in `kotlinx-coroutines-guava` updated to `28.0`.
+* `setTimeout`-based JS dispatcher for platforms where `process` is unavailable (#1404).
+* Native, JS and common modules are added to `kotlinx-coroutines-bom`.
+* Fixed bug with ignored `acquiredPermits` in `Semaphore` (#1423).
+
## Version 1.3.0-RC
### Flow
diff --git a/README.md b/README.md
index 84083a84..c73595a8 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
[![official JetBrains project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
[![GitHub license](https://img.shields.io/badge/license-Apache%20License%202.0-blue.svg?style=flat)](https://www.apache.org/licenses/LICENSE-2.0)
-[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.3.0-RC) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.0-RC)
+[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.3.0-RC2) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.0-RC2)
Library support for Kotlin coroutines with [multiplatform](#multiplatform) support.
This is a companion version for Kotlin `1.3.41` release.
@@ -81,7 +81,7 @@ Add dependencies (you can also add other modules that you need):
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
- <version>1.3.0-RC</version>
+ <version>1.3.0-RC2</version>
</dependency>
```
@@ -99,7 +99,7 @@ Add dependencies (you can also add other modules that you need):
```groovy
dependencies {
- implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-RC'
+ implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-RC2'
}
```
@@ -125,7 +125,7 @@ Add dependencies (you can also add other modules that you need):
```groovy
dependencies {
- implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-RC")
+ implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-RC2")
}
```
@@ -144,7 +144,7 @@ Make sure that you have either `jcenter()` or `mavenCentral()` in the list of re
Core modules of `kotlinx.coroutines` are also available for
[Kotlin/JS](#js) and [Kotlin/Native](#native).
In common code that should get compiled for different platforms, add dependency to
-[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.0-RC/jar)
+[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.0-RC2/jar)
(follow the link to get the dependency declaration snippet).
### Android
@@ -153,7 +153,7 @@ Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android)
module as dependency when using `kotlinx.coroutines` on Android:
```groovy
-implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC'
+implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC2'
```
This gives you access to Android [Dispatchers.Main](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-android/kotlinx.coroutines.android/kotlinx.coroutines.-dispatchers/index.html)
@@ -172,7 +172,7 @@ R8 is a replacement for ProGuard in Android ecosystem, it is enabled by default
### JS
[Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) version of `kotlinx.coroutines` is published as
-[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.0-RC/jar)
+[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.0-RC2/jar)
(follow the link to get the dependency declaration snippet).
You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotlinx-coroutines-core) package via NPM.
@@ -180,7 +180,7 @@ You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotli
### Native
[Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as
-[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.0-RC/jar)
+[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.0-RC2/jar)
(follow the link to get the dependency declaration snippet).
Only single-threaded code (JS-style) on Kotlin/Native is currently supported.
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index 3e20e88b..a2771690 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -826,9 +826,6 @@ public abstract interface class kotlinx/coroutines/flow/FlowCollector {
public final class kotlinx/coroutines/flow/FlowKt {
public static final field DEFAULT_CONCURRENCY_PROPERTY_NAME Ljava/lang/String;
- public static final fun BehaviourSubject ()Ljava/lang/Object;
- public static final fun PublishSubject ()Ljava/lang/Object;
- public static final fun ReplaySubject ()Ljava/lang/Object;
public static final fun asFlow (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Ljava/util/Iterator;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lkotlin/jvm/functions/Function0;)Lkotlinx/coroutines/flow/Flow;
@@ -850,12 +847,23 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun collectLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final synthetic fun combine (Ljava/lang/Iterable;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
+ public static final synthetic fun combine ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
- public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
- public static final synthetic fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final synthetic fun combineTransform (Ljava/lang/Iterable;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function7;)Lkotlinx/coroutines/flow/Flow;
+ public static final synthetic fun combineTransform ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun compose (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun concatWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -883,6 +891,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flatMapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
@@ -890,6 +899,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flowCombine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flowCombineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOf (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
@@ -902,6 +913,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun getDEFAULT_CONCURRENCY ()I
public static final fun launchIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun mapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
@@ -946,6 +958,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun toSet (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun transformLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
@@ -967,6 +980,10 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
}
+public final class kotlinx/coroutines/flow/internal/CombineKt {
+ public static final fun combineInternal (Lkotlinx/coroutines/flow/FlowCollector;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+}
+
public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
public static final fun checkIndexOverflow (I)I
}
@@ -980,6 +997,10 @@ public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/c
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
+public final class kotlinx/coroutines/intrinsics/CancellableKt {
+ public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)V
+}
+
public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher {
public synthetic fun <init> (II)V
public synthetic fun <init> (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-debug.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-debug.txt
index 604e6cd2..79f5b75d 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-debug.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-debug.txt
@@ -1,13 +1,9 @@
public final class kotlinx/coroutines/debug/CoroutineInfo {
- public final fun component1 ()Lkotlin/coroutines/CoroutineContext;
- public final fun copy (Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;J)Lkotlinx/coroutines/debug/CoroutineInfo;
- public static synthetic fun copy$default (Lkotlinx/coroutines/debug/CoroutineInfo;Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;JILjava/lang/Object;)Lkotlinx/coroutines/debug/CoroutineInfo;
- public fun equals (Ljava/lang/Object;)Z
+ public final fun copy ()Lkotlinx/coroutines/debug/CoroutineInfo;
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
public final fun getCreationStackTrace ()Ljava/util/List;
public final fun getJob ()Lkotlinx/coroutines/Job;
public final fun getState ()Lkotlinx/coroutines/debug/State;
- public fun hashCode ()I
public final fun lastObservedStackTrace ()Ljava/util/List;
public fun toString ()Ljava/lang/String;
}
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
index 643f6417..fb24c874 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
@@ -14,11 +14,29 @@ public final class kotlinx/coroutines/reactive/ChannelKt {
public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
}
+public abstract interface class kotlinx/coroutines/reactive/ContextInjector {
+ public abstract fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
+}
+
public final class kotlinx/coroutines/reactive/ConvertKt {
public static final fun asPublisher (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
}
+public final class kotlinx/coroutines/reactive/FlowKt {
+ public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun asFlow (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
+ public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
+}
+
+public final class kotlinx/coroutines/reactive/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, org/reactivestreams/Subscription {
+ public final field flow Lkotlinx/coroutines/flow/Flow;
+ public final field subscriber Lorg/reactivestreams/Subscriber;
+ public fun <init> (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V
+ public fun cancel ()V
+ public fun request (J)V
+}
+
public final class kotlinx/coroutines/reactive/PublishKt {
public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
@@ -44,12 +62,3 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
-public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
- public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
-}
-
-public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
- public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
- public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
-}
-
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
index 46b35ed7..20e20baa 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
@@ -5,6 +5,10 @@ public final class kotlinx/coroutines/reactor/ConvertKt {
public static final fun asMono (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Mono;
}
+public final class kotlinx/coroutines/reactor/FlowKt {
+ public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux;
+}
+
public final class kotlinx/coroutines/reactor/FluxKt {
public static final fun flux (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
public static final fun flux (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
diff --git a/build.gradle b/build.gradle
index 4bccce7a..c05c07af 100644
--- a/build.gradle
+++ b/build.gradle
@@ -147,12 +147,10 @@ if (build_snapshot_train) {
allprojects {
tasks.withType(Test).all {
exclude '**/*LinearizabilityTest*'
- exclude '**/*PublicApiTest*' // KT-30956
exclude '**/*LFTest*'
exclude '**/*StressTest*'
exclude '**/*scheduling*'
exclude '**/*Timeout*'
- exclude '**/*coroutines/debug*' // Unmute after 1.3.31 where inlining was fixed
exclude '**/*definitely/not/kotlinx*'
}
}
diff --git a/docs/coroutine-context-and-dispatchers.md b/docs/coroutine-context-and-dispatchers.md
index 4e366dde..4769c1e2 100644
--- a/docs/coroutine-context-and-dispatchers.md
+++ b/docs/coroutine-context-and-dispatchers.md
@@ -221,7 +221,8 @@ The `log` function prints the name of the thread in square brackets, and you can
thread with the identifier of the currently executing coroutine appended to it. This identifier
is consecutively assigned to all created coroutines when the debugging mode is on.
-You can read more about debugging facilities in the documentation of the [newCoroutineContext] function.
+> Debugging mode is also turned on when JVM is run with `-ea` option.
+You can read more about debugging facilities in the documentation of the [DEBUG_PROPERTY_NAME] property.
### Jumping between threads
@@ -696,7 +697,7 @@ that should be implemented.
[ExecutorCoroutineDispatcher.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-executor-coroutine-dispatcher/close.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
-[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/new-coroutine-context.html
+[DEBUG_PROPERTY_NAME]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-d-e-b-u-g_-p-r-o-p-e-r-t-y_-n-a-m-e.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/is-active.html
[CoroutineScope.coroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/coroutine-context.html
diff --git a/docs/debugging.md b/docs/debugging.md
index fc857012..e2c7ec1e 100644
--- a/docs/debugging.md
+++ b/docs/debugging.md
@@ -45,7 +45,7 @@ It is easy to demonstrate with actual stacktraces of the same program that await
The only downside of this approach is losing referential transparency of the exception.
-### Stacktrace recovery machinery
+### Stacktrace recovery machinery
This section explains the inner mechanism of stacktrace recovery and can be skipped.
@@ -56,6 +56,7 @@ and then throws the resulting exception instead of the original one.
Exception copy logic is straightforward:
1) If the exception class implements [CopyableThrowable], [CopyableThrowable.createCopy] is used.
+ `null` can be returned from `createCopy` to opt-out specific exception from being recovered.
2) If the exception class has class-specific fields not inherited from Throwable, the exception is not copied.
3) Otherwise, one of the public exception's constructor is invoked reflectively with an optional `initCause` call.
diff --git a/gradle.properties b/gradle.properties
index 60c65a82..1fd92b1e 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,5 +1,5 @@
# Kotlin
-version=1.3.0-RC-SNAPSHOT
+version=1.3.0-RC2-SNAPSHOT
group=org.jetbrains.kotlinx
kotlin_version=1.3.41
diff --git a/integration/kotlinx-coroutines-guava/build.gradle b/integration/kotlinx-coroutines-guava/build.gradle
index 48fd0f56..9e44b998 100644
--- a/integration/kotlinx-coroutines-guava/build.gradle
+++ b/integration/kotlinx-coroutines-guava/build.gradle
@@ -2,7 +2,7 @@
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-ext.guava_version = '24.0-jre'
+ext.guava_version = '28.0-jre'
dependencies {
compile "com.google.guava:guava:$guava_version"
diff --git a/kotlinx-coroutines-bom/build.gradle b/kotlinx-coroutines-bom/build.gradle
index 9ec43b2a..c6675dd3 100644
--- a/kotlinx-coroutines-bom/build.gradle
+++ b/kotlinx-coroutines-bom/build.gradle
@@ -10,8 +10,14 @@ def name = project.name
dependencyManagement {
dependencies {
rootProject.subprojects.each {
- if (!ext.unpublished.contains(it.name) && it.name != name) {
- dependency(group: it.group, name: it.name, version: it.version)
+ if (ext.unpublished.contains(it.name)) return
+ if (it.name == name) return
+ if (!it.plugins.hasPlugin('maven-publish')) return
+ evaluationDependsOn(it.path)
+ it.publishing.publications.all {
+ if (it.artifactId.endsWith("-kotlinMultiplatform")) return
+ if (it.artifactId.endsWith("-metadata")) return
+ dependency(group: it.groupId, name: it.artifactId, version: it.version)
}
}
}
diff --git a/kotlinx-coroutines-core/common/README.md b/kotlinx-coroutines-core/common/README.md
index a0cc8091..e59392ee 100644
--- a/kotlinx-coroutines-core/common/README.md
+++ b/kotlinx-coroutines-core/common/README.md
@@ -65,7 +65,7 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
This module provides debugging facilities for coroutines (run JVM with `-ea` or `-Dkotlinx.coroutines.debug` options)
and [newCoroutineContext] function to write user-defined coroutine builders that work with these
-debugging facilities.
+debugging facilities. See [DEBUG_PROPERTY_NAME] for more details.
This module provides a special CoroutineContext type [TestCoroutineCoroutineContext][kotlinx.coroutines.test.TestCoroutineContext] that
allows the writer of code that contains Coroutines with delays and timeouts to write non-flaky unit-tests for that code allowing these tests to
@@ -124,6 +124,7 @@ Low-level primitives for finer-grained control of coroutines.
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-deferred/await.html
[Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-deferred/on-await.html
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/new-coroutine-context.html
+[DEBUG_PROPERTY_NAME]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-d-e-b-u-g_-p-r-o-p-e-r-t-y_-n-a-m-e.html
<!--- INDEX kotlinx.coroutines.sync -->
[kotlinx.coroutines.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[kotlinx.coroutines.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt
index 63e34fda..d7ca5f67 100644
--- a/kotlinx-coroutines-core/common/src/JobSupport.kt
+++ b/kotlinx-coroutines-core/common/src/JobSupport.kt
@@ -326,6 +326,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* may leak to the [CoroutineExceptionHandler].
*/
private fun cancelParent(cause: Throwable): Boolean {
+ // Is scoped coroutine -- don't propagate, will be rethrown
+ if (isScopedCoroutine) return true
+
/* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
* This allow parent to cancel its children (normally) without being cancelled itself, unless
* child crashes and produce some other exception during its completion.
@@ -337,8 +340,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
return isCancellation
}
- // Is scoped coroutine -- don't propagate, will be rethrown
- if (isScopedCoroutine) return isCancellation
// Notify parent but don't forget to check cancellation
return parent.childCancelled(cause) || isCancellation
}
diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
index 688125d9..1e1c0d3a 100644
--- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
@@ -8,6 +8,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
+import kotlin.math.*
/**
* Channel with array buffer of a fixed [capacity].
@@ -29,10 +30,14 @@ internal open class ArrayChannel<E>(
}
private val lock = ReentrantLock()
- private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
+ /*
+ * Guarded by lock.
+ * Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary.
+ */
+ private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))
private var head: Int = 0
@Volatile
- private var size: Int = 0
+ private var size: Int = 0 // Invariant: size <= capacity
protected final override val isBufferAlwaysEmpty: Boolean get() = false
protected final override val isBufferEmpty: Boolean get() = size == 0
@@ -64,7 +69,8 @@ internal open class ArrayChannel<E>(
}
}
}
- buffer[(head + size) % capacity] = element // actually queue element
+ ensureCapacity(size)
+ buffer[(head + size) % buffer.size] = element // actually queue element
return OFFER_SUCCESS
}
// size == capacity: full
@@ -112,7 +118,8 @@ internal open class ArrayChannel<E>(
this.size = size // restore size
return ALREADY_SELECTED
}
- buffer[(head + size) % capacity] = element // actually queue element
+ ensureCapacity(size)
+ buffer[(head + size) % buffer.size] = element // actually queue element
return OFFER_SUCCESS
}
// size == capacity: full
@@ -123,6 +130,19 @@ internal open class ArrayChannel<E>(
return receive!!.offerResult
}
+ // Guarded by lock
+ private fun ensureCapacity(currentSize: Int) {
+ if (currentSize >= buffer.size) {
+ val newSize = min(buffer.size * 2, capacity)
+ val newBuffer = arrayOfNulls<Any?>(newSize)
+ for (i in 0 until currentSize) {
+ newBuffer[i] = buffer[(head + i) % buffer.size]
+ }
+ buffer = newBuffer
+ head = 0
+ }
+ }
+
// result is `E | POLL_FAILED | Closed`
protected override fun pollInternal(): Any? {
var send: Send? = null
@@ -149,9 +169,9 @@ internal open class ArrayChannel<E>(
}
if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
this.size = size // restore size
- buffer[(head + size) % capacity] = replacement
+ buffer[(head + size) % buffer.size] = replacement
}
- head = (head + 1) % capacity
+ head = (head + 1) % buffer.size
}
// complete send the we're taken replacement from
if (token != null)
@@ -203,7 +223,7 @@ internal open class ArrayChannel<E>(
}
if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
this.size = size // restore size
- buffer[(head + size) % capacity] = replacement
+ buffer[(head + size) % buffer.size] = replacement
} else {
// failed to poll or is already closed --> let's try to select receiving this element from buffer
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
@@ -212,7 +232,7 @@ internal open class ArrayChannel<E>(
return ALREADY_SELECTED
}
}
- head = (head + 1) % capacity
+ head = (head + 1) % buffer.size
}
// complete send the we're taken replacement from
if (token != null)
@@ -226,7 +246,7 @@ internal open class ArrayChannel<E>(
lock.withLock {
repeat(size) {
buffer[head] = 0
- head = (head + 1) % capacity
+ head = (head + 1) % buffer.size
}
size = 0
}
@@ -237,5 +257,5 @@ internal open class ArrayChannel<E>(
// ------ debug ------
override val bufferDebugString: String
- get() = "(buffer:capacity=${buffer.size},size=$size)"
+ get() = "(buffer:capacity=$capacity,size=$size)"
}
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index a579d7a2..bf88b6a0 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -126,7 +126,7 @@ public fun <E> CoroutineScope.produce(
return coroutine
}
-internal open class ProducerCoroutine<E>(
+private class ProducerCoroutine<E>(
parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
override val isActive: Boolean
diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt
index bda326f8..6d87c2b9 100644
--- a/kotlinx-coroutines-core/common/src/flow/Flow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt
@@ -22,7 +22,7 @@ import kotlin.coroutines.*
* or [launchIn] operator that starts collection of the flow in the given scope.
* They are applied to the upstream flow and trigger execution of all operations.
* Execution of the flow is also called _collecting the flow_ and is always performed in a suspending manner
- * without actual blocking. Terminal operator complete normally or exceptionally depending on successful or failed
+ * without actual blocking. Terminal operators complete normally or exceptionally depending on successful or failed
* execution of all the flow operations in the upstream. The most basic terminal operator is [collect], for example:
*
* ```
@@ -37,10 +37,10 @@ import kotlin.coroutines.*
*
* By default, flows are _sequential_ and all flow operations are executed sequentially in the same coroutine,
* with an exception for a few operations specifically designed to introduce concurrency into flow
- * the execution such a [buffer] and [flatMapMerge]. See their documentation for details.
+ * execution such as [buffer] and [flatMapMerge]. See their documentation for details.
*
- * Flow interface does not carry information whether a flow is a truly a cold stream that can be collected repeatedly and
- * triggers execution of the same code every time it is collected or if it is a hot stream that emits different
+ * The `Flow` interface does not carry information whether a flow truly is a cold stream that can be collected repeatedly and
+ * triggers execution of the same code every time it is collected, or if it is a hot stream that emits different
* values from the same running source on each collection. However, conventionally flows represent cold streams.
* Transitions between hot and cold streams are supported via channels and the corresponding API:
* [channelFlow], [produceIn], [broadcastIn].
@@ -54,18 +54,18 @@ import kotlin.coroutines.*
* * [flow { ... }][flow] builder function to construct arbitrary flows from
* sequential calls to [emit][FlowCollector.emit] function.
* * [channelFlow { ... }][channelFlow] builder function to construct arbitrary flows from
- * potentially concurrent calls to [send][kotlinx.coroutines.channels.SendChannel.send] function.
+ * potentially concurrent calls to the [send][kotlinx.coroutines.channels.SendChannel.send] function.
*
* ### Flow constraints
*
- * All implementations of `Flow` interface must adhere to two key properties that are described in detail below:
+ * All implementations of the `Flow` interface must adhere to two key properties described in detail below:
*
* * Context preservation.
* * Exception transparency.
*
* These properties ensure the ability to perform local reasoning about the code with flows and modularize the code
- * in such a way so that upstream flow emitters can be developed separately from downstream flow collectors.
- * A user of the flow does not needs to know implementation details of the upstream flows it uses.
+ * in such a way that upstream flow emitters can be developed separately from downstream flow collectors.
+ * A user of a flow does not need to be aware of implementation details of the upstream flows it uses.
*
* ### Context preservation
*
@@ -73,8 +73,8 @@ import kotlin.coroutines.*
* it downstream, thus making reasoning about the execution context of particular transformations or terminal
* operations trivial.
*
- * There is the only way to change the context of a flow: [flowOn][Flow.flowOn] operator,
- * that changes the upstream context ("everything above the flowOn operator").
+ * There is only one way to change the context of a flow: the [flowOn][Flow.flowOn] operator
+ * that changes the upstream context ("everything above the `flowOn` operator").
* For additional information refer to its documentation.
*
* This reasoning can be demonstrated in practice:
@@ -97,7 +97,7 @@ import kotlin.coroutines.*
* ```
*
* From the implementation point of view, it means that all flow implementations should
- * emit only from the same coroutine.
+ * only emit from the same coroutine.
* This constraint is efficiently enforced by the default [flow] builder.
* The [flow] builder should be used if flow implementation does not start any coroutines.
* Its implementation prevents most of the development mistakes:
@@ -114,27 +114,27 @@ import kotlin.coroutines.*
* }
* ```
*
- * Use [channelFlow] if the collection and emission of the flow are to be separated into multiple coroutines.
+ * Use [channelFlow] if the collection and emission of a flow are to be separated into multiple coroutines.
* It encapsulates all the context preservation work and allows you to focus on your
* domain-specific problem, rather than invariant implementation details.
* It is possible to use any combination of coroutine builders from within [channelFlow].
*
- * If you are looking for the performance and are sure that no concurrent emits and context jumps will happen,
- * [flow] builder alongside with [coroutineScope] or [supervisorScope] can be used instead:
+ * If you are looking for performance and are sure that no concurrent emits and context jumps will happen,
+ * the [flow] builder can be used alongside a [coroutineScope] or [supervisorScope] instead:
* - Scoped primitive should be used to provide a [CoroutineScope].
* - Changing the context of emission is prohibited, no matter whether it is `withContext(ctx)` or
- * builder argument (e.g. `launch(ctx)`).
+ * a builder argument (e.g. `launch(ctx)`).
* - Collecting another flow from a separate context is allowed, but it has the same effect as
- * [flowOn] operator on that flow, which is more efficient.
+ * applying the [flowOn] operator to that flow, which is more efficient.
*
* ### Exception transparency
*
* Flow implementations never catch or handle exceptions that occur in downstream flows. From the implementation standpoint
* it means that calls to [emit][FlowCollector.emit] and [emitAll] shall never be wrapped into
* `try { ... } catch { ... }` blocks. Exception handling in flows shall be performed with
- * [catch][Flow.catch] operator and it is designed to catch only exception coming from upstream flow while passing
- * all the downstream exceptions. Similarly, terminal operators like [collect][Flow.collect]
- * throw any unhandled exception that occurs in its code or in upstream flows, for example:
+ * [catch][Flow.catch] operator and it is designed to only catch exceptions coming from upstream flows while passing
+ * all downstream exceptions. Similarly, terminal operators like [collect][Flow.collect]
+ * throw any unhandled exceptions that occur in their code or in upstream flows, for example:
*
* ```
* flow { emitData() }
@@ -143,13 +143,13 @@ import kotlin.coroutines.*
* .map { computeTwo(it) }
* .collect { process(it) } // throws exceptions from process and computeTwo
* ```
- * The same reasoning can be applied to [onCompletion] operator that is a declarative replacement for `finally` block.
+ * The same reasoning can be applied to the [onCompletion] operator that is a declarative replacement for the `finally` block.
*
- * Failure to adhere to the exception transparency requirement would result in strange behaviours that would make
+ * Failure to adhere to the exception transparency requirement can lead to strange behaviors which make
* it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
- * by the upstream flow, limiting the ability of local reasoning about the code.
+ * by an upstream flow, limiting the ability of local reasoning about the code.
*
- * Currently, flow infrastructure does not enforce exception transparency contracts, however, it might be enforced
+ * Currently, the flow infrastructure does not enforce exception transparency contracts, however, it might be enforced
* in the future either at run time or at compile time.
*
* ### Reactive streams
@@ -162,9 +162,9 @@ public interface Flow<out T> {
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
* This method should never be implemented or used directly.
*
- * The only way to implement flow interface directly is to extend [AbstractFlow].
- * To collect it into the specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
- * should be used. Such limitation ensures that context preservation property is not violated and prevents most
+ * The only way to implement the `Flow` interface directly is to extend [AbstractFlow].
+ * To collect it into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
+ * should be used. Such limitation ensures that the context preservation property is not violated and prevents most
* of the developer mistakes related to concurrency, inconsistent flow dispatchers and cancellation.
*/
@InternalCoroutinesApi
@@ -172,8 +172,8 @@ public interface Flow<out T> {
}
/**
- * Base class to extend to have a stateful implementation of the flow.
- * It tracks all the properties required for context preservation and throws [IllegalStateException]
+ * Base class for stateful implementations of `Flow`.
+ * It tracks all the properties required for context preservation and throws an [IllegalStateException]
* if any of the properties are violated.
*
* Example of the implementation:
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index b7e91f50..16769ad8 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -8,6 +8,9 @@
package kotlinx.coroutines.flow
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.internal.*
+import kotlinx.coroutines.flow.internal.unsafeFlow
import kotlin.coroutines.*
import kotlin.jvm.*
@@ -100,29 +103,6 @@ public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = noImpl()
public fun <T> Flow<T>.subscribeOn(context: CoroutineContext): Flow<T> = noImpl()
/**
- * Use [BroadcastChannel][kotlinx.coroutines.channels.BroadcastChannel].asFlow().
- * @suppress
- */
-@Deprecated(message = "Use BroadcastChannel.asFlow()", level = DeprecationLevel.ERROR)
-public fun BehaviourSubject(): Any = noImpl()
-
-/**
- * `ReplaySubject` is not supported. The closest analogue is buffered [BroadcastChannel][kotlinx.coroutines.channels.BroadcastChannel].
- * @suppress
- */
-@Deprecated(
- message = "ReplaySubject is not supported. The closest analogue is buffered broadcast channel",
- level = DeprecationLevel.ERROR)
-public fun ReplaySubject(): Any = noImpl()
-
-/**
- * `PublishSubject` is not supported.
- * @suppress
- */
-@Deprecated(message = "PublishSubject is not supported", level = DeprecationLevel.ERROR)
-public fun PublishSubject(): Any = noImpl()
-
-/**
* Flow analogue of `onErrorXxx` is [catch].
* Use `catch { emitAll(fallback) }`.
* @suppress
@@ -380,7 +360,81 @@ public fun <T> Flow<T>.concatWith(value: T): Flow<T> = noImpl()
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emitAll(other) }'",
- replaceWith = ReplaceWith("onCompletion { emitAkk(other) }")
+ replaceWith = ReplaceWith("onCompletion { emitAll(other) }")
)
public fun <T> Flow<T>.concatWith(other: Flow<T>): Flow<T> = noImpl()
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("this.combine(other, transform)")
+)
+public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+ combine(this, other, transform)
+
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("combine(this, other, other2, transform)")
+)
+public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
+ other: Flow<T2>,
+ other2: Flow<T3>,
+ crossinline transform: suspend (T1, T2, T3) -> R
+) = combine(this, other, other2, transform)
+
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
+)
+public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
+ other: Flow<T2>,
+ other2: Flow<T3>,
+ other3: Flow<T4>,
+ crossinline transform: suspend (T1, T2, T3, T4) -> R
+) = combine(this, other, other2, other3, transform)
+
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
+)
+public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
+ other: Flow<T2>,
+ other2: Flow<T3>,
+ other3: Flow<T4>,
+ other4: Flow<T5>,
+ crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
+): Flow<R> = combine(this, other, other2, other3, other4, transform)
+
+/**
+ * Delays the emission of values from this flow for the given [timeMillis].
+ * Use `onStart { delay(timeMillis) }`.
+ * @suppress
+ */
+@Deprecated(
+ level = DeprecationLevel.WARNING, // since 1.3.0, error in 1.4.0
+ message = "Use 'onStart { delay(timeMillis) }'",
+ replaceWith = ReplaceWith("onStart { delay(timeMillis) }")
+)
+public fun <T> Flow<T>.delayFlow(timeMillis: Long): Flow<T> = onStart { delay(timeMillis) }
+
+/**
+ * Delays each element emitted by the given flow for the given [timeMillis].
+ * Use `onEach { delay(timeMillis) }`.
+ * @suppress
+ */
+@Deprecated(
+ level = DeprecationLevel.WARNING, // since 1.3.0, error in 1.4.0
+ message = "Use 'onEach { delay(timeMillis) }'",
+ replaceWith = ReplaceWith("onEach { delay(timeMillis) }")
+)
+public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = onEach { delay(timeMillis) }
+
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogues of 'switchMap' are 'transformLatest', 'flatMapLatest' and 'mapLatest'",
+ replaceWith = ReplaceWith("this.flatMapLatest(transform)")
+)
+public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flatMapLatest(transform)
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
index 99a3bdc6..3bae2ebd 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@@ -68,7 +68,7 @@ public abstract class ChannelFlow<T>(
scope.broadcast(context, produceCapacity, start, block = collectToFun)
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
- scope.flowProduce(context, produceCapacity, block = collectToFun)
+ scope.produce(context, produceCapacity, block = collectToFun)
override suspend fun collect(collector: FlowCollector<T>) =
coroutineScope {
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
new file mode 100644
index 00000000..f7edad08
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.selects.*
+
+internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
+
+internal suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
+ first: Flow<T1>, second: Flow<T2>,
+ transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+) {
+ coroutineScope {
+ val firstChannel = asFairChannel(first)
+ val secondChannel = asFairChannel(second)
+ var firstValue: Any? = null
+ var secondValue: Any? = null
+ var firstIsClosed = false
+ var secondIsClosed = false
+ while (!firstIsClosed || !secondIsClosed) {
+ select<Unit> {
+ onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
+ firstValue = value
+ if (secondValue !== null) {
+ transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
+ }
+ }
+
+ onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
+ secondValue = value
+ if (firstValue !== null) {
+ transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
+ }
+ }
+ }
+ }
+ }
+}
+
+@PublishedApi
+internal suspend fun <R, T> FlowCollector<R>.combineInternal(
+ flows: Array<out Flow<T>>,
+ arrayFactory: () -> Array<T?>,
+ transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+) {
+ coroutineScope {
+ val size = flows.size
+ val channels =
+ Array(size) { asFairChannel(flows[it]) }
+ val latestValues = arrayOfNulls<Any?>(size)
+ val isClosed = Array(size) { false }
+
+ // See flow.combine(other) for explanation.
+ while (!isClosed.all { it }) {
+ select<Unit> {
+ for (i in 0 until size) {
+ onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
+ latestValues[i] = value
+ if (latestValues.all { it !== null }) {
+ val arguments = arrayFactory()
+ for (index in 0 until size) {
+ arguments[index] = NULL.unbox(latestValues[index])
+ }
+ transform(arguments as Array<T>)
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+private inline fun SelectBuilder<Unit>.onReceive(
+ isClosed: Boolean,
+ channel: ReceiveChannel<Any>,
+ crossinline onClosed: () -> Unit,
+ noinline onReceive: suspend (value: Any) -> Unit
+) {
+ if (isClosed) return
+ channel.onReceiveOrNull {
+ // TODO onReceiveOrClosed when boxing issues are fixed
+ if (it === null) onClosed()
+ else onReceive(it)
+ }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
+ val channel = channel as ChannelCoroutine<Any>
+ flow.collect { value ->
+ return@collect channel.sendFair(value ?: NULL)
+ }
+}
+
+internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = unsafeFlow {
+ coroutineScope {
+ val first = asChannel(flow)
+ val second = asChannel(flow2)
+ /*
+ * This approach only works with rendezvous channel and is required to enforce correctness
+ * in the following scenario:
+ * ```
+ * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
+ * val f2 = flowOf(1)
+ * f1.zip(f2) { ... }
+ * ```
+ *
+ * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
+ */
+ (second as SendChannel<*>).invokeOnClose {
+ if (!first.isClosedForReceive) first.cancel(AbortFlowException())
+ }
+
+ val otherIterator = second.iterator()
+ try {
+ first.consumeEach { value ->
+ if (!otherIterator.hasNext()) {
+ return@consumeEach
+ }
+ emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
+ }
+ } catch (e: AbortFlowException) {
+ // complete
+ } finally {
+ if (!second.isClosedForReceive) second.cancel(AbortFlowException())
+ }
+ }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
+ flow.collect { value ->
+ return@collect channel.send(value ?: NULL)
+ }
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
index f0b5b391..adc3a17d 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
@@ -52,20 +52,6 @@ internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(Flo
flowScope { block(collector) }
}
-/*
- * Shortcut for produce { flowScope {block() } }
- */
-internal fun <T> CoroutineScope.flowProduce(
- context: CoroutineContext,
- capacity: Int = 0, @BuilderInference block: suspend ProducerScope<T>.() -> Unit
-): ReceiveChannel<T> {
- val channel = Channel<T>(capacity)
- val newContext = newCoroutineContext(context)
- val coroutine = FlowProduceCoroutine(newContext, channel)
- coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
- return coroutine
-}
-
private class FlowCoroutine<T>(
context: CoroutineContext,
uCont: Continuation<T>
@@ -75,13 +61,3 @@ private class FlowCoroutine<T>(
return cancelImpl(cause)
}
}
-
-private class FlowProduceCoroutine<T>(
- parentContext: CoroutineContext,
- channel: Channel<T>
-) : ProducerCoroutine<T>(parentContext, channel) {
- public override fun childCancelled(cause: Throwable): Boolean {
- if (cause is ChildCancelledException) return true
- return cancelImpl(cause)
- }
-}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
new file mode 100644
index 00000000..f621be03
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.sync.*
+import kotlin.coroutines.*
+
+internal class ChannelFlowTransformLatest<T, R>(
+ private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
+ flow: Flow<T>,
+ context: CoroutineContext = EmptyCoroutineContext,
+ capacity: Int = Channel.BUFFERED
+) : ChannelFlowOperator<T, R>(flow, context, capacity) {
+ override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<R> =
+ ChannelFlowTransformLatest(transform, flow, context, capacity)
+
+ override suspend fun flowCollect(collector: FlowCollector<R>) {
+ assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
+ flowScope {
+ var previousFlow: Job? = null
+ flow.collect { value ->
+ previousFlow?.apply {
+ cancel(ChildCancelledException())
+ join()
+ }
+ // Do not pay for dispatch here, it's never necessary
+ previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
+ collector.transform(value)
+ }
+ }
+ }
+ }
+}
+
+internal class ChannelFlowMerge<T>(
+ flow: Flow<Flow<T>>,
+ private val concurrency: Int,
+ context: CoroutineContext = EmptyCoroutineContext,
+ capacity: Int = Channel.OPTIONAL_CHANNEL
+) : ChannelFlowOperator<Flow<T>, T>(flow, context, capacity) {
+ override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
+ ChannelFlowMerge(flow, concurrency, context, capacity)
+
+ // The actual merge implementation with concurrency limit
+ private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector<T>) {
+ val semaphore = Semaphore(concurrency)
+ val job: Job? = coroutineContext[Job]
+ flow.collect { inner ->
+ /*
+ * We launch a coroutine on each emitted element and the only potential
+ * suspension point in this collector is `semaphore.acquire` that rarely suspends,
+ * so we manually check for cancellation to propagate it to the upstream in time.
+ */
+ job?.ensureActive()
+ semaphore.acquire()
+ scope.launch {
+ try {
+ inner.collect(collector)
+ } finally {
+ semaphore.release() // Release concurrency permit
+ }
+ }
+ }
+ }
+
+ // Fast path in ChannelFlowOperator calls this function (channel was not created yet)
+ override suspend fun flowCollect(collector: FlowCollector<T>) {
+ // this function should not have been invoked when channel was explicitly requested
+ assert { capacity == Channel.OPTIONAL_CHANNEL }
+ flowScope {
+ mergeImpl(this, collector.asConcurrentFlowCollector())
+ }
+ }
+
+ // Slow path when output channel is required (and was created)
+ override suspend fun collectTo(scope: ProducerScope<T>) =
+ mergeImpl(scope, SendingCollector(scope))
+
+ override fun additionalToStringProps(): String =
+ "concurrency=$concurrency, "
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
index dbd7120e..c6ff12fc 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
@@ -14,3 +14,11 @@ import kotlin.jvm.*
@JvmField
@SharedImmutable
internal val NULL = Symbol("NULL")
+
+/*
+ * Symbol used to indicate that the flow is complete.
+ * It should never leak to the outside world.
+ */
+@JvmField
+@SharedImmutable
+internal val DONE = Symbol("DONE")
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
index 09a63781..8761058e 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
@@ -81,7 +81,13 @@ internal class SafeCollector<T>(
"FlowCollector is not thread-safe and concurrent emissions are prohibited. To mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
)
}
- count + 1
+
+ /*
+ * If collect job is null (-> EmptyCoroutineContext, probably run from `suspend fun main`), then invariant is maintained
+ * (common transitive parent is "null"), but count check will fail, so just do not count job context element when
+ * flow is collected from EmptyCoroutineContext
+ */
+ if (collectJob == null) count else count + 1
}
if (result != collectContextSize) {
error(
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
index 8f3325c5..043c839f 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
@@ -238,7 +238,7 @@ public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
* 4) It can be confused with [flowOn] operator, though [flowWith] is much rarer.
*/
@FlowPreview
-@Deprecated(message = "flowWith is deprecated without replacement, please refer to its KDoc for an explanation", level = DeprecationLevel.WARNING) // Error in beta release, removal in 1.4
+@Deprecated(message = "flowWith is deprecated without replacement, please refer to its KDoc for an explanation", level = DeprecationLevel.ERROR) // Error in beta release, removal in 1.4
public fun <T, R> Flow<T>.flowWith(
flowContext: CoroutineContext,
bufferSize: Int = BUFFERED,
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
index 8d74be55..85b9b07c 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
@@ -15,26 +15,6 @@ import kotlin.jvm.*
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
/**
- * Delays the emission of values from this flow for the given [timeMillis].
- */
-@ExperimentalCoroutinesApi
-public fun <T> Flow<T>.delayFlow(timeMillis: Long): Flow<T> = flow {
- delay(timeMillis)
- collect(this@flow)
-}
-
-/**
- * Delays each element emitted by the given flow for the given [timeMillis].
- */
-@ExperimentalCoroutinesApi
-public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
- collect { value ->
- delay(timeMillis)
- emit(value)
- }
-}
-
-/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout][timeoutMillis].
* The latest value is always emitted.
@@ -62,18 +42,21 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
return scopedFlow { downstream ->
- val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
- // Channel is not closed deliberately as there is no close with value
- val collector = async {
- collect { value -> values.send(value ?: NULL) }
+ // Actually Any, KT-30796
+ val values = produce<Any?>(capacity = Channel.CONFLATED) {
+ collect { value -> send(value ?: NULL) }
}
-
- var isDone = false
var lastValue: Any? = null
- while (!isDone) {
+ while (lastValue !== DONE) {
select<Unit> {
- values.onReceive {
- lastValue = it
+ // Should be receiveOrClosed when boxing issues are fixed
+ values.onReceiveOrNull {
+ if (it == null) {
+ if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
+ lastValue = DONE
+ } else {
+ lastValue = it
+ }
}
lastValue?.let { value ->
@@ -83,12 +66,6 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
downstream.emit(NULL.unbox(value))
}
}
-
- // Close with value 'idiom'
- collector.onAwait {
- if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
- isDone = true
- }
}
}
}
@@ -118,16 +95,14 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
// Actually Any, KT-30796
collect { value -> send(value ?: NULL) }
}
-
- var isDone = false
var lastValue: Any? = null
val ticker = fixedPeriodTicker(periodMillis)
- while (!isDone) {
+ while (lastValue !== DONE) {
select<Unit> {
values.onReceiveOrNull {
if (it == null) {
ticker.cancel(ChildCancelledException())
- isDone = true
+ lastValue = DONE
} else {
lastValue = it
}
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
index e593d035..dccc1cd8 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
@@ -10,11 +10,8 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
-import kotlinx.coroutines.channels.Channel.Factory.OPTIONAL_CHANNEL
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.internal.*
-import kotlinx.coroutines.sync.*
-import kotlin.coroutines.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
@@ -106,8 +103,33 @@ public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY
}
/**
+ * Returns a flow that produces element by [transform] function every time the original flow emits a value.
+ * When the original flow emits a new value, the previous `transform` block is cancelled, thus the name `transformLatest`.
+ *
+ * For example, the following flow:
+ * ```
+ * flow {
+ * emit("a")
+ * delay(100)
+ * emit("b")
+ * }.transformLatest { value ->
+ * emit(value)
+ * delay(200)
+ * emit(value + "_last")
+ * }
+ * ```
+ * produces `a b b_last`.
+ *
+ * This operator is [buffered][buffer] by default
+ * and size of its output buffer can be changed by applying subsequent [buffer] operator.
+ */
+@ExperimentalCoroutinesApi
+public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
+ ChannelFlowTransformLatest(transform, this)
+
+/**
* Returns a flow that switches to a new flow produced by [transform] function every time the original flow emits a value.
- * When switch on the a flow is performed, the previous one is cancelled.
+ * When the original flow emits a new value, the previous flow produced by `transform` block is cancelled.
*
* For example, the following flow:
* ```
@@ -115,69 +137,42 @@ public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY
* emit("a")
* delay(100)
* emit("b")
- * }.switchMap { value ->
+ * }.flatMapLatest { value ->
* flow {
- * emit(value + value)
+ * emit(value)
* delay(200)
* emit(value + "_last")
* }
* }
* ```
- * produces `aa bb b_last`
+ * produces `a b b_last`
+ *
+ * This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
*/
-@FlowPreview
-public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow { downstream ->
- var previousFlow: Job? = null
- collect { value ->
- // Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
- previousFlow?.cancel(ChildCancelledException())
- previousFlow?.join()
- // Undispatched to have better user experience in case of synchronous flows
- previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
- downstream.emitAll(transform(value))
- }
- }
-}
-
-private class ChannelFlowMerge<T>(
- flow: Flow<Flow<T>>,
- private val concurrency: Int,
- context: CoroutineContext = EmptyCoroutineContext,
- capacity: Int = OPTIONAL_CHANNEL
-) : ChannelFlowOperator<Flow<T>, T>(flow, context, capacity) {
- override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
- ChannelFlowMerge(flow, concurrency, context, capacity)
-
- // The actual merge implementation with concurrency limit
- private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector<T>) {
- val semaphore = Semaphore(concurrency)
- @Suppress("UNCHECKED_CAST")
- flow.collect { inner ->
- semaphore.acquire() // Acquire concurrency permit
- scope.launch {
- try {
- inner.collect(collector)
- } finally {
- semaphore.release() // Release concurrency permit
- }
- }
- }
- }
-
- // Fast path in ChannelFlowOperator calls this function (channel was not created yet)
- override suspend fun flowCollect(collector: FlowCollector<T>) {
- // this function should not have been invoked when channel was explicitly requested
- assert { capacity == OPTIONAL_CHANNEL }
- flowScope {
- mergeImpl(this, collector.asConcurrentFlowCollector())
- }
- }
-
- // Slow path when output channel is required (and was created)
- override suspend fun collectTo(scope: ProducerScope<T>) =
- mergeImpl(scope, SendingCollector(scope))
-
- override fun additionalToStringProps(): String =
- "concurrency=$concurrency, "
-}
+@ExperimentalCoroutinesApi
+public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
+ transformLatest { emitAll(transform(it)) }
+/**
+ * Returns a flow that emits elements from the original flow transformed by [transform] function.
+ * When the original flow emits a new value, computation of the [transform] block for previous value is cancelled.
+ *
+ * For example, the following flow:
+ * ```
+ * flow {
+ * emit("a")
+ * delay(100)
+ * emit("b")
+ * }.mapLatest { value ->
+ * println("Started computing $value")
+ * delay(200)
+ * "Computed $value"
+ * }
+ * ```
+ * will print "Started computing 1" and "Started computing 2", but the resulting flow will contain only "Computed 2" value.
+ *
+ * This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
+ */
+@ExperimentalCoroutinesApi
+public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> =
+ transformLatest { emit(transform(it)) }
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
index 72822bbe..ba4f0520 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
@@ -4,15 +4,14 @@
@file:JvmMultifileClass
@file:JvmName("FlowKt")
-@file:Suppress("UNCHECKED_CAST")
+@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
-import kotlinx.coroutines.selects.*
import kotlin.jvm.*
+import kotlinx.coroutines.flow.flow as safeFlow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
/**
@@ -23,69 +22,123 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* ```
* val flow = flowOf(1, 2).delayEach(10)
* val flow2 = flowOf("a", "b", "c").delayEach(15)
- * flow.combineLatest(flow2) { i, s -> i.toString() + s }.collect {
+ * flow.combine(flow2) { i, s -> i.toString() + s }.collect {
* println(it) // Will print "1a 2a 2b 2c"
* }
* ```
+ *
+ * This function is a shorthand for `flow.combineTransform(flow2) { a, b -> emit(transform(a, b)) }
*/
-@FlowPreview
-public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
- coroutineScope {
- val firstChannel = asFairChannel(this@combineLatest)
- val secondChannel = asFairChannel(other)
- var firstValue: Any? = null
- var secondValue: Any? = null
- var firstIsClosed = false
- var secondIsClosed = false
-
- /*
- * Fun fact, this select **semantically** equivalent of the following:
- * ```
- * selectWhile<Unit> {
- * channel.onReceive {
- * emitCombined(...)
- * }
- * channel2.onReceive {
- * emitCombined(...)
- * }
- * }
- * ```
- * but we are waiting for `channels` branch to get merged where we will change semantics of the select
- * to ignore finished clauses.
- *
- * Instead (especially in the face of non-fair channels) we are using our own hand-rolled select emulation
- * on top of previous select.
- */
- while (!firstIsClosed || !secondIsClosed) {
- select<Unit> {
- onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
- firstValue = value
- if (secondValue !== null) {
- emit(transform(NULL.unbox(firstValue), NULL.unbox(secondValue)))
- }
- }
-
- onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
- secondValue = value
- if (firstValue !== null) {
- emit(transform(NULL.unbox(firstValue), NULL.unbox(secondValue)))
- }
- }
- }
- }
+@JvmName("flowCombine")
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
+ combineTransformInternal(this@combine, flow) { a, b ->
+ emit(transform(a, b))
}
}
/**
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
+ *
+ * It can be demonstrated with the following example:
+ * ```
+ * val flow = flowOf(1, 2).delayEach(10)
+ * val flow2 = flowOf("a", "b", "c").delayEach(15)
+ * combine(flow, flow2) { i, s -> i.toString() + s }.collect {
+ * println(it) // Will print "1a 2a 2b 2c"
+ * }
+ * ```
+ *
+ * This function is a shorthand for `combineTransform(flow, flow2) { a, b -> emit(transform(a, b)) }
*/
-@FlowPreview
-public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
- other: Flow<T2>,
- other2: Flow<T3>,
- crossinline transform: suspend (T1, T2, T3) -> R
-): Flow<R> = (this as Flow<*>).combineLatest(other, other2) { args: Array<*> ->
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> =
+ flow.combine(flow2, transform)
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ *
+ * Its usage can be demonstrated with the following example:
+ * ```
+ * val flow = requestFlow()
+ * val flow2 = searchEngineFlow()
+ * flow.combineTransform(flow2) { request, searchEngine ->
+ * emit("Downloading in progress")
+ * val result = download(request, searchEngine)
+ * emit(result)
+ * }
+ * ```
+ */
+@JvmName("flowCombineTransform")
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> Flow<T1>.combineTransform(
+ flow: Flow<T2>,
+ @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+): Flow<R> = safeFlow {
+ combineTransformInternal(this@combineTransform, flow) { a, b ->
+ transform(a, b)
+ }
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ *
+ * Its usage can be demonstrated with the following example:
+ * ```
+ * val flow = requestFlow()
+ * val flow2 = searchEngineFlow()
+ * combineTransform(flow, flow2) { request, searchEngine ->
+ * emit("Downloading in progress")
+ * val result = download(request, searchEngine)
+ * emit(result)
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> combineTransform(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+): Flow<R> = combineTransform(flow, flow2, transform)
+
+/**
+ * Returns a [Flow] whose values are generated with [transform] function by combining
+ * the most recently emitted values by each flow.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, R> combine(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ @BuilderInference crossinline transform: suspend (T1, T2, T3) -> R
+): Flow<R> = combine(flow, flow2, flow3) { args: Array<*> ->
+ transform(
+ args[0] as T1,
+ args[1] as T2,
+ args[2] as T3
+ )
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, R> combineTransform(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
+): Flow<R> = combineTransform(flow, flow2, flow3) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
@@ -97,13 +150,36 @@ public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
-@FlowPreview
-public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
- other: Flow<T2>,
- other2: Flow<T3>,
- other3: Flow<T4>,
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, R> combine(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ flow4: Flow<T4>,
crossinline transform: suspend (T1, T2, T3, T4) -> R
-): Flow<R> = (this as Flow<*>).combineLatest(other, other2, other3) { args: Array<*> ->
+): Flow<R> = combine(flow, flow2, flow3, flow4) { args: Array<*> ->
+ transform(
+ args[0] as T1,
+ args[1] as T2,
+ args[2] as T3,
+ args[3] as T4
+ )
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, R> combineTransform(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ flow4: Flow<T4>,
+ @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
+): Flow<R> = combineTransform(flow, flow2, flow3, flow4) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
@@ -116,14 +192,15 @@ public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
-@FlowPreview
-public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
- other: Flow<T2>,
- other2: Flow<T3>,
- other3: Flow<T4>,
- other4: Flow<T5>,
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, T5, R> combine(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ flow4: Flow<T4>,
+ flow5: Flow<T5>,
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
-): Flow<R> = (this as Flow<*>).combineLatest(other, other2, other3, other4) { args: Array<*> ->
+): Flow<R> = combine(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
@@ -134,67 +211,89 @@ public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
}
/**
- * Returns a [Flow] whose values are generated with [transform] function by combining
- * the most recently emitted values by each flow.
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
*/
-@FlowPreview
-public inline fun <reified T, R> Flow<T>.combineLatest(vararg others: Flow<T>, crossinline transform: suspend (Array<T>) -> R): Flow<R> =
- combineLatest(*others, arrayFactory = { arrayOfNulls(others.size + 1) }, transform = { transform(it) })
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, T5, R> combineTransform(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ flow4: Flow<T4>,
+ flow5: Flow<T5>,
+ @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
+): Flow<R> = combineTransform(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
+ transform(
+ args[0] as T1,
+ args[1] as T2,
+ args[2] as T3,
+ args[3] as T4,
+ args[4] as T5
+ )
+}
/**
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
-@PublishedApi
-internal fun <T, R> Flow<T>.combineLatest(vararg others: Flow<T>, arrayFactory: () -> Array<T?>, transform: suspend (Array<T>) -> R): Flow<R> = flow {
- coroutineScope {
- val size = others.size + 1
- val channels =
- Array(size) { if (it == 0) asFairChannel(this@combineLatest) else asFairChannel(others[it - 1]) }
- val latestValues = arrayOfNulls<Any?>(size)
- val isClosed = Array(size) { false }
-
- // See flow.combineLatest(other) for explanation.
- while (!isClosed.all { it }) {
- select<Unit> {
- for (i in 0 until size) {
- onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
- latestValues[i] = value
- if (latestValues.all { it !== null }) {
- val arguments = arrayFactory()
- for (index in 0 until size) {
- arguments[index] = NULL.unbox(latestValues[index])
- }
- emit(transform(arguments as Array<T>))
- }
- }
- }
- }
- }
- }
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combine(
+ vararg flows: Flow<T>,
+ crossinline transform: suspend (Array<T>) -> R
+): Flow<R> = flow {
+ combineInternal(flows, { arrayOfNulls(flows.size) }, { emit(transform(it)) })
}
-private inline fun SelectBuilder<Unit>.onReceive(
- isClosed: Boolean,
- channel: ReceiveChannel<Any>,
- crossinline onClosed: () -> Unit,
- noinline onReceive: suspend (value: Any) -> Unit
-) {
- if (isClosed) return
- channel.onReceiveOrNull {
- if (it === null) onClosed()
- else onReceive(it)
- }
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combineTransform(
+ vararg flows: Flow<T>,
+ @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+): Flow<R> = safeFlow {
+ combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) })
}
-// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
-private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
- val channel = channel as ChannelCoroutine<Any>
- flow.collect { value ->
- channel.sendFair(value ?: NULL)
+/**
+ * Returns a [Flow] whose values are generated with [transform] function by combining
+ * the most recently emitted values by each flow.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combine(
+ flows: Iterable<Flow<T>>,
+ crossinline transform: suspend (Array<T>) -> R
+): Flow<R> {
+ val flowArray = flows.toList().toTypedArray()
+ return flow {
+ combineInternal(
+ flowArray,
+ arrayFactory = { arrayOfNulls(flowArray.size) },
+ transform = { emit(transform(it)) })
}
}
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combineTransform(
+ flows: Iterable<Flow<T>>,
+ @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+): Flow<R> {
+ val flowArray = flows.toList().toTypedArray()
+ return safeFlow {
+ combineInternal(flowArray, { arrayOfNulls(flowArray.size) }, { transform(it) })
+ }
+}
/**
* Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
@@ -210,45 +309,4 @@ private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = p
* ```
*/
@ExperimentalCoroutinesApi
-public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
- coroutineScope {
- val first = asChannel(this@zip)
- val second = asChannel(other)
- /*
- * This approach only works with rendezvous channel and is required to enforce correctness
- * in the following scenario:
- * ```
- * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
- * val f2 = flowOf(1)
- * f1.zip(f2) { ... }
- * ```
- *
- * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
- */
- (second as SendChannel<*>).invokeOnClose {
- if (!first.isClosedForReceive) first.cancel(AbortFlowException())
- }
-
- val otherIterator = second.iterator()
- try {
- first.consumeEach { value ->
- if (!otherIterator.hasNext()) {
- return@consumeEach
- }
- val secondValue = NULL.unbox<T2>(otherIterator.next())
- emit(transform(NULL.unbox(value), NULL.unbox(secondValue)))
- }
- } catch (e: AbortFlowException) {
- // complete
- } finally {
- if (!second.isClosedForReceive) second.cancel(AbortFlowException())
- }
- }
-}
-
-// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
-private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
- flow.collect { value ->
- channel.send(value ?: NULL)
- }
-}
+public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
index 42ac8003..c9480f99 100644
--- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
+++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
@@ -35,6 +35,7 @@ public suspend fun Flow<*>.collect() = collect(NopCollector)
*
* This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values
* handle an exception that might occur in the upstream flow or during processing, for example:
+ *
* ```
* flow
* .onEach { value -> updateUi(value) }
@@ -87,8 +88,46 @@ public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend
})
/**
+ * Terminal flow operator that collects the given flow with a provided [action].
+ * The crucial difference from [collect] is that when the original flow emits a new value, [action] block for previous
+ * value is cancelled.
+ *
+ * It can be demonstrated by the following example:
+ *
+ * ```
+ * flow {
+ * emit(1)
+ * delay(50)
+ * emit(2)
+ * }.collectLatest { value ->
+ * println("Collecting $value")
+ * delay(100) // Emulate work
+ * println("$value collected")
+ * }
+ * ```
+ *
+ * prints "Collecting 1, Collecting 2, 2 collected"
+ */
+@ExperimentalCoroutinesApi
+public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
+ /*
+ * Implementation note:
+ * buffer(0) is inserted here to fulfil user's expectations in sequential usages, e.g.:
+ * ```
+ * flowOf(1, 2, 3).collectLatest {
+ * delay(1)
+ * println(it) // Expect only 3 to be printed
+ * }
+ * ```
+ *
+ * It's not the case for intermediate operators which users mostly use for interactive UI,
+ * where performance of dispatch is more important.
+ */
+ mapLatest(action).buffer(0).collect()
+}
+
+/**
* Collects all the values from the given [flow] and emits them to the collector.
- *
* It is a shorthand for `flow.collect { value -> emit(value) }`.
*/
@ExperimentalCoroutinesApi
diff --git a/kotlinx-coroutines-core/common/src/internal/StackTraceRecovery.common.kt b/kotlinx-coroutines-core/common/src/internal/StackTraceRecovery.common.kt
index 8ce0fcd2..8599143e 100644
--- a/kotlinx-coroutines-core/common/src/internal/StackTraceRecovery.common.kt
+++ b/kotlinx-coroutines-core/common/src/internal/StackTraceRecovery.common.kt
@@ -42,9 +42,3 @@ internal expect interface CoroutineStackFrame {
public val callerFrame: CoroutineStackFrame?
public fun getStackTraceElement(): StackTraceElement?
}
-
-/**
- * Marker that indicates that stacktrace of the exception should not be recovered.
- * Currently internal, but may become public in the future
- */
-internal interface NonRecoverableThrowable
diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
index c442c95a..246ae2c2 100644
--- a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
+++ b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
@@ -12,7 +12,8 @@ import kotlin.coroutines.intrinsics.*
* Use this function to start coroutine in a cancellable way, so that it can be cancelled
* while waiting to be dispatched.
*/
-internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
+@InternalCoroutinesApi
+public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
}
diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
index 6e0552d1..6ab377da 100644
--- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
+++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
@@ -1,19 +1,19 @@
package kotlinx.coroutines.sync
-import kotlinx.atomicfu.atomic
-import kotlinx.atomicfu.atomicArrayOfNulls
-import kotlinx.atomicfu.getAndUpdate
-import kotlinx.atomicfu.loop
+import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
-import kotlin.coroutines.resume
-import kotlin.math.max
+import kotlin.coroutines.*
+import kotlin.jvm.*
+import kotlin.math.*
/**
- * A counting semaphore for coroutines. It maintains a number of available permits.
- * Each [acquire] suspends if necessary until a permit is available, and then takes it.
+ * A counting semaphore for coroutines that logically maintains a number of available permits.
+ * Each [acquire] takes a single permit or suspends until it is available.
* Each [release] adds a permit, potentially releasing a suspended acquirer.
+ * Semaphore is fair and maintains a FIFO order of acquirers.
*
+ * Semaphores are mostly used to limit the number of coroutines that have an access to particular resource.
* Semaphore with `permits = 1` is essentially a [Mutex].
**/
public interface Semaphore {
@@ -29,11 +29,12 @@ public interface Semaphore {
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
- * *Cancellation of suspended semaphore acquisition` is atomic* -- when this function
+ * *Cancellation of suspended semaphore acquisition is atomic* -- when this function
* throws [CancellationException] it means that the semaphore was not acquired.
*
- * Note, that this function does not check for cancellation when it is not suspended.
- * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ * Note, that this function does not check for cancellation when it does not suspend.
+ * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically
+ * check for cancellation in tight loops if needed.
*
* Use [tryAcquire] to try acquire a permit of this semaphore without suspension.
*/
@@ -49,8 +50,7 @@ public interface Semaphore {
/**
* Releases a permit, returning it into this semaphore. Resumes the first
* suspending acquirer if there is one at the point of invocation.
- * Throws [IllegalStateException] if there is no acquired permit
- * at the point of invocation.
+ * Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire].
*/
public fun release()
}
@@ -83,8 +83,8 @@ private class SemaphoreImpl(
private val permits: Int, acquiredPermits: Int
) : Semaphore, SegmentQueue<SemaphoreSegment>() {
init {
- require(permits > 0) { "Semaphore should have at least 1 permit" }
- require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..permits" }
+ require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" }
+ require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" }
}
override fun newSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev)
@@ -96,7 +96,7 @@ private class SemaphoreImpl(
* and the maximum number of waiting acquirers cannot be greater than 2^31 in any
* real application.
*/
- private val _availablePermits = atomic(permits)
+ private val _availablePermits = atomic(permits - acquiredPermits)
override val availablePermits: Int get() = max(_availablePermits.value, 0)
// The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`;
@@ -126,8 +126,8 @@ private class SemaphoreImpl(
resumeNextFromQueue()
}
- internal fun incPermits() = _availablePermits.getAndUpdate { cur ->
- check(cur < permits) { "The number of acquired permits cannot be greater than `permits`" }
+ fun incPermits() = _availablePermits.getAndUpdate { cur ->
+ check(cur < permits) { "The number of released permits cannot be greater than $permits" }
cur + 1
}
@@ -176,6 +176,8 @@ private class CancelSemaphoreAcquisitionHandler(
private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) {
val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
+ private val cancelledSlots = atomic(0)
+ override val removed get() = cancelledSlots.value == SEGMENT_SIZE
@Suppress("NOTHING_TO_INLINE")
inline fun get(index: Int): Any? = acquirers[index].value
@@ -186,9 +188,6 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<Semap
@Suppress("NOTHING_TO_INLINE")
inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value)
- private val cancelledSlots = atomic(0)
- override val removed get() = cancelledSlots.value == SEGMENT_SIZE
-
// Cleans the acquirer slot located by the specified index
// and removes this segment physically if all slots are cleaned.
fun cancel(index: Int): Boolean {
diff --git a/kotlinx-coroutines-core/common/test/TestBase.common.kt b/kotlinx-coroutines-core/common/test/TestBase.common.kt
index 50df19a6..ad7b8b15 100644
--- a/kotlinx-coroutines-core/common/test/TestBase.common.kt
+++ b/kotlinx-coroutines-core/common/test/TestBase.common.kt
@@ -2,11 +2,12 @@
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+@file:Suppress("unused")
+
package kotlinx.coroutines
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*
-import kotlinx.coroutines.internal.*
import kotlin.test.*
public expect open class TestBase constructor() {
@@ -56,12 +57,14 @@ public suspend inline fun <reified T : Throwable> assertFailsWith(flow: Flow<*>)
public suspend fun Flow<Int>.sum() = fold(0) { acc, value -> acc + value }
public suspend fun Flow<Long>.longSum() = fold(0L) { acc, value -> acc + value }
-public class TestException(message: String? = null) : Throwable(message), NonRecoverableThrowable
-public class TestException1(message: String? = null) : Throwable(message), NonRecoverableThrowable
-public class TestException2(message: String? = null) : Throwable(message), NonRecoverableThrowable
-public class TestException3(message: String? = null) : Throwable(message), NonRecoverableThrowable
-public class TestCancellationException(message: String? = null) : CancellationException(message), NonRecoverableThrowable
-public class TestRuntimeException(message: String? = null) : RuntimeException(message), NonRecoverableThrowable
+
+// data is added to avoid stacktrace recovery because CopyableThrowable is not accessible from common modules
+public class TestException(message: String? = null, private val data: Any? = null) : Throwable(message)
+public class TestException1(message: String? = null, private val data: Any? = null) : Throwable(message)
+public class TestException2(message: String? = null, private val data: Any? = null) : Throwable(message)
+public class TestException3(message: String? = null, private val data: Any? = null) : Throwable(message)
+public class TestCancellationException(message: String? = null, private val data: Any? = null) : CancellationException(message)
+public class TestRuntimeException(message: String? = null, private val data: Any? = null) : RuntimeException(message)
public class RecoverableTestException(message: String? = null) : RuntimeException(message)
public class RecoverableTestCancellationException(message: String? = null) : CancellationException(message)
diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
index 2b948dfa..ceef21ed 100644
--- a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
@@ -86,7 +86,7 @@ class ArrayChannelTest : TestBase() {
}
@Test
- fun testOfferAndPool() = runTest {
+ fun testOfferAndPoll() = runTest {
val q = Channel<Int>(1)
assertTrue(q.offer(1))
expect(1)
@@ -144,4 +144,51 @@ class ArrayChannelTest : TestBase() {
channel.cancel(TestCancellationException())
channel.receiveOrNull()
}
+
+ @Test
+ fun testBufferSize() = runTest {
+ val capacity = 42
+ val channel = Channel<Int>(capacity)
+ checkBufferChannel(channel, capacity)
+ }
+
+ @Test
+ fun testBufferSizeFromTheMiddle() = runTest {
+ val capacity = 42
+ val channel = Channel<Int>(capacity)
+ repeat(4) {
+ channel.offer(-1)
+ }
+ repeat(4) {
+ channel.receiveOrNull()
+ }
+ checkBufferChannel(channel, capacity)
+ }
+
+ private suspend fun CoroutineScope.checkBufferChannel(
+ channel: Channel<Int>,
+ capacity: Int
+ ) {
+ launch {
+ expect(2)
+ repeat(42) {
+ channel.send(it)
+ }
+ expect(3)
+ channel.send(42)
+ expect(5)
+ channel.close()
+ }
+
+ expect(1)
+ yield()
+
+ expect(4)
+ val result = ArrayList<Int>(42)
+ channel.consumeEach {
+ result.add(it)
+ }
+ assertEquals((0..capacity).toList(), result)
+ finish(6)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt b/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt
index 98406869..e016b031 100644
--- a/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt
@@ -6,6 +6,7 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.reflect.*
import kotlin.test.*
@@ -214,4 +215,68 @@ class FlowInvariantsTest : TestBase() {
}
}
}
+
+ @Test
+ fun testEmptyCoroutineContext() = runTest {
+ emptyContextTest {
+ map {
+ expect(it)
+ it + 1
+ }
+ }
+ }
+
+ @Test
+ fun testEmptyCoroutineContextTransform() = runTest {
+ emptyContextTest {
+ transform {
+ expect(it)
+ emit(it + 1)
+ }
+ }
+ }
+
+ @Test
+ fun testEmptyCoroutineContextViolation() = runTest {
+ try {
+ emptyContextTest {
+ transform {
+ expect(it)
+ kotlinx.coroutines.withContext(Dispatchers.Unconfined) {
+ emit(it + 1)
+ }
+ }
+ }
+ expectUnreached()
+ } catch (e: IllegalStateException) {
+ assertTrue(e.message!!.contains("Flow invariant is violated"))
+ finish(2)
+ }
+ }
+
+ private suspend fun emptyContextTest(block: Flow<Int>.() -> Flow<Int>) {
+ suspend fun collector(): Int {
+ var result: Int = -1
+ channelFlow {
+ send(1)
+ }.block()
+ .collect {
+ expect(it)
+ result = it
+ }
+ return result
+ }
+
+ val result = runSuspendFun { collector() }
+ assertEquals(2, result)
+ finish(3)
+ }
+
+ private suspend fun runSuspendFun(block: suspend () -> Int): Int {
+ val baseline = Result.failure<Int>(IllegalStateException("Block was suspended"))
+ var result: Result<Int> = baseline
+ block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { result = it })
+ while (result == baseline) yield()
+ return result.getOrThrow()
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
index a77f8faf..32c2afc6 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
@@ -29,7 +29,6 @@ class ChannelFlowTest : TestBase() {
assertEquals(listOf(1, 2), flow.toList())
}
- // todo: this is pretty useless behavior
@Test
fun testConflated() = runTest {
val flow = channelFlow {
@@ -114,6 +113,7 @@ class ChannelFlowTest : TestBase() {
}
@Test
+ @Ignore // #1374
fun testBufferWithTimeout() = runTest {
fun Flow<Int>.bufferWithTimeout(): Flow<Int> = channelFlow {
expect(2)
@@ -140,4 +140,24 @@ class ChannelFlowTest : TestBase() {
assertFailsWith<TimeoutCancellationException>(flow)
finish(6)
}
+
+ @Test
+ fun testChildCancellation() = runTest {
+ channelFlow {
+ val job = launch {
+ expect(2)
+ hang { expect(4) }
+ }
+ expect(1)
+ yield()
+ expect(3)
+ job.cancelAndJoin()
+ send(5)
+
+ }.collect {
+ expect(it)
+ }
+
+ finish(6)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt
index 65fef02c..0b1b208f 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt
@@ -183,5 +183,19 @@ class BufferTest : TestBase() {
}
finish(n + 4)
}
+
+ @Test
+ fun testCancellation() = runTest {
+ val result = flow {
+ emit(1)
+ emit(2)
+ emit(3)
+ expectUnreached()
+ emit(4)
+ }.buffer(0)
+ .take(2)
+ .toList()
+ assertEquals(listOf(1, 2), result)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt
deleted file mode 100644
index 37726fad..00000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow.operators
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.*
-import kotlin.test.*
-
-class CombineLatestVarargTest : TestBase() {
-
- @Test
- fun testThreeParameters() = runTest {
- val flow = flowOf("1").combineLatest(flowOf(2), flowOf(null)) { a, b, c ->
- a + b + c
- }
-
- assertEquals("12null", flow.single())
- }
-
- @Test
- fun testFourParameters() = runTest {
- val flow = flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d ->
- a + b + c + d
- }
-
- assertEquals("123null", flow.single())
- }
-
- @Test
- fun testFiveParameters() = runTest {
- val flow =
- flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e ->
- a + b + c + d + e
- }
-
- assertEquals("1234null", flow.single())
- }
-
- @Test
- fun testVararg() = runTest {
- val flow = flowOf("1").combineLatest(
- flowOf(2),
- flowOf("3"),
- flowOf(4.toByte()),
- flowOf("5"),
- flowOf(null)
- ) { arr -> arr.joinToString("") }
- assertEquals("12345null", flow.single())
- }
-
- @Test
- fun testEmptyVararg() = runTest {
- val list = flowOf(1, 2, 3).combineLatest { args: Array<Any?> -> args[0] }.toList()
- assertEquals(listOf(1, 2, 3), list)
- }
-
- @Test
- fun testNonNullableAny() = runTest {
- val value = flowOf(1).combineLatest(flowOf(2)) { args: Array<Int> ->
- @Suppress("USELESS_IS_CHECK")
- assertTrue(args is Array<Int>)
- args[0] + args[1]
- }.single()
- assertEquals(3, value)
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt
new file mode 100644
index 00000000..a987c834
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.operators
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+class CombineParametersTest : TestBase() {
+
+ @Test
+ fun testThreeParameters() = runTest {
+ val flow = combine(flowOf("1"), flowOf(2), flowOf(null)) { a, b, c -> a + b + c }
+ assertEquals("12null", flow.single())
+
+ val flow2 = combineTransform(flowOf("1"), flowOf(2), flowOf(null)) { a, b, c -> emit(a + b + c) }
+ assertEquals("12null", flow2.single())
+ }
+
+ @Test
+ fun testThreeParametersTransform() = runTest {
+ val flow = combineTransform(flowOf("1"), flowOf(2), flowOf(null)) { a, b, c -> emit(a + b + c) }
+ assertEquals("12null", flow.single())
+ }
+
+ @Test
+ fun testFourParameters() = runTest {
+ val flow = combine(flowOf("1"), flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d -> a + b + c + d }
+ assertEquals("123null", flow.single())
+ }
+
+ @Test
+ fun testFourParametersTransform() = runTest {
+ val flow = combineTransform(flowOf("1"), flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d ->
+ emit(a + b + c + d)
+ }
+ assertEquals("123null", flow.single())
+ }
+
+ @Test
+ fun testFiveParameters() = runTest {
+ val flow = combine(flowOf("1"), flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e ->
+ a + b + c + d + e
+ }
+ assertEquals("1234null", flow.single())
+ }
+
+ @Test
+ fun testFiveParametersTransform() = runTest {
+ val flow =
+ combineTransform(flowOf("1"), flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e ->
+ emit(a + b + c + d + e)
+ }
+ assertEquals("1234null", flow.single())
+ }
+
+ @Test
+ fun testNonMatchingTypes() = runTest {
+ val flow = combine(flowOf(1), flowOf("2")) { args: Array<Any?> ->
+ args[0]?.toString() + args[1]?.toString()
+ }
+ assertEquals("12", flow.single())
+ }
+
+ @Test
+ fun testNonMatchingTypesIterable() = runTest {
+ val flow = combine(listOf(flowOf(1), flowOf("2"))) { args: Array<Any?> ->
+ args[0]?.toString() + args[1]?.toString()
+ }
+ assertEquals("12", flow.single())
+ }
+
+ @Test
+ fun testVararg() = runTest {
+ val flow = combine(
+ flowOf("1"),
+ flowOf(2),
+ flowOf("3"),
+ flowOf(4.toByte()),
+ flowOf("5"),
+ flowOf(null)
+ ) { arr -> arr.joinToString("") }
+ assertEquals("12345null", flow.single())
+ }
+
+ @Test
+ fun testVarargTransform() = runTest {
+ val flow = combineTransform(
+ flowOf("1"),
+ flowOf(2),
+ flowOf("3"),
+ flowOf(4.toByte()),
+ flowOf("5"),
+ flowOf(null)
+ ) { arr -> emit(arr.joinToString("")) }
+ assertEquals("12345null", flow.single())
+ }
+
+ @Test
+ fun testEmptyVararg() = runTest {
+ val list = combine(flowOf(1, 2, 3)) { args: Array<Any?> -> args[0] }.toList()
+ assertEquals(listOf(1, 2, 3), list)
+ }
+
+ @Test
+ fun testEmptyVarargTransform() = runTest {
+ val list = combineTransform(flowOf(1, 2, 3)) { args: Array<Any?> -> emit(args[0]) }.toList()
+ assertEquals(listOf(1, 2, 3), list)
+ }
+
+ @Test
+ fun testReified() = runTest {
+ val value = combine(flowOf(1), flowOf(2)) { args: Array<Int> ->
+ @Suppress("USELESS_IS_CHECK")
+ assertTrue(args is Array<Int>)
+ args[0] + args[1]
+ }.single()
+ assertEquals(3, value)
+ }
+
+ @Test
+ fun testReifiedTransform() = runTest {
+ val value = combineTransform(flowOf(1), flowOf(2)) { args: Array<Int> ->
+ @Suppress("USELESS_IS_CHECK")
+ assertTrue(args is Array<Int>)
+ emit(args[0] + args[1])
+ }.single()
+ assertEquals(3, value)
+ }
+
+ @Test
+ fun testEmpty() = runTest {
+ val value = combineTransform { args: Array<Int> ->
+ emit(args[0] + args[1])
+ }.singleOrNull()
+ assertNull(value)
+ }
+
+ @Test
+ fun testEmptyIterable() = runTest {
+ val value = combineTransform(emptyList()) { args: Array<Int> ->
+ emit(args[0] + args[1])
+ }.singleOrNull()
+ assertNull(value)
+ }
+
+ @Test
+ fun testEmptyReified() = runTest {
+ val value = combineTransform { args: Array<Int> ->
+ emit(args[0] + args[1])
+ }.singleOrNull()
+ assertNull(value)
+ }
+
+ @Test
+ fun testEmptyIterableReified() = runTest {
+ val value = combineTransform(emptyList()) { args: Array<Int> ->
+ emit(args[0] + args[1])
+ }.singleOrNull()
+ assertNull(value)
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt
index 54244f05..637cb3d6 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt
@@ -6,12 +6,13 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlin.test.*
-import kotlinx.coroutines.flow.combineLatest as combineLatestOriginal
+import kotlinx.coroutines.flow.combine as combineOriginal
+import kotlinx.coroutines.flow.combineTransform as combineTransformOriginal
/*
* Replace: { i, j -> i + j } -> { i, j -> i + j } as soon as KT-30991 is fixed
*/
-abstract class CombineLatestTestBase : TestBase() {
+abstract class CombineTestBase : TestBase() {
abstract fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
@@ -239,11 +240,33 @@ abstract class CombineLatestTestBase : TestBase() {
}
}
-class CombineLatestTest : CombineLatestTestBase() {
- override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = combineLatestOriginal(other, transform)
+class CombineTest : CombineTestBase() {
+ override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = combineOriginal(other, transform)
}
-class CombineLatestVarargAdapterTest : CombineLatestTestBase() {
+class CombineTransformTest : CombineTestBase() {
+ override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = combineTransformOriginal(other) { a, b ->
+ emit(transform(a, b))
+ }
+}
+
+class CombineVarargAdapterTest : CombineTestBase() {
+ override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+ combineOriginal(this, other) { args: Array<Any?> -> transform(args[0] as T1, args[1] as T2) }
+}
+
+class CombineIterableTest : CombineTestBase() {
+ override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+ combineOriginal(listOf(this, other)) { args -> transform(args[0] as T1, args[1] as T2) }
+}
+
+class CombineTransformVarargAdapterTest : CombineTestBase() {
override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
- (this as Flow<*>).combineLatestOriginal(other) { args: Array<Any?> -> transform(args[0] as T1, args[1] as T2) }
-} \ No newline at end of file
+ combineTransformOriginal(this, other) { args: Array<Any?> -> emit(transform(args[0] as T1, args[1] as T2)) }
+}
+
+class CombineTransformIterableTest : CombineTestBase() {
+ override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+ combineTransformOriginal(listOf(this, other)) { args -> emit(transform(args[0] as T1, args[1] as T2)) }
+}
+
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapLatestTest.kt
new file mode 100644
index 00000000..ad0bda9e
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapLatestTest.kt
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class FlatMapLatestTest : TestBase() {
+
+ @Test
+ fun testFlatMapLatest() = runTest {
+ val flow = flowOf(1, 2, 3).flatMapLatest { value ->
+ flowOf(value, value + 1)
+ }
+ assertEquals(listOf(1, 2, 2, 3, 3, 4), flow.toList())
+ }
+
+ @Test
+ fun testEmission() = runTest {
+ val list = flow {
+ repeat(5) {
+ emit(it)
+ }
+ }.flatMapLatest { flowOf(it) }.toList()
+ assertEquals(listOf(0, 1, 2, 3, 4), list)
+ }
+
+ @Test
+ fun testSwitchIntuitiveBehaviour() = runTest {
+ val flow = flowOf(1, 2, 3, 4, 5)
+ flow.flatMapLatest {
+ flow {
+ expect(it)
+ emit(it)
+ yield() // Explicit cancellation check
+ if (it != 5) expectUnreached()
+ else expect(6)
+ }
+ }.collect()
+ finish(7)
+ }
+
+ @Test
+ fun testSwitchRendevouzBuffer() = runTest {
+ val flow = flowOf(1, 2, 3, 4, 5)
+ flow.flatMapLatest {
+ flow {
+ emit(it)
+ // Reach here every uneven element because of channel's unfairness
+ expect(it)
+ }
+ }.buffer(0).onEach { expect(it + 1) }
+ .collect()
+ finish(7)
+ }
+
+ @Test
+ fun testHangFlows() = runTest {
+ val flow = listOf(1, 2, 3, 4).asFlow()
+ val result = flow.flatMapLatest { value ->
+ flow {
+ if (value != 4) hang { expect(value) }
+ emit(42)
+ }
+ }.toList()
+
+ assertEquals(listOf(42), result)
+ finish(4)
+ }
+
+ @Test
+ fun testEmptyFlow() = runTest {
+ assertNull(emptyFlow<Int>().flatMapLatest { flowOf(1) }.singleOrNull())
+ }
+
+ @Test
+ fun testFailureInTransform() = runTest {
+ val flow = flowOf(1, 2).flatMapLatest { value ->
+ flow {
+ if (value == 1) {
+ emit(1)
+ hang { expect(1) }
+ } else {
+ expect(2)
+ throw TestException()
+ }
+ }
+ }
+ assertFailsWith<TestException>(flow)
+ finish(3)
+ }
+
+ @Test
+ fun testFailureDownstream() = runTest {
+ val flow = flowOf(1).flatMapLatest { value ->
+ flow {
+ expect(1)
+ emit(value)
+ expect(2)
+ hang { expect(4) }
+ }
+ }.flowOn(NamedDispatchers("downstream")).onEach {
+ expect(3)
+ throw TestException()
+ }
+ assertFailsWith<TestException>(flow)
+ finish(5)
+ }
+
+ @Test
+ fun testFailureUpstream() = runTest {
+ val flow = flow {
+ expect(1)
+ emit(1)
+ yield()
+ expect(3)
+ throw TestException()
+ }.flatMapLatest<Int, Long> {
+ flow {
+ expect(2)
+ hang {
+ expect(4)
+ }
+ }
+ }
+ assertFailsWith<TestException>(flow)
+ finish(5)
+ }
+
+ @Test
+ fun testTake() = runTest {
+ val flow = flowOf(1, 2, 3, 4, 5).flatMapLatest { flowOf(it) }
+ assertEquals(listOf(1), flow.take(1).toList())
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
index 6069ae6d..511a003a 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
@@ -73,4 +73,19 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() {
assertFailsWith<CancellationException>(flow)
finish(5)
}
+
+ @Test
+ fun testCancellation() = runTest {
+ val result = flow {
+ emit(1)
+ emit(2)
+ emit(3)
+ emit(4)
+ expectUnreached() // Cancelled by take
+ emit(5)
+ }.flatMapMerge(2) { v -> flow { emit(v) } }
+ .take(2)
+ .toList()
+ assertEquals(listOf(1, 2), result)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowContextTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowContextTest.kt
deleted file mode 100644
index cd8af1d0..00000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlowContextTest.kt
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import kotlin.coroutines.*
-import kotlin.test.*
-
-@Suppress("DEPRECATION")
-class FlowContextTest : TestBase() {
-
- private val captured = ArrayList<String>()
-
- @Test
- fun testMixedContext() = runTest {
- val flow = flow {
- captured += NamedDispatchers.nameOr("main")
- emit(314)
- }
-
- val mapper: suspend (Int) -> Int = {
- captured += NamedDispatchers.nameOr("main")
- it
- }
-
- val value = flow // upstream
- .map(mapper) // upstream
- .flowOn(NamedDispatchers("upstream"))
- .map(mapper) // upstream 2
- .flowWith(NamedDispatchers("downstream")) {
- map(mapper) // downstream
- }
- .flowOn(NamedDispatchers("upstream 2"))
- .map(mapper) // main
- .single()
-
- assertEquals(314, value)
- assertEquals(listOf("upstream", "upstream", "upstream 2", "downstream", "main"), captured)
- }
-
- @Test
- fun testException() = runTest {
- val flow = flow {
- emit(314)
- delay(Long.MAX_VALUE)
- }.flowOn(NamedDispatchers("upstream"))
- .map {
- throw TestException()
- }
-
- assertFailsWith<TestException> { flow.single() }
- assertFailsWith<TestException>(flow)
- ensureActive()
- }
-
- @Test
- fun testMixedContextsAndException() = runTest {
- val baseFlow = flow {
- emit(314)
- hang { }
- }
-
- var state = 0
- var needle = 1
- val mapper: suspend (Int) -> Int = {
- if (++state == needle) throw TestException()
- it
- }
-
- val flow = baseFlow.map(mapper) // 1
- .flowOn(NamedDispatchers("ctx 1"))
- .map(mapper) // 2
- .flowWith(NamedDispatchers("ctx 2")) {
- map(mapper) // 3
- }
- .map(mapper) // 4
- .flowOn(NamedDispatchers("ctx 3"))
- .map(mapper) // 5
-
- repeat(5) { // Will hang for 6
- state = 0
- needle = it + 1
- assertFailsWith<TestException> { flow.single() }
-
- state = 0
- assertFailsWith<TestException>(flow)
- }
-
- ensureActive()
- }
-
- @Test
- fun testNestedContexts() = runTest {
- val mapper: suspend (Int) -> Int = { captured += NamedDispatchers.nameOr("main"); it }
- val value = flow {
- captured += NamedDispatchers.nameOr("main")
- emit(1)
- }.flowWith(NamedDispatchers("outer")) {
- map(mapper)
- .flowOn(NamedDispatchers("nested first"))
- .flowWith(NamedDispatchers("nested second")) {
- map(mapper)
- .flowOn(NamedDispatchers("inner first"))
- .map(mapper)
- }
- .map(mapper)
- }.map(mapper)
- .single()
-
- val expected = listOf("main", "nested first", "inner first", "nested second", "outer", "main")
- assertEquals(expected, captured)
- assertEquals(1, value)
- }
-
-
- @Test
- fun testFlowContextCancellation() = runTest {
- val latch = Channel<Unit>()
- val flow = flow {
- assertEquals("delayed", NamedDispatchers.name())
- expect(2)
- emit(1)
- }.flowWith(NamedDispatchers("outer")) {
- map { expect(3); it + 1 }.flowOn(NamedDispatchers("inner"))
- }.map {
- expect(4)
- assertEquals("delayed", NamedDispatchers.name())
- latch.send(Unit)
- hang { expect(6) }
- }.flowOn(NamedDispatchers("delayed"))
-
-
- val job = launch(NamedDispatchers("launch")) {
- expect(1)
- flow.single()
- }
-
- latch.receive()
- expect(5)
- job.cancelAndJoin()
- finish(7)
- ensureActive()
- }
-
- @Test
- fun testIllegalArgumentException() {
- val flow = emptyFlow<Int>()
- assertFailsWith<IllegalArgumentException> { flow.flowOn(Job()) }
- assertFailsWith<IllegalArgumentException> { flow.flowWith(Job()) { this } }
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
index 4adc3541..34c0476e 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
@@ -261,6 +261,42 @@ class FlowOnTest : TestBase() {
finish(3)
}
+ @Test
+ fun testCancellation() = runTest {
+ val result = flow {
+ emit(1)
+ emit(2)
+ emit(3)
+ expectUnreached()
+ emit(4)
+ }.flowOn(wrapperDispatcher())
+ .buffer(0)
+ .take(2)
+ .toList()
+ assertEquals(listOf(1, 2), result)
+ }
+
+ @Test
+ fun testException() = runTest {
+ val flow = flow {
+ emit(314)
+ delay(Long.MAX_VALUE)
+ }.flowOn(NamedDispatchers("upstream"))
+ .map {
+ throw TestException()
+ }
+
+ assertFailsWith<TestException> { flow.single() }
+ assertFailsWith<TestException>(flow)
+ ensureActive()
+ }
+
+ @Test
+ fun testIllegalArgumentException() {
+ val flow = emptyFlow<Int>()
+ assertFailsWith<IllegalArgumentException> { flow.flowOn(Job()) }
+ }
+
private inner class Source(private val value: Int) {
public var contextName: String = "unknown"
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt
deleted file mode 100644
index a7858142..00000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import kotlin.test.*
-
-@Suppress("DEPRECATION")
-class FlowWithTest : TestBase() {
-
- private fun mapper(name: String, index: Int): suspend (Int) -> Int = {
- assertEquals(name, NamedDispatchers.nameOr("main"))
- expect(index)
- it
- }
-
- @Test
- fun testFlowWith() = runTest {
- val flow = flow {
- assertEquals("main", NamedDispatchers.nameOr("main"))
- expect(1)
- emit(314)
- }
-
- val result = flow.flowWith(NamedDispatchers("ctx1")) {
- map(mapper("ctx1", 2))
- }.flowWith(NamedDispatchers("ctx2")) {
- map(mapper("ctx2", 3))
- }.map(mapper("main", 4)).single()
- assertEquals(314, result)
- finish(5)
- }
-
- @Test
- public fun testFlowWithThrowingSource() = runTest {
- val flow = flow {
- emit(NamedDispatchers.nameOr("main"))
- throw TestException()
- }.flowWith(NamedDispatchers("throwing")) {
- map {
- assertEquals("main", it)
- it
- }
- }
-
- assertFailsWith<TestException> { flow.single() }
- assertFailsWith<TestException>(flow)
- ensureActive()
- }
-
- @Test
- public fun testFlowWithThrowingOperator() = runTest {
- val flow = flow {
- emit(NamedDispatchers.nameOr("main"))
- hang {}
- }.flowWith(NamedDispatchers("throwing")) {
- map {
- assertEquals("main", it)
- throw TestException()
- }
- }
-
- assertFailsWith<TestException> { flow.single() }
- assertFailsWith<TestException>(flow)
- ensureActive()
- }
-
- @Test
- public fun testFlowWithThrowingDownstreamOperator() = runTest {
- val flow = flow {
- emit(42)
- hang {}
- }.flowWith(NamedDispatchers("throwing")) {
- map { it }
- }.map { throw TestException() }
-
- assertFailsWith<TestException> { flow.single() }
- assertFailsWith<TestException>(flow)
- ensureActive()
- }
-
- @Test
- fun testMultipleFlowWith() = runTest() {
- flow {
- expect(1)
- emit(1)
- }.map(mapper("main", 2))
- .flowWith(NamedDispatchers("downstream")) {
- map(mapper("downstream", 3))
- }
- .flowWith(NamedDispatchers("downstream 2")) {
- map(mapper("downstream 2", 4))
- }
- .flowWith(NamedDispatchers("downstream 3")) {
- map(mapper("downstream 3", 5))
- }
- .map(mapper("main", 6))
- .flowWith(NamedDispatchers("downstream 4")) {
- map(mapper("downstream 4", 7))
- }.flowWith(NamedDispatchers("ignored")) { this }
- .single()
-
- finish(8)
- }
-
- @Test
- fun testFlowWithCancellation() = runTest() {
- val latch = Channel<Unit>()
- expect(1)
- val job = launch(NamedDispatchers("launch")) {
- flow<Int> {
- expect(2)
- latch.send(Unit)
- expect(3)
- hang {
- assertEquals("launch", NamedDispatchers.nameOr("main"))
- expect(5)
- }
- }.flowWith(NamedDispatchers("cancelled")) {
- map {
- expectUnreached()
- it
- }
- }.single()
- }
-
- latch.receive()
- expect(4)
- job.cancel()
- job.join()
- ensureActive()
- finish(6)
- }
-
- @Test
- fun testFlowWithCancellationHappensBefore() = runTest {
- launch {
- try {
- flow<Int> {
- expect(1)
- val flowJob = kotlin.coroutines.coroutineContext[Job]!!
- launch {
- expect(2)
- flowJob.cancel()
- }
- hang { expect(3) }
- }.flowWith(NamedDispatchers("downstream")) {
- map { it }
- }.single()
- } catch (e: CancellationException) {
- expect(4)
- }
- }.join()
- finish(5)
- }
-
- @Test
- fun testMultipleFlowWithException() = runTest() {
- var switch = 0
- val flow = flow {
- emit(Unit)
- if (switch == 0) throw TestException()
- }.map { if (switch == 1) throw TestException() else Unit }
- .flowWith(NamedDispatchers("downstream")) {
- map { if (switch == 2) throw TestException() else Unit }
- }
- repeat(3) {
- switch = it
- assertFailsWith<TestException> { flow.single() }
- assertFailsWith<TestException>(flow)
- }
- }
-
- @Test
- fun testMultipleFlowWithJobsCancellation() = runTest() {
- val latch = Channel<Unit>()
- val flow = flow {
- expect(1)
- emit(Unit)
- latch.send(Unit)
- hang { expect(4) }
- }.flowWith(NamedDispatchers("downstream")) {
- map {
- expect(2)
- Unit
- }
- }
-
- val job = launch {
- flow.single()
- }
-
- latch.receive()
- expect(3)
- job.cancelAndJoin()
- ensureActive()
- finish(5)
- }
-
- @Test
- fun testTimeoutException() = runTest {
- val flow = flow {
- emit(1)
- yield()
- withTimeout(-1) {}
- emit(42)
- }.flowWith(NamedDispatchers("foo")) {
- onEach { expect(1) }
- }
- assertFailsWith<TimeoutCancellationException>(flow)
- finish(2)
- }
-
- @Test
- fun testTimeoutExceptionDownstream() = runTest {
- val flow = flow {
- emit(1)
- hang { expect(2) }
- }.flowWith(NamedDispatchers("foo")) {
- onEach {
- expect(1)
- withTimeout(-1) {}
- }
- }
- assertFailsWith<TimeoutCancellationException>(flow)
- finish(3)
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt
deleted file mode 100644
index fabca72c..00000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlin.test.*
-
-class SwitchMapTest : TestBase() {
-
- @Test
- fun testConstantDynamic() = runTest {
- val flow = flowOf(1, 2, 3).switchMap { value -> (value until value + 3).asFlow() }
- assertEquals(listOf(1, 2, 3, 2, 3, 4, 3, 4, 5), flow.toList())
- }
-
- @Test
- fun testHangFlows() = runTest {
- val flow = listOf(1, 2, 3, 4).asFlow()
- val result = flow.switchMap { value ->
- flow {
- if (value != 4) hang { expect(value) }
- else emit(42)
- }
- }.toList()
-
- assertEquals(listOf(42), result)
- finish(4)
- }
-
- @Test
- fun testEmptyFlow() = runTest {
- assertNull(emptyFlow<Int>().switchMap { flowOf(1) }.singleOrNull())
- }
-
- @Test
- fun testIsolatedContext() = runTest {
- val flow = flow {
- assertEquals("source", NamedDispatchers.name())
- expect(1)
- emit(2)
- emit(4)
- }.flowOn(NamedDispatchers("source")).switchMap { value ->
- flow {
- assertEquals("switch$value", NamedDispatchers.name())
- emit(value)
- expect(value)
- }.flowOn(NamedDispatchers("switch$value"))
- }.onEach {
- expect(it + 1)
- assertEquals("main", NamedDispatchers.nameOr("main"))
- }
-
- assertEquals(2, flow.count())
- finish(6)
- }
-
- @Test
- fun testFailureInTransform() = runTest {
- val flow = flowOf(1, 2).switchMap { value ->
- if (value == 1) {
- flow {
- emit(1)
- hang { expect(1) }
- }
- } else {
- expect(2)
- throw TestException()
- }
- }
-
- assertFailsWith<TestException>(flow)
- finish(3)
- }
-
- @Test
- fun testFailureDownstream() = runTest {
- val flow = flowOf(1).switchMap { value ->
- flow {
- expect(1)
- emit(value)
- expect(2)
- hang { expect(4) }
- }
- }.flowOn(NamedDispatchers("downstream")).map {
- expect(3)
- throw TestException()
- it
- }
-
- assertFailsWith<TestException>(flow)
- finish(5)
- }
-
- @Test
- fun testFailureUpstream() = runTest {
- val flow = flow {
- expect(1)
- emit(1)
- yield()
- expect(3)
- throw TestException()
- }.switchMap {
- flow<Int> {
- expect(2)
- hang {
- expect(4)
- }
- }
- }
-
- assertFailsWith<TestException>(flow)
- finish(5)
- }
-
- @Test
- fun testTake() = runTest {
- val flow = flowOf(1, 2, 3, 4, 5).switchMap { flowOf(it) }
- assertEquals(listOf(1), flow.take(1).toList())
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/TransformLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/TransformLatestTest.kt
new file mode 100644
index 00000000..a37cca21
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/TransformLatestTest.kt
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class TransformLatestTest : TestBase() {
+
+ @Test
+ fun testTransformLatest() = runTest {
+ val flow = flowOf(1, 2, 3).transformLatest { value ->
+ emit(value)
+ emit(value + 1)
+ }
+ assertEquals(listOf(1, 2, 2, 3, 3, 4), flow.toList())
+ }
+
+ @Test
+ fun testEmission() = runTest {
+ val list = flow {
+ repeat(5) {
+ emit(it)
+ }
+ }.transformLatest {
+ emit(it)
+ }.toList()
+ assertEquals(listOf(0, 1, 2, 3, 4), list)
+ }
+
+ @Test
+ fun testSwitchIntuitiveBehaviour() = runTest {
+ val flow = flowOf(1, 2, 3, 4, 5)
+ flow.transformLatest {
+ expect(it)
+ emit(it)
+ yield() // Explicit cancellation check
+ if (it != 5) expectUnreached()
+ else expect(6)
+ }.collect()
+ finish(7)
+ }
+
+ @Test
+ fun testSwitchRendevouzBuffer() = runTest {
+ val flow = flowOf(1, 2, 3, 4, 5)
+ flow.transformLatest {
+ emit(it)
+ // Reach here every uneven element because of channel's unfairness
+ expect(it)
+ }.buffer(0).onEach { expect(it + 1) }.collect()
+ finish(7)
+ }
+
+ @Test
+ fun testSwitchBuffer() = runTest {
+ val flow = flowOf(1, 2, 3, 42, 4)
+ flow.transformLatest {
+ emit(it)
+ expect(it)
+ }.buffer(2).collect()
+ finish(5)
+ }
+
+ @Test
+ fun testHangFlows() = runTest {
+ val flow = listOf(1, 2, 3, 4).asFlow()
+ val result = flow.transformLatest { value ->
+ if (value != 4) hang { expect(value) }
+ emit(42)
+ }.toList()
+
+ assertEquals(listOf(42), result)
+ finish(4)
+ }
+
+ @Test
+ fun testEmptyFlow() = runTest {
+ assertNull(emptyFlow<Int>().transformLatest { emit(1) }.singleOrNull())
+ }
+
+ @Test
+ fun testIsolatedContext() = runTest {
+ val flow = flow {
+ assertEquals("source", NamedDispatchers.name())
+ expect(1)
+ emit(4)
+ expect(2)
+ emit(5)
+ expect(3)
+ }.flowOn(NamedDispatchers("source")).transformLatest<Int, Int> { value ->
+ emitAll(flow<Int> {
+ assertEquals("switch$value", NamedDispatchers.name())
+ expect(value)
+ emit(value)
+ }.flowOn(NamedDispatchers("switch$value")))
+ }.onEach {
+ expect(it + 2)
+ assertEquals("main", NamedDispatchers.nameOr("main"))
+ }
+ assertEquals(2, flow.count())
+ finish(8)
+ }
+
+ @Test
+ fun testFailureInTransform() = runTest {
+ val flow = flowOf(1, 2).transformLatest { value ->
+ if (value == 1) {
+ emit(1)
+ hang { expect(1) }
+ } else {
+ expect(2)
+ throw TestException()
+ }
+ }
+ assertFailsWith<TestException>(flow)
+ finish(3)
+ }
+
+ @Test
+ fun testFailureDownstream() = runTest {
+ val flow = flowOf(1).transformLatest { value ->
+ expect(1)
+ emit(value)
+ expect(2)
+ hang { expect(4) }
+ }.flowOn(NamedDispatchers("downstream")).onEach {
+ expect(3)
+ throw TestException()
+ }
+ assertFailsWith<TestException>(flow)
+ finish(5)
+ }
+
+ @Test
+ fun testFailureUpstream() = runTest {
+ val flow = flow {
+ expect(1)
+ emit(1)
+ yield()
+ expect(3)
+ throw TestException()
+ }.transformLatest<Int, Long> {
+ expect(2)
+ hang {
+ expect(4)
+ }
+ }
+ assertFailsWith<TestException>(flow)
+ finish(5)
+ }
+
+ @Test
+ fun testTake() = runTest {
+ val flow = flowOf(1, 2, 3, 4, 5).transformLatest { emit(it) }
+ assertEquals(listOf(1), flow.take(1).toList())
+ }
+
+ @Test
+ @Ignore // TODO separate branch and/or discuss
+ fun testTakeUpstreamCancellation() = runTest {
+ val flow = flow {
+ emit(1)
+ expectUnreached()
+ emit(2)
+ emit(3)
+ }.transformLatest { emit(it) }
+ assertEquals(listOf(1), flow.take(1).toList())
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/CollectLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/CollectLatestTest.kt
new file mode 100644
index 00000000..122420c6
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/terminal/CollectLatestTest.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class CollectLatestTest : TestBase() {
+ @Test
+ fun testNoSuspension() = runTest {
+ flowOf(1, 2, 3).collectLatest {
+ expect(it)
+ }
+ finish(4)
+ }
+
+ @Test
+ fun testSuspension() = runTest {
+ flowOf(1, 2, 3).collectLatest {
+ yield()
+ expect(1)
+ }
+ finish(2)
+ }
+
+ @Test
+ fun testUpstreamErrorSuspension() = runTest({it is TestException}) {
+ try {
+ flow {
+ emit(1)
+ throw TestException()
+ }.collectLatest { expect(1) }
+ expectUnreached()
+ } finally {
+ finish(2)
+ }
+ }
+
+ @Test
+ fun testDownstreamError() = runTest({it is TestException}) {
+ try {
+ flow {
+ emit(1)
+ hang { expect(1) }
+ }.collectLatest {
+ throw TestException()
+ }
+ expectUnreached()
+ } finally {
+ finish(2)
+ }
+
+ }
+} \ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt
index dc14a122..b4ff88b8 100644
--- a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt
+++ b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt
@@ -1,9 +1,6 @@
package kotlinx.coroutines.sync
-import kotlinx.coroutines.TestBase
-import kotlinx.coroutines.cancelAndJoin
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
+import kotlinx.coroutines.*
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
@@ -140,4 +137,35 @@ class SemaphoreTest : TestBase() {
job1.cancel()
finish(6)
}
+
+ @Test
+ fun testAcquiredPermits() = runTest {
+ val semaphore = Semaphore(5, acquiredPermits = 4)
+ assertEquals(semaphore.availablePermits, 1)
+ semaphore.acquire()
+ assertEquals(semaphore.availablePermits, 0)
+ assertFalse(semaphore.tryAcquire())
+ semaphore.release()
+ assertEquals(semaphore.availablePermits, 1)
+ assertTrue(semaphore.tryAcquire())
+ }
+
+ @Test
+ fun testReleaseAcquiredPermits() = runTest {
+ val semaphore = Semaphore(5, acquiredPermits = 4)
+ assertEquals(semaphore.availablePermits, 1)
+ repeat(4) { semaphore.release() }
+ assertEquals(5, semaphore.availablePermits)
+ assertFailsWith<IllegalStateException> { semaphore.release() }
+ repeat(5) { assertTrue(semaphore.tryAcquire()) }
+ assertFalse(semaphore.tryAcquire())
+ }
+
+ @Test
+ fun testIllegalArguments() {
+ assertFailsWith<IllegalArgumentException> { Semaphore(-1, 0) }
+ assertFailsWith<IllegalArgumentException> { Semaphore(0, 0) }
+ assertFailsWith<IllegalArgumentException> { Semaphore(1, -1) }
+ assertFailsWith<IllegalArgumentException> { Semaphore(1, 2) }
+ }
} \ No newline at end of file
diff --git a/kotlinx-coroutines-core/js/src/CoroutineContext.kt b/kotlinx-coroutines-core/js/src/CoroutineContext.kt
index de02723a..3390fc1b 100644
--- a/kotlinx-coroutines-core/js/src/CoroutineContext.kt
+++ b/kotlinx-coroutines-core/js/src/CoroutineContext.kt
@@ -9,6 +9,7 @@ import kotlin.coroutines.*
private external val navigator: dynamic
private const val UNDEFINED = "undefined"
+internal external val process: dynamic
internal actual fun createDefaultDispatcher(): CoroutineDispatcher = when {
// Check if we are running under ReactNative. We have to use NodeDispatcher under it.
@@ -24,6 +25,8 @@ internal actual fun createDefaultDispatcher(): CoroutineDispatcher = when {
// Check if we are in the browser and must use window.postMessage to avoid setTimeout throttling
jsTypeOf(window) != UNDEFINED && window.asDynamic() != null && jsTypeOf(window.asDynamic().addEventListener) != UNDEFINED ->
window.asCoroutineDispatcher()
+ // If process is undefined (e.g. in NativeScript, #1404), use SetTimeout-based dispatcher
+ jsTypeOf(process) == UNDEFINED -> SetTimeoutDispatcher
// Fallback to NodeDispatcher when browser environment is not detected
else -> NodeDispatcher
}
diff --git a/kotlinx-coroutines-core/js/src/JSDispatcher.kt b/kotlinx-coroutines-core/js/src/JSDispatcher.kt
index e1137771..5a85244d 100644
--- a/kotlinx-coroutines-core/js/src/JSDispatcher.kt
+++ b/kotlinx-coroutines-core/js/src/JSDispatcher.kt
@@ -7,34 +7,71 @@ package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import org.w3c.dom.*
import kotlin.coroutines.*
-import kotlin.js.*
+import kotlin.js.Promise
private const val MAX_DELAY = Int.MAX_VALUE.toLong()
private fun delayToInt(timeMillis: Long): Int =
timeMillis.coerceIn(0, MAX_DELAY).toInt()
-internal object NodeDispatcher : CoroutineDispatcher(), Delay {
- override fun dispatch(context: CoroutineContext, block: Runnable) = NodeJsMessageQueue.enqueue(block)
+internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay {
+ inner class ScheduledMessageQueue : MessageQueue() {
+ internal val processQueue: dynamic = { process() }
+
+ override fun schedule() {
+ scheduleQueueProcessing()
+ }
+
+ override fun reschedule() {
+ setTimeout(processQueue, 0)
+ }
+ }
+
+ internal val messageQueue = ScheduledMessageQueue()
+
+ abstract fun scheduleQueueProcessing()
+
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ messageQueue.enqueue(block)
+ }
+
+ override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
+ val handle = setTimeout({ block.run() }, delayToInt(timeMillis))
+ return ClearTimeout(handle)
+ }
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val handle = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
// Actually on cancellation, but clearTimeout is idempotent
continuation.invokeOnCancellation(handler = ClearTimeout(handle).asHandler)
}
+}
- private class ClearTimeout(private val handle: Int) : CancelHandler(), DisposableHandle {
- override fun dispose() { clearTimeout(handle) }
- override fun invoke(cause: Throwable?) { dispose() }
- override fun toString(): String = "ClearTimeout[$handle]"
+internal object NodeDispatcher : SetTimeoutBasedDispatcher() {
+ override fun scheduleQueueProcessing() {
+ process.nextTick(messageQueue.processQueue)
}
+}
- override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
- val handle = setTimeout({ block.run() }, delayToInt(timeMillis))
- return ClearTimeout(handle)
+internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() {
+ override fun scheduleQueueProcessing() {
+ setTimeout(messageQueue.processQueue, 0)
}
}
+private class ClearTimeout(private val handle: Int) : CancelHandler(), DisposableHandle {
+
+ override fun dispose() {
+ clearTimeout(handle)
+ }
+
+ override fun invoke(cause: Throwable?) {
+ dispose()
+ }
+
+ override fun toString(): String = "ClearTimeout[$handle]"
+}
+
internal class WindowDispatcher(private val window: Window) : CoroutineDispatcher(), Delay {
private val queue = WindowMessageQueue(window)
@@ -75,17 +112,6 @@ private class WindowMessageQueue(private val window: Window) : MessageQueue() {
}
}
-private object NodeJsMessageQueue : MessageQueue() {
- override fun schedule() {
- // next tick is even faster than resolve
- process.nextTick({ process() })
- }
-
- override fun reschedule() {
- setTimeout({ process() }, 0)
- }
-}
-
/**
* An abstraction over JS scheduling mechanism that leverages micro-batching of [dispatch] blocks without
* paying the cost of JS callbacks scheduling on every dispatch.
@@ -100,9 +126,8 @@ private object NodeJsMessageQueue : MessageQueue() {
*/
internal abstract class MessageQueue : ArrayQueue<Runnable>() {
val yieldEvery = 16 // yield to JS macrotask event loop after this many processed messages
-
private var scheduled = false
-
+
abstract fun schedule()
abstract fun reschedule()
@@ -136,4 +161,3 @@ internal abstract class MessageQueue : ArrayQueue<Runnable>() {
// using them via "window" (which only works in browser)
private external fun setTimeout(handler: dynamic, timeout: Int = definedExternally): Int
private external fun clearTimeout(handle: Int = definedExternally)
-private external val process: dynamic
diff --git a/kotlinx-coroutines-core/js/test/SetTimeoutDispatcherTest.kt b/kotlinx-coroutines-core/js/test/SetTimeoutDispatcherTest.kt
new file mode 100644
index 00000000..78700776
--- /dev/null
+++ b/kotlinx-coroutines-core/js/test/SetTimeoutDispatcherTest.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import kotlin.test.*
+
+class SetTimeoutDispatcherTest : TestBase() {
+ @Test
+ fun testDispatch() = runTest {
+ launch(SetTimeoutDispatcher) {
+ expect(1)
+ launch {
+ expect(3)
+ }
+ expect(2)
+ yield()
+ expect(4)
+ }.join()
+ finish(5)
+ }
+
+ @Test
+ fun testDelay() = runTest {
+ withContext(SetTimeoutDispatcher) {
+ val job = launch(SetTimeoutDispatcher) {
+ expect(2)
+ delay(100)
+ expect(4)
+ }
+ expect(1)
+ yield() // Yield uses microtask, so should be in the same context
+ expect(3)
+ job.join()
+ finish(5)
+ }
+ }
+
+ @Test
+ fun testWithTimeout() = runTest {
+ withContext(SetTimeoutDispatcher) {
+ val result = withTimeoutOrNull(10) {
+ expect(1)
+ delay(100)
+ expectUnreached()
+ 42
+ }
+ assertNull(result)
+ finish(2)
+ }
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
index 1d0c4d6b..d0375a61 100644
--- a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
+++ b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
@@ -26,24 +26,7 @@ internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
* Creates context for the new coroutine. It installs [Dispatchers.Default] when no other dispatcher nor
* [ContinuationInterceptor] is specified, and adds optional support for debugging facilities (when turned on).
*
- * **Debugging facilities:** In debug mode every coroutine is assigned a unique consecutive identifier.
- * Every thread that executes a coroutine has its name modified to include the name and identifier of the
- * currently running coroutine.
- * When one coroutine is suspended and resumes another coroutine that is dispatched in the same thread,
- * then the thread name displays
- * the whole stack of coroutine descriptions that are being executed on this thread.
- *
- * Enable debugging facilities with "`kotlinx.coroutines.debug`" ([DEBUG_PROPERTY_NAME]) system property
- * , use the following values:
- * * "`auto`" (default mode, [DEBUG_PROPERTY_VALUE_AUTO]) -- enabled when assertions are enabled with "`-ea`" JVM option.
- * * "`on`" ([DEBUG_PROPERTY_VALUE_ON]) or empty string -- enabled.
- * * "`off`" ([DEBUG_PROPERTY_VALUE_OFF]) -- disabled.
- *
- * Coroutine name can be explicitly assigned using [CoroutineName] context element.
- * The string "coroutine" is used as a default name.
- *
- * **Note: This is an experimental api.**
- * Behavior of this function may change in the future with respect to its support for debugging facilities.
+ * See [DEBUG_PROPERTY_NAME] for description of debugging facilities on JVM.
*/
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
diff --git a/kotlinx-coroutines-core/jvm/src/Debug.kt b/kotlinx-coroutines-core/jvm/src/Debug.kt
index 40de02ab..98a1c1ea 100644
--- a/kotlinx-coroutines-core/jvm/src/Debug.kt
+++ b/kotlinx-coroutines-core/jvm/src/Debug.kt
@@ -12,7 +12,26 @@ import java.util.concurrent.atomic.*
import kotlin.internal.InlineOnly
/**
- * Name of the property that controls coroutine debugging. See [newCoroutineContext][CoroutineScope.newCoroutineContext].
+ * Name of the property that controls coroutine debugging.
+ *
+ * ### Debugging facilities
+ *
+ * In debug mode every coroutine is assigned a unique consecutive identifier.
+ * Every thread that executes a coroutine has its name modified to include the name and identifier of
+ * the currently running coroutine.
+ *
+ * Enable debugging facilities with "`kotlinx.coroutines.debug`" ([DEBUG_PROPERTY_NAME]) system property,
+ * use the following values:
+ *
+ * * "`auto`" (default mode, [DEBUG_PROPERTY_VALUE_AUTO]) -- enabled when assertions are enabled with "`-ea`" JVM option.
+ * * "`on`" ([DEBUG_PROPERTY_VALUE_ON]) or empty string -- enabled.
+ * * "`off`" ([DEBUG_PROPERTY_VALUE_OFF]) -- disabled.
+ *
+ * Coroutine name can be explicitly assigned using [CoroutineName] context element.
+ * The string "coroutine" is used as a default name.
+ *
+ * Debugging facilities are implemented by [newCoroutineContext][CoroutineScope.newCoroutineContext] function that
+ * is used in all coroutine builders to create context of a new coroutine.
*/
public const val DEBUG_PROPERTY_NAME = "kotlinx.coroutines.debug"
@@ -23,7 +42,6 @@ public const val DEBUG_PROPERTY_NAME = "kotlinx.coroutines.debug"
* Stacktrace recovery mode wraps every exception into the exception of the same type with original exception
* as cause, but with stacktrace of the current coroutine.
* Exception is instantiated using reflection by using no-arg, cause or cause and message constructor.
- * Stacktrace is not recovered if exception is an instance of [CancellationException] or [NonRecoverableThrowable].
*
* This mechanism is currently supported for channels, [async], [launch], [coroutineScope], [supervisorScope]
* and [withContext] builders.
@@ -58,17 +76,17 @@ public interface CopyableThrowable<T> where T : Throwable, T : CopyableThrowable
}
/**
- * Automatic debug configuration value for [DEBUG_PROPERTY_NAME]. See [newCoroutineContext][CoroutineScope.newCoroutineContext].
+ * Automatic debug configuration value for [DEBUG_PROPERTY_NAME].
*/
public const val DEBUG_PROPERTY_VALUE_AUTO = "auto"
/**
- * Debug turned on value for [DEBUG_PROPERTY_NAME]. See [newCoroutineContext][CoroutineScope.newCoroutineContext].
+ * Debug turned on value for [DEBUG_PROPERTY_NAME].
*/
public const val DEBUG_PROPERTY_VALUE_ON = "on"
/**
- * Debug turned on value for [DEBUG_PROPERTY_NAME]. See [newCoroutineContext][CoroutineScope.newCoroutineContext].
+ * Debug turned on value for [DEBUG_PROPERTY_NAME].
*/
public const val DEBUG_PROPERTY_VALUE_OFF = "off"
diff --git a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
index 0323c731..2d7ed7a3 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
@@ -11,8 +11,23 @@ import java.util.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
+/*
+ * `Class.forName(name).canonicalName` instead of plain `name` is required to properly handle
+ * Android's minifier that renames these classes and breaks our recovery heuristic without such lookup.
+ */
+private const val baseContinuationImplClass = "kotlin.coroutines.jvm.internal.BaseContinuationImpl"
+private const val stackTraceRecoveryClass = "kotlinx.coroutines.internal.StackTraceRecoveryKt"
+
+private val baseContinuationImplClassName = runCatching {
+ Class.forName(baseContinuationImplClass).canonicalName
+}.getOrElse { baseContinuationImplClass }
+
+private val stackTraceRecoveryClassName = runCatching {
+ Class.forName(stackTraceRecoveryClass).canonicalName
+}.getOrElse { stackTraceRecoveryClass }
+
internal actual fun <E : Throwable> recoverStackTrace(exception: E): E {
- if (recoveryDisabled(exception)) return exception
+ if (!RECOVER_STACK_TRACES) return exception
// No unwrapping on continuation-less path: exception is not reported multiple times via slow paths
val copy = tryCopyException(exception) ?: return exception
return copy.sanitizeStackTrace()
@@ -21,10 +36,9 @@ internal actual fun <E : Throwable> recoverStackTrace(exception: E): E {
private fun <E : Throwable> E.sanitizeStackTrace(): E {
val stackTrace = stackTrace
val size = stackTrace.size
-
- val lastIntrinsic = stackTrace.frameIndex("kotlinx.coroutines.internal.StackTraceRecoveryKt")
+ val lastIntrinsic = stackTrace.frameIndex(stackTraceRecoveryClassName)
val startIndex = lastIntrinsic + 1
- val endIndex = stackTrace.frameIndex("kotlin.coroutines.jvm.internal.BaseContinuationImpl")
+ val endIndex = stackTrace.frameIndex(baseContinuationImplClassName)
val adjustment = if (endIndex == -1) 0 else size - endIndex
val trace = Array(size - lastIntrinsic - adjustment) {
if (it == 0) {
@@ -39,7 +53,7 @@ private fun <E : Throwable> E.sanitizeStackTrace(): E {
}
internal actual fun <E : Throwable> recoverStackTrace(exception: E, continuation: Continuation<*>): E {
- if (recoveryDisabled(exception) || continuation !is CoroutineStackFrame) return exception
+ if (!RECOVER_STACK_TRACES || continuation !is CoroutineStackFrame) return exception
return recoverFromStackFrame(exception, continuation)
}
@@ -83,7 +97,7 @@ private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: Co
private fun <E : Throwable> createFinalException(cause: E, result: E, resultStackTrace: ArrayDeque<StackTraceElement>): E {
resultStackTrace.addFirst(artificialFrame("Coroutine boundary"))
val causeTrace = cause.stackTrace
- val size = causeTrace.frameIndex("kotlin.coroutines.jvm.internal.BaseContinuationImpl")
+ val size = causeTrace.frameIndex(baseContinuationImplClassName)
if (size == -1) {
result.stackTrace = resultStackTrace.toTypedArray()
return result
@@ -133,7 +147,7 @@ private fun mergeRecoveredTraces(recoveredStacktrace: Array<StackTraceElement>,
@Suppress("NOTHING_TO_INLINE")
internal actual suspend inline fun recoverAndThrow(exception: Throwable): Nothing {
- if (recoveryDisabled(exception)) throw exception
+ if (!RECOVER_STACK_TRACES) throw exception
suspendCoroutineUninterceptedOrReturn<Nothing> {
if (it !is CoroutineStackFrame) throw exception
throw recoverFromStackFrame(exception, it)
@@ -141,7 +155,7 @@ internal actual suspend inline fun recoverAndThrow(exception: Throwable): Nothin
}
internal actual fun <E : Throwable> unwrap(exception: E): E {
- if (recoveryDisabled(exception)) return exception
+ if (!RECOVER_STACK_TRACES) return exception
val cause = exception.cause
// Fast-path to avoid array cloning
if (cause == null || cause.javaClass != exception.javaClass) {
@@ -156,9 +170,6 @@ internal actual fun <E : Throwable> unwrap(exception: E): E {
}
}
-private fun <E : Throwable> recoveryDisabled(exception: E) =
- !RECOVER_STACK_TRACES || exception is NonRecoverableThrowable
-
private fun createStackTrace(continuation: CoroutineStackFrame): ArrayDeque<StackTraceElement> {
val stack = ArrayDeque<StackTraceElement>()
continuation.getStackTraceElement()?.let { stack.add(it) }
diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt
index 073c7a55..32007a04 100644
--- a/kotlinx-coroutines-core/jvm/test/TestBase.kt
+++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt
@@ -144,7 +144,7 @@ public actual open class TestBase actual constructor() {
// onCompletion should not throw exceptions before it finishes all cleanup, so that other tests always
// start in a clear, restored state
if (actionIndex.get() != 0 && !finished.get()) {
- makeError("Expecting that 'finish(...)' was invoked, but it was not")
+ makeError("Expecting that 'finish(${actionIndex.get() + 1})' was invoked, but it was not")
}
// Shutdown all thread pools
shutdownPoolsAfterTest()
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ArrayChannelStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ArrayChannelStressTest.kt
index ccb0e874..74dc24c7 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ArrayChannelStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ArrayChannelStressTest.kt
@@ -22,13 +22,13 @@ class ArrayChannelStressTest(private val capacity: Int) : TestBase() {
fun testStress() = runTest {
val n = 100_000 * stressTestMultiplier
val q = Channel<Int>(capacity)
- val sender = launch(coroutineContext) {
+ val sender = launch {
for (i in 1..n) {
q.send(i)
}
expect(2)
}
- val receiver = launch(coroutineContext) {
+ val receiver = launch {
for (i in 1..n) {
val next = q.receive()
check(next == i)
@@ -40,4 +40,25 @@ class ArrayChannelStressTest(private val capacity: Int) : TestBase() {
receiver.join()
finish(4)
}
+
+ @Test
+ fun testBurst() = runTest {
+ Assume.assumeTrue(capacity < 100_000)
+ repeat(10_000 * stressTestMultiplier) {
+ val channel = Channel<Int>(capacity)
+ val sender = launch(Dispatchers.Default) {
+ for (i in 1..capacity * 2) {
+ channel.send(i)
+ }
+ }
+ val receiver = launch(Dispatchers.Default) {
+ for (i in 1..capacity * 2) {
+ val next = channel.receive()
+ check(next == i)
+ }
+ }
+ sender.join()
+ receiver.join()
+ }
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
index db5fabcc..e7b46cd1 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
@@ -10,6 +10,7 @@ import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import java.util.concurrent.*
+import kotlin.concurrent.*
import kotlin.coroutines.*
import kotlin.test.*
@@ -292,10 +293,13 @@ class StackTraceRecoveryTest : TestBase() {
val barrier = CyclicBarrier(2)
var exception: Throwable? = null
- await.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) {
- exception = it.exceptionOrNull()
- barrier.await()
- })
+
+ thread {
+ await.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) {
+ exception = it.exceptionOrNull()
+ barrier.await()
+ })
+ }
barrier.await()
val e = exception
diff --git a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
index e2b64a88..f7104034 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
@@ -83,7 +83,7 @@ class CallbackFlowTest : TestBase() {
}
}
- val flow = channelFlow<Int> {
+ val flow = callbackFlow<Int>() {
api.start(channel)
awaitClose {
api.stop()
@@ -118,7 +118,7 @@ class CallbackFlowTest : TestBase() {
}
}
- private fun Flow<Int>.merge(other: Flow<Int>): Flow<Int> = channelFlow {
+ private fun Flow<Int>.merge(other: Flow<Int>): Flow<Int> = callbackFlow {
launch {
collect { send(it) }
}
diff --git a/kotlinx-coroutines-core/native/test/WorkerTest.kt b/kotlinx-coroutines-core/native/test/WorkerTest.kt
new file mode 100644
index 00000000..84acedac
--- /dev/null
+++ b/kotlinx-coroutines-core/native/test/WorkerTest.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import kotlin.coroutines.*
+import kotlin.native.concurrent.*
+import kotlin.test.*
+
+class WorkerTest : TestBase() {
+
+ @Test
+ fun testLaunchInWorker() {
+ val worker = Worker.start()
+ worker.execute(TransferMode.SAFE, { }) {
+ runBlocking {
+ launch { }.join()
+ delay(1)
+ }
+ }.result
+ }
+
+ @Test
+ fun testLaunchInWorkerTroughGlobalScope() {
+ val worker = Worker.start()
+ worker.execute(TransferMode.SAFE, { }) {
+ runBlocking {
+ CoroutineScope(EmptyCoroutineContext).launch {
+ delay(1)
+ }.join()
+ }
+ }.result
+ }
+}
diff --git a/kotlinx-coroutines-debug/README.md b/kotlinx-coroutines-debug/README.md
index 22869270..790eeaad 100644
--- a/kotlinx-coroutines-debug/README.md
+++ b/kotlinx-coroutines-debug/README.md
@@ -18,7 +18,7 @@ of coroutines hierarchy referenced by a [Job] or [CoroutineScope] instances usin
Add `kotlinx-coroutines-debug` to your project test dependencies:
```
dependencies {
- testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.0-RC'
+ testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.0-RC2'
}
```
@@ -57,7 +57,7 @@ stacktraces will be dumped to the console.
### Using as JVM agent
It is possible to use this module as a standalone JVM agent to enable debug probes on the application startup.
-You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.3.0-RC.jar`.
+You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.3.0-RC2.jar`.
Additionally, on Linux and Mac OS X you can use `kill -5 $pid` command in order to force your application to print all alive coroutines.
diff --git a/kotlinx-coroutines-debug/src/CoroutineInfo.kt b/kotlinx-coroutines-debug/src/CoroutineInfo.kt
index 56f391a2..84cd9f37 100644
--- a/kotlinx-coroutines-debug/src/CoroutineInfo.kt
+++ b/kotlinx-coroutines-debug/src/CoroutineInfo.kt
@@ -14,7 +14,7 @@ import kotlin.coroutines.jvm.internal.*
* Class describing coroutine info such as its context, state and stacktrace.
*/
@ExperimentalCoroutinesApi
-public data class CoroutineInfo internal constructor(
+public class CoroutineInfo internal constructor(
val context: CoroutineContext,
private val creationStackBottom: CoroutineStackFrame,
@JvmField internal val sequenceNumber: Long
@@ -44,14 +44,10 @@ public data class CoroutineInfo internal constructor(
@JvmField
internal var lastObservedFrame: CoroutineStackFrame? = null
- // Copy constructor
- internal constructor(coroutine: Continuation<*>, state: CoroutineInfo) : this(
- coroutine.context,
- state.creationStackBottom,
- state.sequenceNumber
- ) {
- _state = state.state
- this.lastObservedFrame = state.lastObservedFrame
+ public fun copy(): CoroutineInfo = CoroutineInfo(context, creationStackBottom, sequenceNumber).also {
+ it._state = _state
+ it.lastObservedFrame = lastObservedFrame
+ it.lastObservedThread = lastObservedThread
}
/**
@@ -94,6 +90,8 @@ public data class CoroutineInfo internal constructor(
lastObservedThread = null
}
}
+
+ override fun toString(): String = "CoroutineInfo(state=$state,context=$context)"
}
/**
diff --git a/kotlinx-coroutines-debug/src/internal/DebugProbesImpl.kt b/kotlinx-coroutines-debug/src/internal/DebugProbesImpl.kt
index 29b1e36e..b8b01c35 100644
--- a/kotlinx-coroutines-debug/src/internal/DebugProbesImpl.kt
+++ b/kotlinx-coroutines-debug/src/internal/DebugProbesImpl.kt
@@ -80,7 +80,7 @@ internal object DebugProbesImpl {
check(isInstalled) { "Debug probes are not installed" }
val jobToStack = capturedCoroutines
.filter { it.delegate.context[Job] != null }
- .associateBy({ it.delegate.context[Job]!! }, {it.info})
+ .associateBy({ it.delegate.context[Job]!! }, { it.info })
return buildString {
job.build(jobToStack, this, "")
}
@@ -118,7 +118,7 @@ internal object DebugProbesImpl {
public fun dumpCoroutinesInfo(): List<CoroutineInfo> {
check(isInstalled) { "Debug probes are not installed" }
return capturedCoroutines.asSequence()
- .map { CoroutineInfo(it.delegate, it.info) }
+ .map { it.info.copy() } // Copy as CoroutineInfo can be mutated concurrently by DebugProbes
.sortedBy { it.sequenceNumber }
.toList()
}
@@ -373,7 +373,7 @@ internal object DebugProbesImpl {
private fun <T : Throwable> sanitizeStackTrace(throwable: T): List<StackTraceElement> {
val stackTrace = throwable.stackTrace
val size = stackTrace.size
- val probeIndex = stackTrace.indexOfLast { it.className == "kotlin.coroutines.jvm.internal.DebugProbesKt" }
+ val probeIndex = stackTrace.indexOfLast { it.className == "kotlin.coroutines.jvm.internal.DebugProbesKt" }
if (!DebugProbes.sanitizeStackTraces) {
return List(size - probeIndex) {
diff --git a/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt b/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt
index 3ccbe0ae..c15fe894 100644
--- a/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt
+++ b/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt
@@ -167,4 +167,15 @@ class RunningThreadStackMergeTest : DebugTestBase() {
yield()
assertTrue(true)
}
+
+ @Test
+ fun testActiveThread() = runBlocking<Unit> {
+ launchCoroutine()
+ awaitCoroutineStarted()
+ val info = DebugProbes.dumpCoroutinesInfo().find { it.state == State.RUNNING }
+ assertNotNull(info)
+ @Suppress("INVISIBLE_MEMBER") // IDEA bug
+ assertNotNull(info.lastObservedThread)
+ coroutineBlocker.await()
+ }
}
diff --git a/kotlinx-coroutines-test/README.md b/kotlinx-coroutines-test/README.md
index 1838bbff..8e8be757 100644
--- a/kotlinx-coroutines-test/README.md
+++ b/kotlinx-coroutines-test/README.md
@@ -9,7 +9,7 @@ This package provides testing utilities for effectively testing coroutines.
Add `kotlinx-coroutines-test` to your project test dependencies:
```
dependencies {
- testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.0-RC'
+ testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.0-RC2'
}
```
diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt
index d12a6280..072773a4 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Await.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt
@@ -10,6 +10,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
+import java.util.*
import kotlin.coroutines.*
/**
@@ -81,6 +82,16 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
// ------------------------ private ------------------------
+// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
+// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
+private val contextInjectors: Array<ContextInjector> =
+ ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).iterator().asSequence().toList().toTypedArray() // R8 opto
+
+private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
+ contextInjectors.fold(this) { pub, contextInjector ->
+ contextInjector.injectCoroutineContext(pub, coroutineContext)
+ }
+
private enum class Mode(val s: String) {
FIRST("awaitFirst"),
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
@@ -93,7 +104,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
- subscribe(object : Subscriber<T> {
+ injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
private lateinit var subscription: Subscription
private var value: T? = null
private var seenValue = false
diff --git a/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt
new file mode 100644
index 00000000..45f65530
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt
@@ -0,0 +1,15 @@
+package kotlinx.coroutines.reactive
+
+import kotlinx.coroutines.InternalCoroutinesApi
+import org.reactivestreams.Publisher
+import kotlin.coroutines.CoroutineContext
+
+/** @suppress */
+@InternalCoroutinesApi
+public interface ContextInjector {
+ /**
+ * Injects `ReactorContext` element from the given context into the `SubscriberContext` of the publisher.
+ * This API used as an indirection layer between `reactive` and `reactor` modules.
+ */
+ public fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt
new file mode 100644
index 00000000..387c8e77
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:JvmMultifileClass
+@file:JvmName("FlowKt")
+
+package kotlinx.coroutines.reactive
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.reactivestreams.*
+import kotlinx.coroutines.intrinsics.*
+
+/**
+ * Transforms the given flow to a spec-compliant [Publisher].
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
+
+/**
+ * Adapter that transforms [Flow] into TCK-complaint [Publisher].
+ * [cancel] invocation cancels the original flow.
+ */
+@Suppress("PublisherImplementation")
+private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
+ override fun subscribe(subscriber: Subscriber<in T>?) {
+ if (subscriber == null) throw NullPointerException()
+ subscriber.onSubscribe(FlowSubscription(flow, subscriber))
+ }
+}
+
+/** @suppress */
+@InternalCoroutinesApi
+public class FlowSubscription<T>(
+ @JvmField val flow: Flow<T>,
+ @JvmField val subscriber: Subscriber<in T>
+) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, false) {
+ private val requested = atomic(0L)
+ private val producer = atomic<CancellableContinuation<Unit>?>(null)
+
+ override fun onStart() {
+ ::flowProcessing.startCoroutineCancellable(this)
+ }
+
+ private suspend fun flowProcessing() {
+ try {
+ consumeFlow()
+ subscriber.onComplete()
+ } catch (e: Throwable) {
+ try {
+ if (e is CancellationException) {
+ subscriber.onComplete()
+ } else {
+ subscriber.onError(e)
+ }
+ } catch (e: Throwable) {
+ // Last ditch report
+ handleCoroutineException(coroutineContext, e)
+ }
+ }
+ }
+
+ /*
+ * This method has at most one caller at any time (triggered from the `request` method)
+ */
+ private suspend fun consumeFlow() {
+ flow.collect { value ->
+ /*
+ * Flow is scopeless, thus if it's not active, its subscription was cancelled.
+ * No intermediate "child failed, but flow coroutine is not" states are allowed.
+ */
+ coroutineContext.ensureActive()
+ if (requested.value <= 0L) {
+ suspendCancellableCoroutine<Unit> {
+ producer.value = it
+ if (requested.value != 0L) it.resumeSafely()
+ }
+ }
+ requested.decrementAndGet()
+ subscriber.onNext(value)
+ }
+ }
+
+ override fun cancel() {
+ cancel(null)
+ }
+
+ override fun request(n: Long) {
+ if (n <= 0) {
+ return
+ }
+ start()
+ requested.update { value ->
+ val newValue = value + n
+ if (newValue <= 0L) Long.MAX_VALUE else newValue
+ }
+ val producer = producer.getAndSet(null) ?: return
+ producer.resumeSafely()
+ }
+
+ private fun CancellableContinuation<Unit>.resumeSafely() {
+ val token = tryResume(Unit)
+ if (token != null) {
+ completeResume(token)
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt
index 50338de6..8da106e5 100644
--- a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt
@@ -2,14 +2,17 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-package kotlinx.coroutines.reactive.flow
+@file:JvmMultifileClass
+@file:JvmName("FlowKt")
+
+package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.internal.*
-import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
+import java.util.*
import kotlin.coroutines.*
/**
@@ -21,13 +24,11 @@ import kotlin.coroutines.*
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
* are discarded.
*/
-@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
PublisherAsFlow(this, 1)
@FlowPreview
-@JvmName("from")
@Deprecated(
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
level = DeprecationLevel.ERROR,
@@ -46,7 +47,9 @@ private class PublisherAsFlow<T : Any>(
// use another channel for conflation (cannot do openSubscription)
if (capacity < 0) return super.produceImpl(scope)
// Open subscription channel directly
- val channel = publisher.openSubscription(capacity)
+ val channel = publisher
+ .injectCoroutineContext(scope.coroutineContext)
+ .openSubscription(capacity)
val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
channel.cancel(cause?.let {
it as? CancellationException ?: CancellationException("Job was cancelled", it)
@@ -70,7 +73,7 @@ private class PublisherAsFlow<T : Any>(
override suspend fun collect(collector: FlowCollector<T>) {
val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
- publisher.subscribe(subscriber)
+ publisher.injectCoroutineContext(coroutineContext).subscribe(subscriber)
try {
var consumed = 0L
while (true) {
@@ -127,3 +130,11 @@ private class ReactiveSubscriber<T : Any>(
subscription.cancel()
}
}
+
+// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
+// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
+private val contextInjectors: List<ContextInjector> =
+ ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList()
+
+private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
+ contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) } \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
deleted file mode 100644
index 05f2391e..00000000
--- a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.reactive.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.*
-import org.reactivestreams.*
-import java.util.concurrent.atomic.*
-import kotlin.coroutines.*
-
-/**
- * Transforms the given flow to a spec-compliant [Publisher].
- */
-@JvmName("from")
-@ExperimentalCoroutinesApi
-public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
-
-/**
- * Adapter that transforms [Flow] into TCK-complaint [Publisher].
- * [cancel] invocation cancels the original flow.
- */
-@Suppress("PublisherImplementation")
-private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
-
- override fun subscribe(subscriber: Subscriber<in T>?) {
- if (subscriber == null) throw NullPointerException()
- subscriber.onSubscribe(FlowSubscription(flow, subscriber))
- }
-
- private class FlowSubscription<T>(val flow: Flow<T>, val subscriber: Subscriber<in T>) : Subscription {
- @Volatile
- internal var canceled: Boolean = false
- private val requested = AtomicLong(0L)
- private val producer: AtomicReference<CancellableContinuation<Unit>?> = AtomicReference()
-
- // This is actually optimizable
- private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) {
- try {
- consumeFlow()
- subscriber.onComplete()
- } catch (e: Throwable) {
- // Failed with real exception, not due to cancellation
- if (!coroutineContext[Job]!!.isCancelled) {
- subscriber.onError(e)
- }
- }
- }
-
- private suspend fun consumeFlow() {
- flow.collect { value ->
- if (!coroutineContext.isActive) {
- subscriber.onComplete()
- coroutineContext.ensureActive()
- }
-
- if (requested.get() == 0L) {
- suspendCancellableCoroutine<Unit> {
- producer.set(it)
- if (requested.get() != 0L) it.resumeSafely()
- }
- }
-
- requested.decrementAndGet()
- subscriber.onNext(value)
- }
- }
-
- override fun cancel() {
- canceled = true
- job.cancel()
- }
-
- override fun request(n: Long) {
- if (n <= 0) {
- return
- }
-
- if (canceled) return
-
- job.start()
- var snapshot: Long
- var newValue: Long
- do {
- snapshot = requested.get()
- newValue = snapshot + n
- if (newValue <= 0L) newValue = Long.MAX_VALUE
- } while (!requested.compareAndSet(snapshot, newValue))
-
- val prev = producer.get()
- if (prev == null || !producer.compareAndSet(prev, null)) return
- prev.resumeSafely()
- }
-
- private fun CancellableContinuation<Unit>.resumeSafely() {
- val token = tryResume(Unit)
- if (token != null) {
- completeResume(token)
- }
- }
- }
-}
diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
new file mode 100644
index 00000000..86334928
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.reactive
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.junit.Test
+import org.reactivestreams.*
+import kotlin.test.*
+
+class FlowAsPublisherTest : TestBase() {
+
+ @Test
+ fun testErrorOnCancellationIsReported() {
+ expect(1)
+ flow<Int> {
+ emit(2)
+ try {
+ hang { expect(3) }
+ } finally {
+ throw TestException()
+ }
+ }.asPublisher().subscribe(object : Subscriber<Int> {
+ private lateinit var subscription: Subscription
+
+ override fun onComplete() {
+ expectUnreached()
+ }
+
+ override fun onSubscribe(s: Subscription?) {
+ subscription = s!!
+ subscription.request(2)
+ }
+
+ override fun onNext(t: Int) {
+ expect(t)
+ subscription.cancel()
+ }
+
+ override fun onError(t: Throwable?) {
+ assertTrue(t is TestException)
+ expect(4)
+ }
+ })
+ finish(5)
+ }
+
+ @Test
+ fun testCancellationIsNotReported() {
+ expect(1)
+ flow<Int> {
+ emit(2)
+ hang { expect(3) }
+ }.asPublisher().subscribe(object : Subscriber<Int> {
+ private lateinit var subscription: Subscription
+
+ override fun onComplete() {
+ expect(4)
+ }
+
+ override fun onSubscribe(s: Subscription?) {
+ subscription = s!!
+ subscription.request(2)
+ }
+
+ override fun onNext(t: Int) {
+ expect(t)
+ subscription.cancel()
+ }
+
+ override fun onError(t: Throwable?) {
+ expectUnreached()
+ }
+ })
+ finish(5)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
index 31c5a3c4..5dfd9d53 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
@@ -4,7 +4,7 @@
@file:Suppress("UNCHECKED_CAST")
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
import kotlinx.coroutines.flow.*
import org.junit.*
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
index 3f33b33c..a37719de 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
@@ -2,12 +2,11 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
-import kotlinx.coroutines.reactive.*
import kotlin.test.*
class PublisherAsFlowTest : TestBase() {
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt b/reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt
index 2ff96eb1..b710c590 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt
@@ -2,7 +2,7 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
import kotlinx.coroutines.flow.*
import org.junit.*
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt
index 1b37ee99..72d5de5e 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt
@@ -2,7 +2,7 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
import org.junit.*
import org.reactivestreams.*
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt
index 9e611008..63d444c1 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt
@@ -2,7 +2,7 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
import org.junit.*
import org.reactivestreams.example.unicast.AsyncIterablePublisher
diff --git a/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector b/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector
new file mode 100644
index 00000000..0097ec35
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector
@@ -0,0 +1 @@
+kotlinx.coroutines.reactor.ReactorContextInjector \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt
new file mode 100644
index 00000000..7c6182bf
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt
@@ -0,0 +1,26 @@
+@file:JvmName("FlowKt")
+
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.flowOn
+import kotlinx.coroutines.reactive.FlowSubscription
+import reactor.core.CoreSubscriber
+import reactor.core.publisher.Flux
+
+/**
+ * Converts the given flow to a cold flux.
+ * The original flow is cancelled when the flux subscriber is disposed.
+ */
+@ExperimentalCoroutinesApi
+public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
+
+private class FlowAsFlux<T : Any>(private val flow: Flow<T>) : Flux<T>() {
+ override fun subscribe(subscriber: CoreSubscriber<in T>?) {
+ if (subscriber == null) throw NullPointerException()
+ val hasContext = subscriber.currentContext().isEmpty
+ val source = if (hasContext) flow.flowOn(subscriber.currentContext().asCoroutineContext()) else flow
+ subscriber.onSubscribe(FlowSubscription(source, subscriber))
+ }
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
index 316146b5..18b84ac1 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -74,4 +74,4 @@ private fun <T> reactorPublish(
val coroutine = PublisherCoroutine(newContext, subscriber)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
-}
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
index 5a4ccd04..942ba7b6 100644
--- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
@@ -30,6 +30,18 @@ import kotlin.coroutines.*
* .subscribe()
* }
* ```
+ *
+ * [CoroutineContext] of a suspendable function that awaits a value from [Mono] or [Flux] instance
+ * is propagated into [mono] and [flux] Reactor builders:
+ * ```
+ * launch(Context.of("key", "value").asCoroutineContext()) {
+ * assertEquals(bar().awaitFirst(), "value")
+ * }
+ *
+ * fun bar(): Mono<String> = mono {
+ * coroutineContext[ReactorContext]!!.context.get("key")
+ * }
+ * ```
*/
@ExperimentalCoroutinesApi
public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt
new file mode 100644
index 00000000..68309bbc
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt
@@ -0,0 +1,22 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.reactive.*
+import org.reactivestreams.*
+import reactor.core.publisher.*
+import reactor.util.context.*
+import kotlin.coroutines.*
+
+internal class ReactorContextInjector : ContextInjector {
+ /**
+ * Injects all values from the [ReactorContext] entry of the given coroutine context
+ * into the downstream [Context] of Reactor's [Publisher] instances of [Mono] or [Flux].
+ */
+ override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
+ val reactorContext = coroutineContext[ReactorContext]?.context ?: return publisher
+ return when(publisher) {
+ is Mono -> publisher.subscriberContext(reactorContext)
+ is Flux -> publisher.subscriberContext(reactorContext)
+ else -> publisher
+ }
+ }
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
index 120cd72b..80feaeb8 100644
--- a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
@@ -7,7 +7,6 @@ package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
-import kotlinx.coroutines.reactive.flow.*
import org.junit.Test
import reactor.core.publisher.*
import kotlin.test.*
diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
new file mode 100644
index 00000000..2f8ce9ac
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
@@ -0,0 +1,27 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import kotlinx.coroutines.runBlocking
+import org.junit.Test
+import reactor.core.publisher.Mono
+import reactor.util.context.Context
+import kotlin.test.assertEquals
+
+class FlowAsFluxTest {
+ @Test
+ fun testFlowToFluxContextPropagation() = runBlocking<Unit> {
+ val flux = flow<String> {
+ (1..4).forEach { i -> emit(m(i).awaitFirst()) }
+ } .asFlux()
+ .subscriberContext(Context.of(1, "1"))
+ .subscriberContext(Context.of(2, "2", 3, "3", 4, "4"))
+ var i = 0
+ flux.subscribe { str -> i++; println(str); assertEquals(str, i.toString()) }
+ }
+
+ private fun m(i: Int): Mono<String> = mono {
+ val ctx = coroutineContext[ReactorContext]?.context
+ ctx?.getOrDefault(i, "noValue")
+ }
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
index 1fb4f0bb..e9ac200f 100644
--- a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
@@ -1,10 +1,13 @@
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
-import reactor.util.context.Context
-import kotlin.test.assertEquals
+import reactor.core.publisher.*
+import reactor.util.context.*
+import kotlin.test.*
class ReactorContextTest {
@Test
@@ -14,8 +17,8 @@ class ReactorContextTest {
buildString {
(1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) }
}
- } .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
- .subscriberContext { ctx -> ctx.put(6, "6") }
+ } .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
+ .subscriberContext { ctx -> ctx.put(6, "6") }
assertEquals(mono.awaitFirst(), "1234567")
}
@@ -29,4 +32,80 @@ class ReactorContextTest {
var i = 0
flux.subscribe { str -> i++; assertEquals(str, i.toString()) }
}
+
+ @Test
+ fun testAwait() = runBlocking(Context.of(3, "3").asCoroutineContext()) {
+ val result = mono(Context.of(1, "1").asCoroutineContext()) {
+ val ctx = coroutineContext[ReactorContext]?.context
+ buildString {
+ (1..3).forEach { append(ctx?.getOrDefault(it, "noValue")) }
+ }
+ } .subscriberContext(Context.of(2, "2"))
+ .awaitFirst()
+ assertEquals(result, "123")
+ }
+
+ @Test
+ fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) {
+ assertEquals(m().awaitFirst(), "7")
+ assertEquals(m().awaitFirstOrDefault("noValue"), "7")
+ assertEquals(m().awaitFirstOrNull(), "7")
+ assertEquals(m().awaitFirstOrElse { "noValue" }, "7")
+ assertEquals(m().awaitLast(), "7")
+ assertEquals(m().awaitSingle(), "7")
+ }
+
+ @Test
+ fun testFluxAwaitContextPropagation() = runBlocking<Unit>(
+ Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
+ ) {
+ assertEquals(f().awaitFirst(), "1")
+ assertEquals(f().awaitFirstOrDefault("noValue"), "1")
+ assertEquals(f().awaitFirstOrNull(), "1")
+ assertEquals(f().awaitFirstOrElse { "noValue" }, "1")
+ assertEquals(f().awaitLast(), "3")
+ var i = 0
+ f().subscribe { str -> i++; assertEquals(str, i.toString()) }
+ }
+
+ private fun m(): Mono<String> = mono {
+ val ctx = coroutineContext[ReactorContext]?.context
+ ctx?.getOrDefault(7, "noValue")
+ }
+
+
+ private fun f(): Flux<String?> = flux {
+ val ctx = coroutineContext[ReactorContext]?.context
+ (1..3).forEach { send(ctx?.getOrDefault(it, "noValue")) }
+ }
+
+ @Test
+ fun testFlowToFluxContextPropagation() = runBlocking(
+ Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
+ ) {
+ var i = 0
+ // call "collect" on the converted Flow
+ bar().collect { str ->
+ i++; assertEquals(str, i.toString())
+ }
+ assertEquals(i, 3)
+ }
+
+ @Test
+ fun testFlowToFluxDirectContextPropagation() = runBlocking(
+ Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
+ ) {
+ var i = 0
+ // convert resulting flow to channel using "produceIn"
+ val channel = bar().produceIn(this)
+ channel.consumeEach { str ->
+ i++; assertEquals(str, i.toString())
+ }
+ assertEquals(i, 3)
+ }
+
+ private fun bar(): Flow<String> = flux {
+ val ctx = coroutineContext[ReactorContext]!!.context
+ (1..3).forEach { send(ctx.getOrDefault(it, "noValue")) }
+ }.asFlow()
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index d5678de9..4b121271 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -8,8 +8,7 @@ import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
-import kotlinx.coroutines.reactive.flow.*
-import org.reactivestreams.*
+import kotlinx.coroutines.reactive.*
import kotlin.coroutines.*
/**
@@ -82,7 +81,7 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
/**
* Converts the given flow to a cold observable.
- * The original flow is cancelled if the observable subscriber was disposed.
+ * The original flow is cancelled when the observable subscriber is disposed.
*/
@JvmName("from")
@ExperimentalCoroutinesApi
@@ -106,8 +105,8 @@ public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create {
}
/**
- * Converts the given flow to a cold observable.
- * The original flow is cancelled if the flowable subscriber was disposed.
+ * Converts the given flow to a cold flowable.
+ * The original flow is cancelled when the flowable subscriber is disposed.
*/
@JvmName("from")
@ExperimentalCoroutinesApi
diff --git a/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt
index 19043341..ed0bc369 100644
--- a/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt
@@ -8,7 +8,6 @@ import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
-import kotlinx.coroutines.reactive.flow.*
import org.junit.Test
import kotlin.test.*
diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md
index 4d12d957..b49983e4 100644
--- a/ui/coroutines-guide-ui.md
+++ b/ui/coroutines-guide-ui.md
@@ -165,7 +165,7 @@ Add dependencies on `kotlinx-coroutines-android` module to the `dependencies { .
`app/build.gradle` file:
```groovy
-implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC"
+implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC2"
```
You can clone [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) project from GitHub onto your
diff --git a/ui/kotlinx-coroutines-android/animation-app/gradle.properties b/ui/kotlinx-coroutines-android/animation-app/gradle.properties
index 1fe12d6b..9e9d0d22 100644
--- a/ui/kotlinx-coroutines-android/animation-app/gradle.properties
+++ b/ui/kotlinx-coroutines-android/animation-app/gradle.properties
@@ -19,5 +19,5 @@ org.gradle.jvmargs=-Xmx1536m
kotlin.coroutines=enable
kotlin_version=1.3.41
-coroutines_version=1.3.0-RC
+coroutines_version=1.3.0-RC2
diff --git a/ui/kotlinx-coroutines-android/build.gradle b/ui/kotlinx-coroutines-android/build.gradle
index 195d6b53..5537577d 100644
--- a/ui/kotlinx-coroutines-android/build.gradle
+++ b/ui/kotlinx-coroutines-android/build.gradle
@@ -4,6 +4,17 @@
repositories {
google()
+ // TODO Remove once R8 is updated to a 1.6.x version.
+ maven {
+ url "http://storage.googleapis.com/r8-releases/raw/master"
+ metadataSources {
+ artifact()
+ }
+ }
+}
+
+configurations {
+ r8
}
dependencies {
@@ -12,6 +23,80 @@ dependencies {
testImplementation 'com.google.android:android:4.1.1.4'
testImplementation 'org.robolectric:robolectric:4.0-alpha-3'
+ testImplementation 'org.smali:baksmali:2.2.7'
+
+ // TODO Replace with a 1.6.x version once released to maven.google.com.
+ r8 'com.android.tools:r8:a7ce65837bec81c62261bf0adac73d9c09d32af2'
+}
+
+class RunR8Task extends JavaExec {
+
+ @OutputDirectory
+ File outputDex
+
+ @InputFile
+ File inputConfig
+
+ @InputFile
+ final File inputConfigCommon = new File('r8-test-common.pro')
+
+ @InputFiles
+ final File jarFile = project.jar.archivePath
+
+ @Override
+ Task configure(Closure closure) {
+ super.configure(closure)
+ classpath = project.configurations.r8
+ main = 'com.android.tools.r8.R8'
+
+ def arguments = [
+ '--release',
+ '--no-desugaring',
+ '--output', outputDex.absolutePath,
+ '--pg-conf', inputConfig.absolutePath
+ ]
+ arguments.addAll(project.configurations.runtimeClasspath.files.collect { it.absolutePath })
+ arguments.addAll(jarFile.absolutePath)
+
+ args = arguments
+ return this
+ }
+
+ @Override
+ void exec() {
+ if (outputDex.exists()) {
+ outputDex.deleteDir()
+ }
+ outputDex.mkdirs()
+
+ super.exec()
+ }
+}
+
+def optimizedDex = new File(buildDir, "dex-optim/")
+def unOptimizedDex = new File(buildDir, "dex-unoptim/")
+
+task runR8(type: RunR8Task, dependsOn: 'jar'){
+ outputDex = optimizedDex
+ inputConfig = file('r8-test-rules.pro')
+}
+
+task runR8NoOptim(type: RunR8Task, dependsOn: 'jar'){
+ outputDex = unOptimizedDex
+ inputConfig = file('r8-test-rules-no-optim.pro')
+}
+
+test {
+ // Ensure the R8-processed dex is built and supply its path as a property to the test.
+ dependsOn(runR8)
+ dependsOn(runR8NoOptim)
+ def dex1 = new File(optimizedDex, "classes.dex")
+ def dex2 = new File(unOptimizedDex, "classes.dex")
+
+ inputs.files(dex1, dex2)
+
+ systemProperty 'dexPath', dex1.absolutePath
+ systemProperty 'noOptimDexPath', dex2.absolutePath
}
tasks.withType(dokka.getClass()) {
diff --git a/ui/kotlinx-coroutines-android/example-app/gradle.properties b/ui/kotlinx-coroutines-android/example-app/gradle.properties
index 1fe12d6b..9e9d0d22 100644
--- a/ui/kotlinx-coroutines-android/example-app/gradle.properties
+++ b/ui/kotlinx-coroutines-android/example-app/gradle.properties
@@ -19,5 +19,5 @@ org.gradle.jvmargs=-Xmx1536m
kotlin.coroutines=enable
kotlin_version=1.3.41
-coroutines_version=1.3.0-RC
+coroutines_version=1.3.0-RC2
diff --git a/ui/kotlinx-coroutines-android/r8-test-common.pro b/ui/kotlinx-coroutines-android/r8-test-common.pro
new file mode 100644
index 00000000..03f36a82
--- /dev/null
+++ b/ui/kotlinx-coroutines-android/r8-test-common.pro
@@ -0,0 +1,12 @@
+# Entry point for retaining MainDispatcherLoader which uses a ServiceLoader.
+-keep class kotlinx.coroutines.Dispatchers {
+ ** getMain();
+}
+
+# Entry point for retaining CoroutineExceptionHandlerImpl.handlers which uses a ServiceLoader.
+-keep class kotlinx.coroutines.CoroutineExceptionHandlerKt {
+ void handleCoroutineException(...);
+}
+
+# We are cheating a bit by not having android.jar on R8's library classpath. Ignore those warnings.
+-ignorewarnings \ No newline at end of file
diff --git a/ui/kotlinx-coroutines-android/r8-test-rules-no-optim.pro b/ui/kotlinx-coroutines-android/r8-test-rules-no-optim.pro
new file mode 100644
index 00000000..d6bd4a42
--- /dev/null
+++ b/ui/kotlinx-coroutines-android/r8-test-rules-no-optim.pro
@@ -0,0 +1,4 @@
+-include r8-test-common.pro
+
+# Include the shrinker config used by legacy versions of AGP and ProGuard
+-include resources/META-INF/com.android.tools/proguard/coroutines.pro
diff --git a/ui/kotlinx-coroutines-android/r8-test-rules.pro b/ui/kotlinx-coroutines-android/r8-test-rules.pro
new file mode 100644
index 00000000..2e7fdd8e
--- /dev/null
+++ b/ui/kotlinx-coroutines-android/r8-test-rules.pro
@@ -0,0 +1,7 @@
+-include r8-test-common.pro
+
+# Ensure the custom, fast service loader implementation is removed. In the case of fast service
+# loader encountering an exception it falls back to regular ServiceLoader in a way that cannot be
+# optimized out by R8.
+-include resources/META-INF/com.android.tools/r8-from-1.6.0/coroutines.pro
+-checkdiscard class kotlinx.coroutines.internal.FastServiceLoader \ No newline at end of file
diff --git a/ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/proguard/coroutines.pro b/ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/proguard/coroutines.pro
new file mode 100644
index 00000000..c7cd15fe
--- /dev/null
+++ b/ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/proguard/coroutines.pro
@@ -0,0 +1,5 @@
+# When editing this file, update the following files as well:
+# - META-INF/com.android.tools/r8-upto-1.6.0/coroutines.pro
+# - META-INF/proguard/coroutines.pro
+
+-keep class kotlinx.coroutines.android.AndroidDispatcherFactory {*;}
diff --git a/ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/r8-from-1.6.0/coroutines.pro b/ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/r8-from-1.6.0/coroutines.pro
new file mode 100644
index 00000000..3c0b7e6a
--- /dev/null
+++ b/ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/r8-from-1.6.0/coroutines.pro
@@ -0,0 +1,6 @@
+# Allow R8 to optimize away the FastServiceLoader.
+# Together with ServiceLoader optimization in R8
+# this results in direct instantiation when loading Dispatchers.Main
+-assumenosideeffects class kotlinx.coroutines.internal.MainDispatcherLoader {
+ boolean FAST_SERVICE_LOADER_ENABLED return false;
+} \ No newline at end of file
diff --git a/ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/r8-upto-1.6.0/coroutines.pro b/ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/r8-upto-1.6.0/coroutines.pro
new file mode 100644
index 00000000..de1b70fc
--- /dev/null
+++ b/ui/kotlinx-coroutines-android/resources/META-INF/com.android.tools/r8-upto-1.6.0/coroutines.pro
@@ -0,0 +1,5 @@
+# When editing this file, update the following files as well:
+# - META-INF/com.android.tools/proguard/coroutines.pro
+# - META-INF/proguard/coroutines.pro
+
+-keep class kotlinx.coroutines.android.AndroidDispatcherFactory {*;}
diff --git a/ui/kotlinx-coroutines-android/resources/META-INF/proguard/coroutines.pro b/ui/kotlinx-coroutines-android/resources/META-INF/proguard/coroutines.pro
new file mode 100644
index 00000000..6c918d49
--- /dev/null
+++ b/ui/kotlinx-coroutines-android/resources/META-INF/proguard/coroutines.pro
@@ -0,0 +1,7 @@
+# Files in this directory will be ignored starting with Android Gradle Plugin 3.6.0+
+
+# When editing this file, update the following files as well for AGP 3.6.0+:
+# - META-INF/com.android.tools/proguard/coroutines.pro
+# - META-INF/com.android.tools/r8-upto-1.6.0/coroutines.pro
+
+-keep class kotlinx.coroutines.android.AndroidDispatcherFactory {*;}
diff --git a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt
index f656b353..8d4cecb0 100644
--- a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt
+++ b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt
@@ -49,7 +49,6 @@ public sealed class HandlerDispatcher : MainCoroutineDispatcher(), Delay {
public abstract override val immediate: HandlerDispatcher
}
-@Keep
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
diff --git a/ui/kotlinx-coroutines-android/test/R8ServiceLoaderOptimizationTest.kt b/ui/kotlinx-coroutines-android/test/R8ServiceLoaderOptimizationTest.kt
new file mode 100644
index 00000000..2d2281bd
--- /dev/null
+++ b/ui/kotlinx-coroutines-android/test/R8ServiceLoaderOptimizationTest.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.android
+
+import org.jf.dexlib2.*
+import org.junit.Test
+import java.io.*
+import java.util.stream.*
+import kotlin.test.*
+
+class R8ServiceLoaderOptimizationTest {
+ private val r8Dex = File(System.getProperty("dexPath")!!).asDexFile()
+ private val r8DexNoOptim = File(System.getProperty("noOptimDexPath")!!).asDexFile()
+
+ @Test
+ fun noServiceLoaderCalls() {
+ val serviceLoaderInvocations = r8Dex.types.any {
+ it.type == "Ljava/util/ServiceLoader;"
+ }
+ assertEquals(
+ false,
+ serviceLoaderInvocations,
+ "References to the ServiceLoader class were found in the resulting DEX."
+ )
+ }
+
+ @Test
+ fun androidDispatcherIsKept() {
+ val hasAndroidDispatcher = r8DexNoOptim.classes.any {
+ it.type == "Lkotlinx/coroutines/android/AndroidDispatcherFactory;"
+ }
+
+ assertEquals(true, hasAndroidDispatcher)
+ }
+
+ @Test
+ fun noOptimRulesMatch() {
+ val paths = listOf(
+ "META-INF/com.android.tools/proguard/coroutines.pro",
+ "META-INF/proguard/coroutines.pro",
+ "META-INF/com.android.tools/r8-upto-1.6.0/coroutines.pro"
+ )
+ paths.associateWith { path ->
+ val ruleSet = javaClass.classLoader.getResourceAsStream(path)!!.bufferedReader().lines().filter { line ->
+ line.isNotBlank() && !line.startsWith("#")
+ }.collect(Collectors.toSet())
+ ruleSet
+ }.asSequence().reduce { acc, entry ->
+ assertEquals(
+ acc.value,
+ entry.value,
+ "Rule sets between ${acc.key} and ${entry.key} don't match."
+ )
+ entry
+ }
+ }
+}
+
+private fun File.asDexFile() = DexFileFactory.loadDexFile(this, null)