diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2017-01-12 23:09:27 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-01-12 15:51:28 -0800 |
commit | dd1702ba6e6ff5ed622d44964f11f938ca5e23a4 (patch) | |
tree | 377150870dd687008bd15e74ff2c674d71586714 /Rx/v2/src | |
parent | bbd51e36354e9f159e4408996014e0577f656431 (diff) | |
download | platform_external_Reactive-Extensions_RxCpp-dd1702ba6e6ff5ed622d44964f11f938ca5e23a4.tar.gz platform_external_Reactive-Extensions_RxCpp-dd1702ba6e6ff5ed622d44964f11f938ca5e23a4.tar.bz2 platform_external_Reactive-Extensions_RxCpp-dd1702ba6e6ff5ed622d44964f11f938ca5e23a4.zip |
decouple concat from observable
Diffstat (limited to 'Rx/v2/src')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-concat.hpp | 154 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 122 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 8 |
4 files changed, 127 insertions, 158 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat.hpp b/Rx/v2/src/rxcpp/operators/rx-concat.hpp index f6d637e..57b42fa 100644 --- a/Rx/v2/src/rxcpp/operators/rx-concat.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-concat.hpp @@ -2,6 +2,42 @@ #pragma once +/*! \file rx-concat.hpp + + \brief For each item from this observable subscribe to one at a time, in the order received. + For each item from all of the given observables deliver from the new observable that is returned. + + There are 2 variants of the operator: + - The source observable emits nested observables, nested observables are concatenated. + - The source observable and the arguments v0...vn are used to provide the observables to concatenate. + + \tparam Coordination the type of the scheduler (optional). + \tparam Value0 ... (optional). + \tparam ValueN types of source observables (optional). + + \param cn the scheduler to synchronize sources from different contexts (optional). + \param v0 ... (optional). + \param vn source observables (optional). + + \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them. + + \sample + \snippet concat.cpp implicit concat sample + \snippet output.txt implicit concat sample + + \sample + \snippet concat.cpp threaded implicit concat sample + \snippet output.txt threaded implicit concat sample + + \sample + \snippet concat.cpp concat sample + \snippet output.txt concat sample + + \sample + \snippet concat.cpp threaded concat sample + \snippet output.txt threaded concat sample +*/ + #if !defined(RXCPP_OPERATORS_RX_CONCAT_HPP) #define RXCPP_OPERATORS_RX_CONCAT_HPP @@ -13,6 +49,16 @@ namespace operators { namespace detail { +template<class... AN> +struct concat_invalid_arguments {}; + +template<class... AN> +struct concat_invalid : public rxo::operator_base<concat_invalid_arguments<AN...>> { + using type = observable<concat_invalid_arguments<AN...>, concat_invalid<AN...>>; +}; +template<class... AN> +using concat_invalid_t = typename concat_invalid<AN...>::type; + template<class T, class Observable, class Coordination> struct concat : public operator_base<rxu::value_type_t<rxu::decay_t<T>>> @@ -181,54 +227,82 @@ struct concat } }; -template<class Coordination> -class concat_factory -{ - typedef rxu::decay_t<Coordination> coordination_type; - - coordination_type coordination; -public: - concat_factory(coordination_type sf) - : coordination(std::move(sf)) - { - } +} - template<class Observable> - auto operator()(Observable source) - -> observable<rxu::value_type_t<concat<rxu::value_type_t<Observable>, Observable, Coordination>>, concat<rxu::value_type_t<Observable>, Observable, Coordination>> { - return observable<rxu::value_type_t<concat<rxu::value_type_t<Observable>, Observable, Coordination>>, concat<rxu::value_type_t<Observable>, Observable, Coordination>>( - concat<rxu::value_type_t<Observable>, Observable, Coordination>(std::move(source), coordination)); - } -}; +/*! @copydoc rx-concat.hpp +*/ +template<class... AN> +auto concat(AN&&... an) + -> operator_factory<concat_tag, AN...> { + return operator_factory<concat_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); +} } -inline auto concat() - -> detail::concat_factory<identity_one_worker> { - return detail::concat_factory<identity_one_worker>(identity_current_thread()); -} +template<> +struct member_overload<concat_tag> +{ + template<class Observable, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, identity_one_worker>, + class Value = rxu::value_type_t<SourceValue>, + class Result = observable<Value, Concat> + > + static Result member(Observable&& o) { + return Result(Concat(std::forward<Observable>(o), identity_current_thread())); + } -template<class Coordination, class Check = typename std::enable_if<is_coordination<Coordination>::value>::type> -auto concat(Coordination&& sf) - -> detail::concat_factory<Coordination> { - return detail::concat_factory<Coordination>(std::forward<Coordination>(sf)); -} + template<class Observable, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>, + class Value = rxu::value_type_t<SourceValue>, + class Result = observable<Value, Concat> + > + static Result member(Observable&& o, Coordination&& cn) { + return Result(Concat(std::forward<Observable>(o), std::forward<Coordination>(cn))); + } -template<class O0, class... ON, class Check = typename std::enable_if<is_observable<O0>::value>::type> -auto concat(O0&& o0, ON&&... on) - -> detail::concat_factory<identity_one_worker> { - return detail::concat_factory<identity_one_worker>(identity_current_thread())(from(std::forward<O0>(o0), std::forward<ON>(on)...)); -} + template<class Observable, class Value0, class... ValueN, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, Value0, ValueN...>>, + class EmittedValue = rxu::value_type_t<Observable>, + class SourceValue = observable<EmittedValue>, + class ObservableObservable = observable<SourceValue>, + class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, identity_one_worker>::type, + class Value = rxu::value_type_t<Concat>, + class Result = observable<Value, Concat> + > + static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) { + return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread())); + } -template<class Coordination, class O0, class... ON, - class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type, - class CheckO = typename std::enable_if<is_observable<O0>::value>::type> -auto concat(Coordination&& sf, O0&& o0, ON&&... on) - -> detail::concat_factory<Coordination> { - return detail::concat_factory<Coordination>(std::forward<Coordination>(sf))(from(std::forward<O0>(o0), std::forward<ON>(on)...)); -} + template<class Observable, class Coordination, class Value0, class... ValueN, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, Value0, ValueN...>, + is_coordination<Coordination>>, + class EmittedValue = rxu::value_type_t<Observable>, + class SourceValue = observable<EmittedValue>, + class ObservableObservable = observable<SourceValue>, + class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type, + class Value = rxu::value_type_t<Concat>, + class Result = observable<Value, Concat> + > + static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) { + return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn))); + } -} + template<class... AN> + static operators::detail::concat_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "concat takes (optional Coordination, optional Value0, optional ValueN...)"); + } +}; } diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 84561e6..7fbdf83 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -189,6 +189,7 @@ #include "operators/rx-buffer_time.hpp" #include "operators/rx-buffer_time_count.hpp" #include "operators/rx-combine_latest.hpp" +#include "operators/rx-concat.hpp" #include "operators/rx-debounce.hpp" #include "operators/rx-delay.hpp" #include "operators/rx-distinct.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 52acb15..e77068c 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1205,127 +1205,15 @@ public: rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn))); } - /// \cond SHOW_SERVICE_MEMBERS - template<class Coordination> - struct defer_concat : public defer_observable< - is_observable<value_type>, - this_type, - rxo::detail::concat, value_type, observable<value_type>, Coordination> - { - }; - /// \endcond - - /*! For each item from this observable subscribe to one at a time, in the order received. - For each item from all of the nested observables deliver from the new observable that is returned. - - \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them. - - \note All sources must be synchronized! This means that calls across all the subscribers must be serial. - - \sample - \snippet concat.cpp implicit concat sample - \snippet output.txt implicit concat sample - */ + /*! @copydoc rx-concat.hpp + */ template<class... AN> - auto concat(AN**...) const + auto concat(AN... an) const /// \cond SHOW_SERVICE_MEMBERS - -> typename defer_concat<identity_one_worker>::observable_type - /// \endcond - { - return defer_concat<identity_one_worker>::make(*this, *this, identity_current_thread()); - static_assert(sizeof...(AN) == 0, "concat() was passed too many arguments."); - } - - /*! For each item from this observable subscribe to one at a time, in the order received. - For each item from all of the nested observables deliver from the new observable that is returned. - - \tparam Coordination the type of the scheduler - - \param cn the scheduler to synchronize sources from different contexts. - - \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them. - - \note All sources must be synchronized! This means that calls across all the subscribers must be serial. - - \sample - \snippet concat.cpp threaded implicit concat sample - \snippet output.txt threaded implicit concat sample - */ - template<class Coordination> - auto concat(Coordination cn) const - /// \cond SHOW_SERVICE_MEMBERS - -> typename std::enable_if< - defer_concat<Coordination>::value, - typename defer_concat<Coordination>::observable_type>::type - /// \endcond - { - return defer_concat<Coordination>::make(*this, *this, std::move(cn)); - } - - /// \cond SHOW_SERVICE_MEMBERS - template<class Coordination, class Value0> - struct defer_concat_from : public defer_observable< - rxu::all_true< - is_coordination<Coordination>::value, - is_observable<Value0>::value>, - this_type, - rxo::detail::concat, observable<value_type>, observable<observable<value_type>>, Coordination> - { - }; - /// \endcond - - /*! For each given observable subscribe to one at a time, in the order received. - For each emitted item deliver from the new observable that is returned. - - \tparam Value0 ... - \tparam ValueN types of source observables - - \param v0 ... - \param vn source observables - - \return Observable that emits items emitted by the source observables, one after the other, without interleaving them. - - \sample - \snippet concat.cpp concat sample - \snippet output.txt concat sample - */ - template<class Value0, class... ValueN> - auto concat(Value0 v0, ValueN... vn) const - /// \cond SHOW_SERVICE_MEMBERS - -> typename std::enable_if< - defer_concat_from<identity_one_worker, Value0>::value, - typename defer_concat_from<identity_one_worker, Value0>::observable_type>::type - /// \endcond - { - return defer_concat_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()); - } - - /*! For each given observable subscribe to one at a time, in the order received. - For each emitted item deliver from the new observable that is returned. - - \tparam Coordination the type of the scheduler - \tparam Value0 ... - \tparam ValueN types of source observables - - \param cn the scheduler to synchronize sources from different contexts. - \param v0 ... - \param vn source observables - - \return Observable that emits items emitted by the source observables, one after the other, without interleaving them. - - \sample - \snippet concat.cpp threaded concat sample - \snippet output.txt threaded concat sample - */ - template<class Coordination, class Value0, class... ValueN> - auto concat(Coordination cn, Value0 v0, ValueN... vn) const - /// \cond SHOW_SERVICE_MEMBERS - -> typename std::enable_if< - defer_concat_from<Coordination, Value0>::value, - typename defer_concat_from<Coordination, Value0>::observable_type>::type + -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { - return defer_concat_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn)); + return observable_member(concat_tag{}, *this, std::forward<AN>(an)...); } /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index bee493c..2aaa935 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -95,7 +95,6 @@ public: } -#include "operators/rx-concat.hpp" #include "operators/rx-concat_map.hpp" #include "operators/rx-connect_forever.hpp" #include "operators/rx-flat_map.hpp" @@ -168,6 +167,13 @@ struct combine_latest_tag { }; }; +struct concat_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-concat.hpp>"); + }; +}; + struct debounce_tag { template<class Included> struct include_header{ |