diff options
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-replay.hpp | 228 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 217 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/test/operators/replay.cpp | 1 |
5 files changed, 220 insertions, 235 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-replay.hpp b/Rx/v2/src/rxcpp/operators/rx-replay.hpp index ef443bb..b2db8d8 100644 --- a/Rx/v2/src/rxcpp/operators/rx-replay.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-replay.hpp @@ -2,10 +2,69 @@ #pragma once +/*! \file rx-replay.hpp + + \brief 1) replay(optional Coordination, optional CompositeSubscription) + Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. + + 2) replay(Count, optional Coordination, optional CompositeSubscription) + Turn a cold observable hot, send at most count of earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. + + 3) replay(Duration, optional Coordination, optional CompositeSubscription) + Turn a cold observable hot, send values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions. + + 4) replay(Count, Duration, optional Coordination, optional CompositeSubscription) + Turn a cold observable hot, send at most count of values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions. + + \tparam Duration the type of the time interval (optional). + \tparam Count the type of the maximum number of the most recent items sent to new observers (optional). + \tparam Coordination the type of the scheduler (optional). + + \param count the maximum number of the most recent items sent to new observers (optional). + \param d the duration of the window in which the replayed items must be emitted + \param cn a scheduler all values are queued and delivered on (optional). + \param cs the subscription to control lifetime (optional). + + \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay all of its items and notifications to any future observer. + + \sample + \snippet replay.cpp replay sample + \snippet output.txt replay sample + + \sample + \snippet replay.cpp threaded replay sample + \snippet output.txt threaded replay sample + + \sample + \snippet replay.cpp replay count sample + \snippet output.txt replay count sample + + \sample + \snippet replay.cpp threaded replay count sample + \snippet output.txt threaded replay count sample + + \sample + \snippet replay.cpp replay period sample + \snippet output.txt replay period sample + + \sample + \snippet replay.cpp threaded replay period sample + \snippet output.txt threaded replay period sample + + \sample + \snippet replay.cpp replay count+period sample + \snippet output.txt replay count+period sample + + \sample + \snippet replay.cpp threaded replay count+period sample + \snippet output.txt threaded replay count+period sample +*/ + #if !defined(RXCPP_OPERATORS_RX_REPLAY_HPP) #define RXCPP_OPERATORS_RX_REPLAY_HPP #include "../rx-includes.hpp" +#include "./rx-multicast.hpp" namespace rxcpp { @@ -13,38 +72,163 @@ namespace operators { namespace detail { -template<template<class T, class Coordination> class Subject, class Coordination> -class replay_factory +template<class... AN> +struct replay_invalid_arguments {}; + +template<class... AN> +struct replay_invalid : public rxo::operator_base<replay_invalid_arguments<AN...>> { + using type = observable<replay_invalid_arguments<AN...>, replay_invalid<AN...>>; +}; +template<class... AN> +using replay_invalid_t = typename replay_invalid<AN...>::type; + +} + +/*! @copydoc rx-replay.hpp +*/ +template<class... AN> +auto replay(AN&&... an) + -> operator_factory<replay_tag, AN...> { + return operator_factory<replay_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); +} + +} + + template<> +struct member_overload<replay_tag> { - typedef rxu::decay_t<Coordination> coordination_type; + template<class Observable, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::replay<SourceValue, identity_one_worker>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o) { + return Result(Multicast(std::forward<Observable>(o), Subject(identity_current_thread(), composite_subscription()))); + } - coordination_type coordination; + template<class Observable, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::replay<SourceValue, identity_one_worker>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o, composite_subscription cs) { + return Result(Multicast(std::forward<Observable>(o), Subject(identity_current_thread(), cs))); + } -public: - replay_factory(coordination_type cn) - : coordination(std::move(cn)) - { + template<class Observable, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o, Coordination&& cn, composite_subscription cs = composite_subscription()) { + return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Coordination>(cn), cs))); } - template<class Observable> - auto operator()(Observable&& source) - -> connectable_observable<rxu::value_type_t<rxu::decay_t<Observable>>, multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>>> { - return connectable_observable<rxu::value_type_t<rxu::decay_t<Observable>>, multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>>>( - multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>>( - std::forward<Observable>(source), Subject<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>(coordination))); + template<class Observable, class Count, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + std::is_integral<Count>>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::replay<SourceValue, identity_one_worker>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o, Count count, composite_subscription cs = composite_subscription()) { + return Result(Multicast(std::forward<Observable>(o), Subject(count, identity_current_thread(), cs))); } -}; -} + template<class Observable, class Count, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + std::is_integral<Count>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o, Count count, Coordination&& cn, composite_subscription cs = composite_subscription()) { + return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Coordination>(cn), cs))); + } -template<class Coordination> -inline auto replay(Coordination&& cn) - -> detail::replay_factory<rxsub::replay, Coordination> { - return detail::replay_factory<rxsub::replay, Coordination>(std::forward<Coordination>(cn)); -} + template<class Observable, class Duration, + class IsDuration = rxu::is_duration<Duration>, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + IsDuration>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::replay<SourceValue, identity_one_worker>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o, Duration&& d, composite_subscription cs = composite_subscription()) { + return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Duration>(d), identity_current_thread(), cs))); + } -} + template<class Observable, class Duration, class Coordination, + class IsDuration = rxu::is_duration<Duration>, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + IsDuration, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o, Duration&& d, Coordination&& cn, composite_subscription cs = composite_subscription()) { + return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Duration>(d), std::forward<Coordination>(cn), cs))); + } + template<class Observable, class Count, class Duration, + class IsDuration = rxu::is_duration<Duration>, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + std::is_integral<Count>, + IsDuration>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::replay<SourceValue, identity_one_worker>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o, Count count, Duration&& d, composite_subscription cs = composite_subscription()) { + return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Duration>(d), identity_current_thread(), cs))); + } + + template<class Observable, class Count, class Duration, class Coordination, + class IsDuration = rxu::is_duration<Duration>, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + std::is_integral<Count>, + IsDuration, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>, + class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>, + class Result = connectable_observable<SourceValue, Multicast> + > + static Result member(Observable&& o, Count count, Duration&& d, Coordination&& cn, composite_subscription cs = composite_subscription()) { + return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Duration>(d), std::forward<Coordination>(cn), cs))); + } + + template<class... AN> + static operators::detail::replay_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "replay takes (optional Count, optional Duration, optional Coordination, optional CompositeSubscription)"); + } +}; + } #endif diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 7027648..c3b7d9d 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -208,6 +208,7 @@ #include "operators/rx-pairwise.hpp" #include "operators/rx-reduce.hpp" #include "operators/rx-repeat.hpp" +#include "operators/rx-replay.hpp" #include "operators/rx-retry.hpp" #include "operators/rx-sample_time.hpp" #include "operators/rx-scan.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index f121cf7..0a5ccbe 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1280,222 +1280,15 @@ public: static_assert(sizeof...(AN) == 0, "publish(value_type, composite_subscription) was passed too many arguments."); } - /*! Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay all of its items and notifications to any future observer. - - \sample - \snippet replay.cpp replay sample - \snippet output.txt replay sample - */ - template<class... AN> - auto replay(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::replay<T, identity_one_worker>(identity_current_thread(), composite_subscription()))) - /// \endcond - { - composite_subscription cs; - return multicast(rxsub::replay<T, identity_one_worker>(identity_current_thread(), cs)); - static_assert(sizeof...(AN) == 0, "replay() was passed too many arguments."); - } - - /*! Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay all of its items and notifications to any future observer. - - \sample - \snippet replay.cpp replay sample - \snippet output.txt replay sample - */ - template<class... AN> - auto replay(composite_subscription cs, AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::replay<T, identity_one_worker>(identity_current_thread(), cs))) - /// \endcond - { - return multicast(rxsub::replay<T, identity_one_worker>(identity_current_thread(), cs)); - static_assert(sizeof...(AN) == 0, "replay(composite_subscription) was passed too many arguments."); - } - - /*! Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \tparam Coordination the type of the scheduler - - \param cn a scheduler all values are queued and delivered on - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay all of its items and notifications to any future observer. - - \sample - \snippet replay.cpp threaded replay sample - \snippet output.txt threaded replay sample - */ - template<class Coordination, - class Requires = typename std::enable_if<is_coordination<Coordination>::value, rxu::types_checked>::type> - auto replay(Coordination cn, composite_subscription cs = composite_subscription()) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::replay<T, Coordination>(std::move(cn), cs))) - /// \endcond - { - return multicast(rxsub::replay<T, Coordination>(std::move(cn), cs)); - } - - /*! Turn a cold observable hot, send at most count of earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \param count the maximum number of the most recent items sent to new observers - - \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay at most count items to any future observer. - - \sample - \snippet replay.cpp replay count sample - \snippet output.txt replay count sample - */ - template<class... AN> - auto replay(std::size_t count, AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::replay<T, identity_one_worker>(count, identity_current_thread(), composite_subscription()))) - /// \endcond - { - composite_subscription cs; - return multicast(rxsub::replay<T, identity_one_worker>(count, identity_current_thread(), cs)); - static_assert(sizeof...(AN) == 0, "replay(count) was passed too many arguments."); - } - - /*! Turn a cold observable hot, send at most count of earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \param count the maximum number of the most recent items sent to new observers - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay at most count items to any future observer. - - \sample - \snippet replay.cpp replay count sample - \snippet output.txt replay count sample - */ + /*! @copydoc rx-replay.hpp + */ template<class... AN> - auto replay(std::size_t count, composite_subscription cs, AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::replay<T, identity_one_worker>(count, identity_current_thread(), cs))) - /// \endcond - { - return multicast(rxsub::replay<T, identity_one_worker>(count, identity_current_thread(), cs)); - static_assert(sizeof...(AN) == 0, "replay(count, composite_subscription) was passed too many arguments."); - } - - /*! Turn a cold observable hot, send at most count of earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \tparam Coordination the type of the scheduler - - \param count the maximum number of the most recent items sent to new observers - \param cn a scheduler all values are queued and delivered on - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay at most count items to any future observer. - - \sample - \snippet replay.cpp threaded replay count sample - \snippet output.txt threaded replay count sample - */ - template<class Coordination, - class Requires = typename std::enable_if<is_coordination<Coordination>::value, rxu::types_checked>::type> - auto replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription()) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::replay<T, Coordination>(count, std::move(cn), cs))) - /// \endcond - { - return multicast(rxsub::replay<T, Coordination>(count, std::move(cn), cs)); - } - - /*! Turn a cold observable hot, send values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \param period the duration of the window in which the replayed items must be emitted - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay items emitted within a specified time window to any future observer. - - \sample - \snippet replay.cpp replay period sample - \snippet output.txt replay period sample - */ - template<class Duration> - auto replay(Duration period, composite_subscription cs = composite_subscription()) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::replay<T, identity_one_worker>(period, identity_current_thread(), cs))) - /// \endcond - { - return multicast(rxsub::replay<T, identity_one_worker>(period, identity_current_thread(), cs)); - } - - /*! Turn a cold observable hot, send values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \tparam Coordination the type of the scheduler - - \param period the duration of the window in which the replayed items must be emitted - \param cn a scheduler all values are queued and delivered on - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay items emitted within a specified time window to any future observer. - - \sample - \snippet replay.cpp threaded replay period sample - \snippet output.txt threaded replay period sample - */ - template<class Coordination, - class Requires = typename std::enable_if<is_coordination<Coordination>::value, rxu::types_checked>::type> - auto replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::replay<T, Coordination>(period, std::move(cn), cs))) - /// \endcond - { - return multicast(rxsub::replay<T, Coordination>(period, std::move(cn), cs)); - } - - /*! Turn a cold observable hot, send at most count of values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \param count the maximum number of the most recent items sent to new observers - \param period the duration of the window in which the replayed items must be emitted - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay at most count of items emitted within a specified time window to any future observer. - - \sample - \snippet replay.cpp replay count+period sample - \snippet output.txt replay count+period sample - */ - template<class Duration> - auto replay(std::size_t count, Duration period, composite_subscription cs = composite_subscription()) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::replay<T, identity_one_worker>(count, period, identity_current_thread(), cs))) - /// \endcond - { - return multicast(rxsub::replay<T, identity_one_worker>(count, period, identity_current_thread(), cs)); - } - - /*! Turn a cold observable hot, send at most count of values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions. - - \tparam Coordination the type of the scheduler - - \param count the maximum number of the most recent items sent to new observers - \param period the duration of the window in which the replayed items must be emitted - \param cn a scheduler all values are queued and delivered on - \param cs the subscription to control lifetime - - \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay at most count of items emitted within a specified time window to any future observer. - - \sample - \snippet replay.cpp threaded replay count+period sample - \snippet output.txt threaded replay count+period sample - */ - template<class Coordination, - class Requires = typename std::enable_if<is_coordination<Coordination>::value, rxu::types_checked>::type> - auto replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) const + auto replay(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS multicast(rxsub::replay<T, Coordination>(count, period, std::move(cn), cs))) + -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { - return multicast(rxsub::replay<T, Coordination>(count, period, std::move(cn), cs)); + return observable_member(replay_tag{}, *this, std::forward<AN>(an)...); } /*! @copydoc rx-subscribe_on.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 086315b..aeffac9 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -100,7 +100,6 @@ public: #include "operators/rx-multicast.hpp" #include "operators/rx-publish.hpp" #include "operators/rx-ref_count.hpp" -#include "operators/rx-replay.hpp" #include "operators/rx-subscribe.hpp" namespace rxcpp { @@ -305,6 +304,13 @@ struct repeat_tag { }; }; +struct replay_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-replay.hpp>"); + }; +}; + struct retry_tag { template<class Included> struct include_header{ diff --git a/Rx/v2/test/operators/replay.cpp b/Rx/v2/test/operators/replay.cpp index a694854..62ab372 100644 --- a/Rx/v2/test/operators/replay.cpp +++ b/Rx/v2/test/operators/replay.cpp @@ -1,4 +1,5 @@ #include "../test.h" +#include <rxcpp/operators/rx-replay.hpp> SCENARIO("replay basic", "[replay][multicast][subject][operators]"){ GIVEN("a test hot observable of ints"){ |