aboutsummaryrefslogtreecommitdiffstats
path: root/Rx/v2/src
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-12 23:09:27 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-12 15:51:28 -0800
commitdd1702ba6e6ff5ed622d44964f11f938ca5e23a4 (patch)
tree377150870dd687008bd15e74ff2c674d71586714 /Rx/v2/src
parentbbd51e36354e9f159e4408996014e0577f656431 (diff)
downloadplatform_external_Reactive-Extensions_RxCpp-dd1702ba6e6ff5ed622d44964f11f938ca5e23a4.tar.gz
platform_external_Reactive-Extensions_RxCpp-dd1702ba6e6ff5ed622d44964f11f938ca5e23a4.tar.bz2
platform_external_Reactive-Extensions_RxCpp-dd1702ba6e6ff5ed622d44964f11f938ca5e23a4.zip
decouple concat from observable
Diffstat (limited to 'Rx/v2/src')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-concat.hpp154
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp122
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
4 files changed, 127 insertions, 158 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat.hpp b/Rx/v2/src/rxcpp/operators/rx-concat.hpp
index f6d637e..57b42fa 100644
--- a/Rx/v2/src/rxcpp/operators/rx-concat.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-concat.hpp
@@ -2,6 +2,42 @@
#pragma once
+/*! \file rx-concat.hpp
+
+ \brief For each item from this observable subscribe to one at a time, in the order received.
+ For each item from all of the given observables deliver from the new observable that is returned.
+
+ There are 2 variants of the operator:
+ - The source observable emits nested observables, nested observables are concatenated.
+ - The source observable and the arguments v0...vn are used to provide the observables to concatenate.
+
+ \tparam Coordination the type of the scheduler (optional).
+ \tparam Value0 ... (optional).
+ \tparam ValueN types of source observables (optional).
+
+ \param cn the scheduler to synchronize sources from different contexts (optional).
+ \param v0 ... (optional).
+ \param vn source observables (optional).
+
+ \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them.
+
+ \sample
+ \snippet concat.cpp implicit concat sample
+ \snippet output.txt implicit concat sample
+
+ \sample
+ \snippet concat.cpp threaded implicit concat sample
+ \snippet output.txt threaded implicit concat sample
+
+ \sample
+ \snippet concat.cpp concat sample
+ \snippet output.txt concat sample
+
+ \sample
+ \snippet concat.cpp threaded concat sample
+ \snippet output.txt threaded concat sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_CONCAT_HPP)
#define RXCPP_OPERATORS_RX_CONCAT_HPP
@@ -13,6 +49,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct concat_invalid_arguments {};
+
+template<class... AN>
+struct concat_invalid : public rxo::operator_base<concat_invalid_arguments<AN...>> {
+ using type = observable<concat_invalid_arguments<AN...>, concat_invalid<AN...>>;
+};
+template<class... AN>
+using concat_invalid_t = typename concat_invalid<AN...>::type;
+
template<class T, class Observable, class Coordination>
struct concat
: public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
@@ -181,54 +227,82 @@ struct concat
}
};
-template<class Coordination>
-class concat_factory
-{
- typedef rxu::decay_t<Coordination> coordination_type;
-
- coordination_type coordination;
-public:
- concat_factory(coordination_type sf)
- : coordination(std::move(sf))
- {
- }
+}
- template<class Observable>
- auto operator()(Observable source)
- -> observable<rxu::value_type_t<concat<rxu::value_type_t<Observable>, Observable, Coordination>>, concat<rxu::value_type_t<Observable>, Observable, Coordination>> {
- return observable<rxu::value_type_t<concat<rxu::value_type_t<Observable>, Observable, Coordination>>, concat<rxu::value_type_t<Observable>, Observable, Coordination>>(
- concat<rxu::value_type_t<Observable>, Observable, Coordination>(std::move(source), coordination));
- }
-};
+/*! @copydoc rx-concat.hpp
+*/
+template<class... AN>
+auto concat(AN&&... an)
+ -> operator_factory<concat_tag, AN...> {
+ return operator_factory<concat_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
+}
}
-inline auto concat()
- -> detail::concat_factory<identity_one_worker> {
- return detail::concat_factory<identity_one_worker>(identity_current_thread());
-}
+template<>
+struct member_overload<concat_tag>
+{
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
+ class Value = rxu::value_type_t<SourceValue>,
+ class Result = observable<Value, Concat>
+ >
+ static Result member(Observable&& o) {
+ return Result(Concat(std::forward<Observable>(o), identity_current_thread()));
+ }
-template<class Coordination, class Check = typename std::enable_if<is_coordination<Coordination>::value>::type>
-auto concat(Coordination&& sf)
- -> detail::concat_factory<Coordination> {
- return detail::concat_factory<Coordination>(std::forward<Coordination>(sf));
-}
+ template<class Observable, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
+ class Value = rxu::value_type_t<SourceValue>,
+ class Result = observable<Value, Concat>
+ >
+ static Result member(Observable&& o, Coordination&& cn) {
+ return Result(Concat(std::forward<Observable>(o), std::forward<Coordination>(cn)));
+ }
-template<class O0, class... ON, class Check = typename std::enable_if<is_observable<O0>::value>::type>
-auto concat(O0&& o0, ON&&... on)
- -> detail::concat_factory<identity_one_worker> {
- return detail::concat_factory<identity_one_worker>(identity_current_thread())(from(std::forward<O0>(o0), std::forward<ON>(on)...));
-}
+ template<class Observable, class Value0, class... ValueN,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, Value0, ValueN...>>,
+ class EmittedValue = rxu::value_type_t<Observable>,
+ class SourceValue = observable<EmittedValue>,
+ class ObservableObservable = observable<SourceValue>,
+ class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, identity_one_worker>::type,
+ class Value = rxu::value_type_t<Concat>,
+ class Result = observable<Value, Concat>
+ >
+ static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
+ return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
+ }
-template<class Coordination, class O0, class... ON,
- class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type,
- class CheckO = typename std::enable_if<is_observable<O0>::value>::type>
-auto concat(Coordination&& sf, O0&& o0, ON&&... on)
- -> detail::concat_factory<Coordination> {
- return detail::concat_factory<Coordination>(std::forward<Coordination>(sf))(from(std::forward<O0>(o0), std::forward<ON>(on)...));
-}
+ template<class Observable, class Coordination, class Value0, class... ValueN,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ all_observables<Observable, Value0, ValueN...>,
+ is_coordination<Coordination>>,
+ class EmittedValue = rxu::value_type_t<Observable>,
+ class SourceValue = observable<EmittedValue>,
+ class ObservableObservable = observable<SourceValue>,
+ class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
+ class Value = rxu::value_type_t<Concat>,
+ class Result = observable<Value, Concat>
+ >
+ static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
+ return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
+ }
-}
+ template<class... AN>
+ static operators::detail::concat_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "concat takes (optional Coordination, optional Value0, optional ValueN...)");
+ }
+};
}
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 84561e6..7fbdf83 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -189,6 +189,7 @@
#include "operators/rx-buffer_time.hpp"
#include "operators/rx-buffer_time_count.hpp"
#include "operators/rx-combine_latest.hpp"
+#include "operators/rx-concat.hpp"
#include "operators/rx-debounce.hpp"
#include "operators/rx-delay.hpp"
#include "operators/rx-distinct.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 52acb15..e77068c 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1205,127 +1205,15 @@ public:
rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
}
- /// \cond SHOW_SERVICE_MEMBERS
- template<class Coordination>
- struct defer_concat : public defer_observable<
- is_observable<value_type>,
- this_type,
- rxo::detail::concat, value_type, observable<value_type>, Coordination>
- {
- };
- /// \endcond
-
- /*! For each item from this observable subscribe to one at a time, in the order received.
- For each item from all of the nested observables deliver from the new observable that is returned.
-
- \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them.
-
- \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
-
- \sample
- \snippet concat.cpp implicit concat sample
- \snippet output.txt implicit concat sample
- */
+ /*! @copydoc rx-concat.hpp
+ */
template<class... AN>
- auto concat(AN**...) const
+ auto concat(AN... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> typename defer_concat<identity_one_worker>::observable_type
- /// \endcond
- {
- return defer_concat<identity_one_worker>::make(*this, *this, identity_current_thread());
- static_assert(sizeof...(AN) == 0, "concat() was passed too many arguments.");
- }
-
- /*! For each item from this observable subscribe to one at a time, in the order received.
- For each item from all of the nested observables deliver from the new observable that is returned.
-
- \tparam Coordination the type of the scheduler
-
- \param cn the scheduler to synchronize sources from different contexts.
-
- \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them.
-
- \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
-
- \sample
- \snippet concat.cpp threaded implicit concat sample
- \snippet output.txt threaded implicit concat sample
- */
- template<class Coordination>
- auto concat(Coordination cn) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> typename std::enable_if<
- defer_concat<Coordination>::value,
- typename defer_concat<Coordination>::observable_type>::type
- /// \endcond
- {
- return defer_concat<Coordination>::make(*this, *this, std::move(cn));
- }
-
- /// \cond SHOW_SERVICE_MEMBERS
- template<class Coordination, class Value0>
- struct defer_concat_from : public defer_observable<
- rxu::all_true<
- is_coordination<Coordination>::value,
- is_observable<Value0>::value>,
- this_type,
- rxo::detail::concat, observable<value_type>, observable<observable<value_type>>, Coordination>
- {
- };
- /// \endcond
-
- /*! For each given observable subscribe to one at a time, in the order received.
- For each emitted item deliver from the new observable that is returned.
-
- \tparam Value0 ...
- \tparam ValueN types of source observables
-
- \param v0 ...
- \param vn source observables
-
- \return Observable that emits items emitted by the source observables, one after the other, without interleaving them.
-
- \sample
- \snippet concat.cpp concat sample
- \snippet output.txt concat sample
- */
- template<class Value0, class... ValueN>
- auto concat(Value0 v0, ValueN... vn) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> typename std::enable_if<
- defer_concat_from<identity_one_worker, Value0>::value,
- typename defer_concat_from<identity_one_worker, Value0>::observable_type>::type
- /// \endcond
- {
- return defer_concat_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread());
- }
-
- /*! For each given observable subscribe to one at a time, in the order received.
- For each emitted item deliver from the new observable that is returned.
-
- \tparam Coordination the type of the scheduler
- \tparam Value0 ...
- \tparam ValueN types of source observables
-
- \param cn the scheduler to synchronize sources from different contexts.
- \param v0 ...
- \param vn source observables
-
- \return Observable that emits items emitted by the source observables, one after the other, without interleaving them.
-
- \sample
- \snippet concat.cpp threaded concat sample
- \snippet output.txt threaded concat sample
- */
- template<class Coordination, class Value0, class... ValueN>
- auto concat(Coordination cn, Value0 v0, ValueN... vn) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> typename std::enable_if<
- defer_concat_from<Coordination, Value0>::value,
- typename defer_concat_from<Coordination, Value0>::observable_type>::type
+ -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return defer_concat_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn));
+ return observable_member(concat_tag{}, *this, std::forward<AN>(an)...);
}
/*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index bee493c..2aaa935 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -95,7 +95,6 @@ public:
}
-#include "operators/rx-concat.hpp"
#include "operators/rx-concat_map.hpp"
#include "operators/rx-connect_forever.hpp"
#include "operators/rx-flat_map.hpp"
@@ -168,6 +167,13 @@ struct combine_latest_tag {
};
};
+struct concat_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-concat.hpp>");
+ };
+};
+
struct debounce_tag {
template<class Included>
struct include_header{