aboutsummaryrefslogtreecommitdiffstats
path: root/Rx/v2
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-12-24 19:58:20 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2016-12-24 08:58:20 -0800
commit4ce03456ca6dadc387874d134d6d987f9fc48550 (patch)
tree9feb1e3e97ad5b050d7b73669521e186035044b3 /Rx/v2
parentbb0871143b4beeb1947a7cedfa78bf3d8259a283 (diff)
downloadplatform_external_Reactive-Extensions_RxCpp-4ce03456ca6dadc387874d134d6d987f9fc48550.tar.gz
platform_external_Reactive-Extensions_RxCpp-4ce03456ca6dadc387874d134d6d987f9fc48550.tar.bz2
platform_external_Reactive-Extensions_RxCpp-4ce03456ca6dadc387874d134d6d987f9fc48550.zip
decouple amb from observable (#300)
* decouple amb from observable * refactoring to remove redundant dependencies
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-amb.hpp135
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp118
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
-rw-r--r--Rx/v2/test/operators/amb.cpp50
-rw-r--r--Rx/v2/test/operators/amb_variadic.cpp43
6 files changed, 215 insertions, 140 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-amb.hpp b/Rx/v2/src/rxcpp/operators/rx-amb.hpp
index c89e911..595dc4d 100644
--- a/Rx/v2/src/rxcpp/operators/rx-amb.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-amb.hpp
@@ -2,6 +2,40 @@
#pragma once
+/*! \file rx-amb.hpp
+
+ \brief For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler.
+
+ There are 2 variants of the operator:
+ - The source observable emits nested observables, one of the nested observables is selected.
+ - The source observable and the arguments v0...vn are used to provide the observables to select from.
+
+ \tparam Coordination the type of the scheduler (optional).
+ \tparam Value0 ... (optional).
+ \tparam ValueN types of source observables (optional).
+
+ \param cn the scheduler to synchronize sources from different contexts (optional).
+ \param v0 ... (optional).
+ \param vn source observables (optional).
+
+ \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification.
+
+ If scheduler is omitted, identity_current_thread is used.
+
+ \sample
+ \snippet amb.cpp threaded implicit amb sample
+ \snippet output.txt threaded implicit amb sample
+
+ \snippet amb.cpp implicit amb sample
+ \snippet output.txt implicit amb sample
+
+ \snippet amb.cpp amb sample
+ \snippet output.txt amb sample
+
+ \snippet amb.cpp threaded amb sample
+ \snippet output.txt threaded amb sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_AMB_HPP)
#define RXCPP_OPERATORS_RX_AMB_HPP
@@ -13,6 +47,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct amb_invalid_arguments {};
+
+template<class... AN>
+struct amb_invalid : public rxo::operator_base<amb_invalid_arguments<AN...>> {
+ using type = observable<amb_invalid_arguments<AN...>, amb_invalid<AN...>>;
+};
+template<class... AN>
+using amb_invalid_t = typename amb_invalid<AN...>::type;
+
template<class T, class Observable, class Coordination>
struct amb
: public operator_base<rxu::value_type_t<T>>
@@ -172,35 +216,82 @@ struct amb
}
};
-template<class Coordination>
-class amb_factory
-{
- typedef rxu::decay_t<Coordination> coordination_type;
+}
- coordination_type coordination;
-public:
- amb_factory(coordination_type sf)
- : coordination(std::move(sf))
- {
+/*! @copydoc rx-amb.hpp
+*/
+template<class... AN>
+auto amb(AN&&... an)
+ -> operator_factory<amb_tag, AN...> {
+ return operator_factory<amb_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
+}
+
+}
+
+template<>
+struct member_overload<amb_tag>
+{
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
+ class Value = rxu::value_type_t<SourceValue>,
+ class Result = observable<Value, Amb>
+ >
+ static Result member(Observable&& o) {
+ return Result(Amb(std::forward<Observable>(o), identity_current_thread()));
}
- template<class Observable>
- auto operator()(Observable source)
- -> observable<rxu::value_type_t<amb<rxu::value_type_t<Observable>, Observable, Coordination>>, amb<rxu::value_type_t<Observable>, Observable, Coordination>> {
- return observable<rxu::value_type_t<amb<rxu::value_type_t<Observable>, Observable, Coordination>>, amb<rxu::value_type_t<Observable>, Observable, Coordination>>(
- amb<rxu::value_type_t<Observable>, Observable, Coordination>(std::move(source), coordination));
+ template<class Observable, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
+ class Value = rxu::value_type_t<SourceValue>,
+ class Result = observable<Value, Amb>
+ >
+ static Result member(Observable&& o, Coordination&& cn) {
+ return Result(Amb(std::forward<Observable>(o), std::forward<Coordination>(cn)));
}
-};
-}
+ template<class Observable, class Value0, class... ValueN,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, Value0, ValueN...>>,
+ class EmittedValue = rxu::value_type_t<Observable>,
+ class SourceValue = observable<EmittedValue>,
+ class ObservableObservable = observable<SourceValue>,
+ class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, identity_one_worker>::type,
+ class Value = rxu::value_type_t<Amb>,
+ class Result = observable<Value, Amb>
+ >
+ static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
+ return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
+ }
-template<class Coordination>
-auto amb(Coordination&& sf)
- -> detail::amb_factory<Coordination> {
- return detail::amb_factory<Coordination>(std::forward<Coordination>(sf));
-}
+ template<class Observable, class Coordination, class Value0, class... ValueN,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, Value0, ValueN...>,
+ is_coordination<Coordination>>,
+ class EmittedValue = rxu::value_type_t<Observable>,
+ class SourceValue = observable<EmittedValue>,
+ class ObservableObservable = observable<SourceValue>,
+ class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
+ class Value = rxu::value_type_t<Amb>,
+ class Result = observable<Value, Amb>
+ >
+ static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
+ return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
+ }
-}
+ template<class... AN>
+ static operators::detail::amb_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "amb takes (optional Coordination, optional Value0, optional ValueN...)");
+ }
+};
}
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index fa0678f..07a160a 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -182,6 +182,7 @@
#include "rx-grouped_observable.hpp"
#if !defined(RXCPP_LITE)
+#include "operators/rx-amb.hpp"
#include "operators/rx-all.hpp"
#include "operators/rx-any.hpp"
#include "operators/rx-combine_latest.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 4524f23..e3d0e0f 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1668,123 +1668,15 @@ public:
return defer_merge_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn));
}
- /// \cond SHOW_SERVICE_MEMBERS
- template<class Coordination>
- struct defer_amb : public defer_observable<
- is_observable<value_type>,
- this_type,
- rxo::detail::amb, value_type, observable<value_type>, Coordination>
- {
- };
- /// \endcond
-
- /*! For each item from only the first of the nested observables deliver from the new observable that is returned.
-
- \return Observable that emits the same sequence as whichever of the observables emitted from this observable that first emitted an item or sent a termination notification.
-
- \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
-
- \sample
- \snippet amb.cpp implicit amb sample
- \snippet output.txt implicit amb sample
- */
+ /*! @copydoc rx-amb.hpp
+ */
template<class... AN>
- auto amb(AN**...) const
+ auto amb(AN... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> typename defer_amb<identity_one_worker>::observable_type
- /// \endcond
- {
- return defer_amb<identity_one_worker>::make(*this, *this, identity_current_thread());
- static_assert(sizeof...(AN) == 0, "amb() was passed too many arguments.");
- }
-
- /*! For each item from only the first of the nested observables deliver from the new observable that is returned, on the specified scheduler.
-
- \tparam Coordination the type of the scheduler
-
- \param cn the scheduler to synchronize sources from different contexts.
-
- \return Observable that emits the same sequence as whichever of the observables emitted from this observable that first emitted an item or sent a termination notification.
-
- \sample
- \snippet amb.cpp threaded implicit amb sample
- \snippet output.txt threaded implicit amb sample
- */
- template<class Coordination>
- auto amb(Coordination cn) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> typename std::enable_if<
- defer_amb<Coordination>::value,
- typename defer_amb<Coordination>::observable_type>::type
- /// \endcond
- {
- return defer_amb<Coordination>::make(*this, *this, std::move(cn));
- }
-
- /// \cond SHOW_SERVICE_MEMBERS
- template<class Coordination, class Value0>
- struct defer_amb_from : public defer_observable<
- rxu::all_true<
- is_coordination<Coordination>::value,
- is_observable<Value0>::value>,
- this_type,
- rxo::detail::amb, observable<value_type>, observable<observable<value_type>>, Coordination>
- {
- };
- /// \endcond
-
- /*! For each item from only the first of the given observables deliver from the new observable that is returned.
-
- \tparam Value0 ...
- \tparam ValueN types of source observables
-
- \param v0 ...
- \param vn source observables
-
- \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification.
-
- \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
-
- \sample
- \snippet amb.cpp amb sample
- \snippet output.txt amb sample
- */
- template<class Value0, class... ValueN>
- auto amb(Value0 v0, ValueN... vn) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> typename std::enable_if<
- defer_amb_from<identity_one_worker, Value0>::value,
- typename defer_amb_from<identity_one_worker, Value0>::observable_type>::type
- /// \endcond
- {
- return defer_amb_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread());
- }
-
- /*! For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler.
-
- \tparam Coordination the type of the scheduler
- \tparam Value0 ...
- \tparam ValueN types of source observables
-
- \param cn the scheduler to synchronize sources from different contexts.
- \param v0 ...
- \param vn source observables
-
- \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification.
-
- \sample
- \snippet amb.cpp threaded amb sample
- \snippet output.txt threaded amb sample
- */
- template<class Coordination, class Value0, class... ValueN>
- auto amb(Coordination cn, Value0 v0, ValueN... vn) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> typename std::enable_if<
- defer_amb_from<Coordination, Value0>::value,
- typename defer_amb_from<Coordination, Value0>::observable_type>::type
+ -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return defer_amb_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn));
+ return observable_member(amb_tag{}, *this, std::forward<AN>(an)...);
}
/*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index c96964e..2d02fe2 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -95,7 +95,6 @@ public:
}
-#include "operators/rx-amb.hpp"
#include "operators/rx-buffer_count.hpp"
#include "operators/rx-buffer_time.hpp"
#include "operators/rx-buffer_time_count.hpp"
@@ -134,6 +133,13 @@ public:
namespace rxcpp {
+struct amb_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-amb.hpp>");
+ };
+};
+
struct all_tag {
template<class Included>
struct include_header{
diff --git a/Rx/v2/test/operators/amb.cpp b/Rx/v2/test/operators/amb.cpp
index 60bbf83..9d6297c 100644
--- a/Rx/v2/test/operators/amb.cpp
+++ b/Rx/v2/test/operators/amb.cpp
@@ -1,4 +1,5 @@
#include "../test.h"
+#include "rxcpp/operators/rx-amb.hpp"
SCENARIO("amb never 3", "[amb][join][operators]"){
GIVEN("1 cold observable with 3 hot observables of ints."){
@@ -31,9 +32,9 @@ SCENARIO("amb never 3", "[amb][join][operators]"){
auto res = w.start(
[&]() {
return xs
- .amb()
+ | rxo::amb()
// forget type to workaround lambda deduction bug on msvc 2013
- .as_dynamic();
+ | rxo::as_dynamic();
}
);
@@ -905,3 +906,48 @@ SCENARIO("amb source throws after selection", "[amb][join][operators]"){
}
}
}
+
+SCENARIO("amb never empty, custom coordination", "[amb][join][operators]"){
+ GIVEN("1 cold observable with 2 hot observables of ints."){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ auto ys1 = sc.make_hot_observable({
+ on.next(100, 1)
+ });
+
+ auto ys2 = sc.make_hot_observable({
+ on.next(110, 2),
+ on.completed(400)
+ });
+
+ auto xs = sc.make_cold_observable({
+ o_on.next(100, ys1),
+ o_on.next(100, ys2),
+ o_on.completed(150)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .amb(so)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains only complete message"){
+ auto required = rxu::to_vector({
+ on.completed(401)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
diff --git a/Rx/v2/test/operators/amb_variadic.cpp b/Rx/v2/test/operators/amb_variadic.cpp
index 14f898b..a188758 100644
--- a/Rx/v2/test/operators/amb_variadic.cpp
+++ b/Rx/v2/test/operators/amb_variadic.cpp
@@ -1,4 +1,5 @@
#include "../test.h"
+#include "rxcpp/operators/rx-amb.hpp"
SCENARIO("variadic amb never 3", "[amb][join][operators]"){
GIVEN("3 hot observables of ints."){
@@ -23,9 +24,9 @@ SCENARIO("variadic amb never 3", "[amb][join][operators]"){
auto res = w.start(
[&]() {
return ys1
- .amb(ys2, ys3)
+ | rxo::amb(ys2, ys3)
// forget type to workaround lambda deduction bug on msvc 2013
- .as_dynamic();
+ | rxo::as_dynamic();
}
);
@@ -549,3 +550,41 @@ SCENARIO("variadic amb loser&owner throws", "[amb][join][operators]"){
}
}
}
+
+SCENARIO("variadic amb never empty, custom coordination", "[amb][join][operators]"){
+ GIVEN("2 hot observables of ints."){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto ys1 = sc.make_hot_observable({
+ on.next(100, 1)
+ });
+
+ auto ys2 = sc.make_hot_observable({
+ on.next(110, 2),
+ on.completed(400)
+ });
+
+ WHEN("the first observable is selected to produce ints"){
+
+ auto res = w.start(
+ [&]() {
+ return ys1
+ .amb(so, ys2)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains only complete message"){
+ auto required = rxu::to_vector({
+ on.completed(401)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}