diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-12-24 19:58:20 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-12-24 08:58:20 -0800 |
commit | 4ce03456ca6dadc387874d134d6d987f9fc48550 (patch) | |
tree | 9feb1e3e97ad5b050d7b73669521e186035044b3 /Rx/v2 | |
parent | bb0871143b4beeb1947a7cedfa78bf3d8259a283 (diff) | |
download | platform_external_Reactive-Extensions_RxCpp-4ce03456ca6dadc387874d134d6d987f9fc48550.tar.gz platform_external_Reactive-Extensions_RxCpp-4ce03456ca6dadc387874d134d6d987f9fc48550.tar.bz2 platform_external_Reactive-Extensions_RxCpp-4ce03456ca6dadc387874d134d6d987f9fc48550.zip |
decouple amb from observable (#300)
* decouple amb from observable
* refactoring to remove redundant dependencies
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-amb.hpp | 135 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 118 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/test/operators/amb.cpp | 50 | ||||
-rw-r--r-- | Rx/v2/test/operators/amb_variadic.cpp | 43 |
6 files changed, 215 insertions, 140 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-amb.hpp b/Rx/v2/src/rxcpp/operators/rx-amb.hpp index c89e911..595dc4d 100644 --- a/Rx/v2/src/rxcpp/operators/rx-amb.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-amb.hpp @@ -2,6 +2,40 @@ #pragma once +/*! \file rx-amb.hpp + + \brief For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler. + + There are 2 variants of the operator: + - The source observable emits nested observables, one of the nested observables is selected. + - The source observable and the arguments v0...vn are used to provide the observables to select from. + + \tparam Coordination the type of the scheduler (optional). + \tparam Value0 ... (optional). + \tparam ValueN types of source observables (optional). + + \param cn the scheduler to synchronize sources from different contexts (optional). + \param v0 ... (optional). + \param vn source observables (optional). + + \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification. + + If scheduler is omitted, identity_current_thread is used. + + \sample + \snippet amb.cpp threaded implicit amb sample + \snippet output.txt threaded implicit amb sample + + \snippet amb.cpp implicit amb sample + \snippet output.txt implicit amb sample + + \snippet amb.cpp amb sample + \snippet output.txt amb sample + + \snippet amb.cpp threaded amb sample + \snippet output.txt threaded amb sample +*/ + #if !defined(RXCPP_OPERATORS_RX_AMB_HPP) #define RXCPP_OPERATORS_RX_AMB_HPP @@ -13,6 +47,16 @@ namespace operators { namespace detail { +template<class... AN> +struct amb_invalid_arguments {}; + +template<class... AN> +struct amb_invalid : public rxo::operator_base<amb_invalid_arguments<AN...>> { + using type = observable<amb_invalid_arguments<AN...>, amb_invalid<AN...>>; +}; +template<class... AN> +using amb_invalid_t = typename amb_invalid<AN...>::type; + template<class T, class Observable, class Coordination> struct amb : public operator_base<rxu::value_type_t<T>> @@ -172,35 +216,82 @@ struct amb } }; -template<class Coordination> -class amb_factory -{ - typedef rxu::decay_t<Coordination> coordination_type; +} - coordination_type coordination; -public: - amb_factory(coordination_type sf) - : coordination(std::move(sf)) - { +/*! @copydoc rx-amb.hpp +*/ +template<class... AN> +auto amb(AN&&... an) + -> operator_factory<amb_tag, AN...> { + return operator_factory<amb_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); +} + +} + +template<> +struct member_overload<amb_tag> +{ + template<class Observable, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, identity_one_worker>, + class Value = rxu::value_type_t<SourceValue>, + class Result = observable<Value, Amb> + > + static Result member(Observable&& o) { + return Result(Amb(std::forward<Observable>(o), identity_current_thread())); } - template<class Observable> - auto operator()(Observable source) - -> observable<rxu::value_type_t<amb<rxu::value_type_t<Observable>, Observable, Coordination>>, amb<rxu::value_type_t<Observable>, Observable, Coordination>> { - return observable<rxu::value_type_t<amb<rxu::value_type_t<Observable>, Observable, Coordination>>, amb<rxu::value_type_t<Observable>, Observable, Coordination>>( - amb<rxu::value_type_t<Observable>, Observable, Coordination>(std::move(source), coordination)); + template<class Observable, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>, + class Value = rxu::value_type_t<SourceValue>, + class Result = observable<Value, Amb> + > + static Result member(Observable&& o, Coordination&& cn) { + return Result(Amb(std::forward<Observable>(o), std::forward<Coordination>(cn))); } -}; -} + template<class Observable, class Value0, class... ValueN, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, Value0, ValueN...>>, + class EmittedValue = rxu::value_type_t<Observable>, + class SourceValue = observable<EmittedValue>, + class ObservableObservable = observable<SourceValue>, + class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, identity_one_worker>::type, + class Value = rxu::value_type_t<Amb>, + class Result = observable<Value, Amb> + > + static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) { + return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread())); + } -template<class Coordination> -auto amb(Coordination&& sf) - -> detail::amb_factory<Coordination> { - return detail::amb_factory<Coordination>(std::forward<Coordination>(sf)); -} + template<class Observable, class Coordination, class Value0, class... ValueN, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, Value0, ValueN...>, + is_coordination<Coordination>>, + class EmittedValue = rxu::value_type_t<Observable>, + class SourceValue = observable<EmittedValue>, + class ObservableObservable = observable<SourceValue>, + class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type, + class Value = rxu::value_type_t<Amb>, + class Result = observable<Value, Amb> + > + static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) { + return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn))); + } -} + template<class... AN> + static operators::detail::amb_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "amb takes (optional Coordination, optional Value0, optional ValueN...)"); + } +}; } diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index fa0678f..07a160a 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -182,6 +182,7 @@ #include "rx-grouped_observable.hpp" #if !defined(RXCPP_LITE) +#include "operators/rx-amb.hpp" #include "operators/rx-all.hpp" #include "operators/rx-any.hpp" #include "operators/rx-combine_latest.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 4524f23..e3d0e0f 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1668,123 +1668,15 @@ public: return defer_merge_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn)); } - /// \cond SHOW_SERVICE_MEMBERS - template<class Coordination> - struct defer_amb : public defer_observable< - is_observable<value_type>, - this_type, - rxo::detail::amb, value_type, observable<value_type>, Coordination> - { - }; - /// \endcond - - /*! For each item from only the first of the nested observables deliver from the new observable that is returned. - - \return Observable that emits the same sequence as whichever of the observables emitted from this observable that first emitted an item or sent a termination notification. - - \note All sources must be synchronized! This means that calls across all the subscribers must be serial. - - \sample - \snippet amb.cpp implicit amb sample - \snippet output.txt implicit amb sample - */ + /*! @copydoc rx-amb.hpp + */ template<class... AN> - auto amb(AN**...) const + auto amb(AN... an) const /// \cond SHOW_SERVICE_MEMBERS - -> typename defer_amb<identity_one_worker>::observable_type - /// \endcond - { - return defer_amb<identity_one_worker>::make(*this, *this, identity_current_thread()); - static_assert(sizeof...(AN) == 0, "amb() was passed too many arguments."); - } - - /*! For each item from only the first of the nested observables deliver from the new observable that is returned, on the specified scheduler. - - \tparam Coordination the type of the scheduler - - \param cn the scheduler to synchronize sources from different contexts. - - \return Observable that emits the same sequence as whichever of the observables emitted from this observable that first emitted an item or sent a termination notification. - - \sample - \snippet amb.cpp threaded implicit amb sample - \snippet output.txt threaded implicit amb sample - */ - template<class Coordination> - auto amb(Coordination cn) const - /// \cond SHOW_SERVICE_MEMBERS - -> typename std::enable_if< - defer_amb<Coordination>::value, - typename defer_amb<Coordination>::observable_type>::type - /// \endcond - { - return defer_amb<Coordination>::make(*this, *this, std::move(cn)); - } - - /// \cond SHOW_SERVICE_MEMBERS - template<class Coordination, class Value0> - struct defer_amb_from : public defer_observable< - rxu::all_true< - is_coordination<Coordination>::value, - is_observable<Value0>::value>, - this_type, - rxo::detail::amb, observable<value_type>, observable<observable<value_type>>, Coordination> - { - }; - /// \endcond - - /*! For each item from only the first of the given observables deliver from the new observable that is returned. - - \tparam Value0 ... - \tparam ValueN types of source observables - - \param v0 ... - \param vn source observables - - \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification. - - \note All sources must be synchronized! This means that calls across all the subscribers must be serial. - - \sample - \snippet amb.cpp amb sample - \snippet output.txt amb sample - */ - template<class Value0, class... ValueN> - auto amb(Value0 v0, ValueN... vn) const - /// \cond SHOW_SERVICE_MEMBERS - -> typename std::enable_if< - defer_amb_from<identity_one_worker, Value0>::value, - typename defer_amb_from<identity_one_worker, Value0>::observable_type>::type - /// \endcond - { - return defer_amb_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()); - } - - /*! For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler. - - \tparam Coordination the type of the scheduler - \tparam Value0 ... - \tparam ValueN types of source observables - - \param cn the scheduler to synchronize sources from different contexts. - \param v0 ... - \param vn source observables - - \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification. - - \sample - \snippet amb.cpp threaded amb sample - \snippet output.txt threaded amb sample - */ - template<class Coordination, class Value0, class... ValueN> - auto amb(Coordination cn, Value0 v0, ValueN... vn) const - /// \cond SHOW_SERVICE_MEMBERS - -> typename std::enable_if< - defer_amb_from<Coordination, Value0>::value, - typename defer_amb_from<Coordination, Value0>::observable_type>::type + -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { - return defer_amb_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn)); + return observable_member(amb_tag{}, *this, std::forward<AN>(an)...); } /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index c96964e..2d02fe2 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -95,7 +95,6 @@ public: } -#include "operators/rx-amb.hpp" #include "operators/rx-buffer_count.hpp" #include "operators/rx-buffer_time.hpp" #include "operators/rx-buffer_time_count.hpp" @@ -134,6 +133,13 @@ public: namespace rxcpp { +struct amb_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-amb.hpp>"); + }; +}; + struct all_tag { template<class Included> struct include_header{ diff --git a/Rx/v2/test/operators/amb.cpp b/Rx/v2/test/operators/amb.cpp index 60bbf83..9d6297c 100644 --- a/Rx/v2/test/operators/amb.cpp +++ b/Rx/v2/test/operators/amb.cpp @@ -1,4 +1,5 @@ #include "../test.h" +#include "rxcpp/operators/rx-amb.hpp" SCENARIO("amb never 3", "[amb][join][operators]"){ GIVEN("1 cold observable with 3 hot observables of ints."){ @@ -31,9 +32,9 @@ SCENARIO("amb never 3", "[amb][join][operators]"){ auto res = w.start( [&]() { return xs - .amb() + | rxo::amb() // forget type to workaround lambda deduction bug on msvc 2013 - .as_dynamic(); + | rxo::as_dynamic(); } ); @@ -905,3 +906,48 @@ SCENARIO("amb source throws after selection", "[amb][join][operators]"){ } } } + +SCENARIO("amb never empty, custom coordination", "[amb][join][operators]"){ + GIVEN("1 cold observable with 2 hot observables of ints."){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + auto ys1 = sc.make_hot_observable({ + on.next(100, 1) + }); + + auto ys2 = sc.make_hot_observable({ + on.next(110, 2), + on.completed(400) + }); + + auto xs = sc.make_cold_observable({ + o_on.next(100, ys1), + o_on.next(100, ys2), + o_on.completed(150) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return xs + .amb(so) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains only complete message"){ + auto required = rxu::to_vector({ + on.completed(401) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + } + } +} diff --git a/Rx/v2/test/operators/amb_variadic.cpp b/Rx/v2/test/operators/amb_variadic.cpp index 14f898b..a188758 100644 --- a/Rx/v2/test/operators/amb_variadic.cpp +++ b/Rx/v2/test/operators/amb_variadic.cpp @@ -1,4 +1,5 @@ #include "../test.h" +#include "rxcpp/operators/rx-amb.hpp" SCENARIO("variadic amb never 3", "[amb][join][operators]"){ GIVEN("3 hot observables of ints."){ @@ -23,9 +24,9 @@ SCENARIO("variadic amb never 3", "[amb][join][operators]"){ auto res = w.start( [&]() { return ys1 - .amb(ys2, ys3) + | rxo::amb(ys2, ys3) // forget type to workaround lambda deduction bug on msvc 2013 - .as_dynamic(); + | rxo::as_dynamic(); } ); @@ -549,3 +550,41 @@ SCENARIO("variadic amb loser&owner throws", "[amb][join][operators]"){ } } } + +SCENARIO("variadic amb never empty, custom coordination", "[amb][join][operators]"){ + GIVEN("2 hot observables of ints."){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto ys1 = sc.make_hot_observable({ + on.next(100, 1) + }); + + auto ys2 = sc.make_hot_observable({ + on.next(110, 2), + on.completed(400) + }); + + WHEN("the first observable is selected to produce ints"){ + + auto res = w.start( + [&]() { + return ys1 + .amb(so, ys2) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains only complete message"){ + auto required = rxu::to_vector({ + on.completed(401) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + } + } +} |