diff options
author | Kirk Shoop <kirk.shoop@gmail.com> | 2018-08-05 15:15:24 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@gmail.com> | 2018-08-05 16:40:01 -0700 |
commit | b3753b360072a32822c564e288782ed704c7494d (patch) | |
tree | 1937e4da27410eeccecce004529fdabf3343cf88 | |
parent | 1e9312fc7eba8e78719f68d2d6743cf8ecce0e85 (diff) | |
download | platform_external_Reactive-Extensions_RxCpp-b3753b360072a32822c564e288782ed704c7494d.tar.gz platform_external_Reactive-Extensions_RxCpp-b3753b360072a32822c564e288782ed704c7494d.tar.bz2 platform_external_Reactive-Extensions_RxCpp-b3753b360072a32822c564e288782ed704c7494d.zip |
fix blocking_observable::subscribe
removes spinning from blocking submit. ran all perf tests on osx without issue.
should fix #430 and help with #451
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 45 | ||||
-rw-r--r-- | Rx/v2/test/operators/merge_delay_error.cpp | 6 |
2 files changed, 10 insertions, 41 deletions
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 496db0b..037f375 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -173,28 +173,9 @@ class blocking_observable -> void { std::mutex lock; std::condition_variable wake; + bool disposed = false; std::exception_ptr error; - struct tracking - { - ~tracking() - { - if (!disposed || !wakened) std::terminate(); - } - tracking() - { - disposed = false; - wakened = false; - false_wakes = 0; - true_wakes = 0; - } - std::atomic_bool disposed; - std::atomic_bool wakened; - std::atomic_int false_wakes; - std::atomic_int true_wakes; - }; - auto track = std::make_shared<tracking>(); - auto dest = make_subscriber<T>(std::forward<ArgN>(an)...); // keep any error to rethrow at the end. @@ -213,31 +194,19 @@ class blocking_observable auto cs = scbr.get_subscription(); cs.add( - [&, track](){ - // OSX geting invalid x86 op if notify_one is after the disposed = true - // presumably because the condition_variable may already have been awakened - // and is now sitting in a while loop on disposed + [&](){ + std::unique_lock<std::mutex> guard(lock); wake.notify_one(); - track->disposed = true; + disposed = true; }); - std::unique_lock<std::mutex> guard(lock); source.subscribe(std::move(scbr)); + std::unique_lock<std::mutex> guard(lock); wake.wait(guard, - [&, track](){ - // this is really not good. - // false wakeups were never followed by true wakeups so.. - - // anyways this gets triggered before disposed is set now so wait. - while (!track->disposed) { - ++track->false_wakes; - } - ++track->true_wakes; - return true; + [&](){ + return disposed; }); - track->wakened = true; - if (!track->disposed || !track->wakened) std::terminate(); if (error) {std::rethrow_exception(error);} } diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp index d560b45..b53b884 100644 --- a/Rx/v2/test/operators/merge_delay_error.cpp +++ b/Rx/v2/test/operators/merge_delay_error.cpp @@ -7,7 +7,7 @@ const int static_onnextcalls = 1000000; //merge_delay_error must work the very same way as `merge()` except the error handling -SCENARIO("merge completes", "[merge][join][operators]"){ +SCENARIO("merge_delay_error completes", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -117,7 +117,7 @@ SCENARIO("merge completes", "[merge][join][operators]"){ } } -SCENARIO("variadic merge completes with error", "[merge][join][operators]"){ +SCENARIO("variadic merge_delay_error completes with error", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -211,7 +211,7 @@ SCENARIO("variadic merge completes with error", "[merge][join][operators]"){ } } -SCENARIO("variadic merge completes with 2 errors", "[merge][join][operators]"){ +SCENARIO("variadic merge_delay_error completes with 2 errors", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); |