diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-12-26 17:39:14 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-12-26 10:04:52 -0800 |
commit | ac66697bfe72539e6c3ef2f633ae5dfeb589234d (patch) | |
tree | b67f77839907927826253394a4eb86319972e8fc /Rx/v2 | |
parent | 4ce03456ca6dadc387874d134d6d987f9fc48550 (diff) | |
download | platform_external_Reactive-Extensions_RxCpp-ac66697bfe72539e6c3ef2f633ae5dfeb589234d.tar.gz platform_external_Reactive-Extensions_RxCpp-ac66697bfe72539e6c3ef2f633ae5dfeb589234d.tar.bz2 platform_external_Reactive-Extensions_RxCpp-ac66697bfe72539e6c3ef2f633ae5dfeb589234d.zip |
decouple timestamp from observable
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-timestamp.hpp | 96 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 40 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/test/operators/timestamp.cpp | 4 |
5 files changed, 83 insertions, 66 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp b/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp index 17aff82..d1a11bf 100644 --- a/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp @@ -2,6 +2,21 @@ #pragma once +/*! \file rx-timestamp.hpp + + \brief Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted. + + \tparam Coordination the type of the scheduler (optional). + + \param coordination the scheduler to manage timeout for each event (optional). + + \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }. + + \sample + \snippet timestamp.cpp timestamp sample + \snippet output.txt timestamp sample +*/ + #if !defined(RXCPP_OPERATORS_RX_TIMESTAMP_HPP) #define RXCPP_OPERATORS_RX_TIMESTAMP_HPP @@ -13,11 +28,19 @@ namespace operators { namespace detail { +template<class... AN> +struct timestamp_invalid_arguments {}; + +template<class... AN> +struct timestamp_invalid : public rxo::operator_base<timestamp_invalid_arguments<AN...>> { + using type = observable<timestamp_invalid_arguments<AN...>, timestamp_invalid<AN...>>; +}; +template<class... AN> +using timestamp_invalid_t = typename timestamp_invalid<AN...>::type; + template<class T, class Coordination> struct timestamp { - static_assert(is_coordination<Coordination>::value, "Coordination parameter must satisfy the requirements for a Coordination"); - typedef rxu::decay_t<T> source_value_type; typedef rxu::decay_t<Coordination> coordination_type; @@ -62,7 +85,7 @@ struct timestamp dest.on_completed(); } - static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, timestamp_values v) { + static subscriber<value_type, observer_type> make(dest_type d, timestamp_values v) { return make_subscriber<value_type>(d, this_type(d, v.coordination)); } }; @@ -74,38 +97,53 @@ struct timestamp } }; -template <class Coordination> -class timestamp_factory -{ - typedef rxu::decay_t<Coordination> coordination_type; - - coordination_type coordination; -public: - timestamp_factory(coordination_type ct) - : coordination(std::move(ct)) { } - - template<class Observable> - auto operator()(Observable&& source) - -> decltype(source.template lift<std::pair<rxu::value_type_t<rxu::decay_t<Observable>>, typename rxsc::scheduler::clock_type::time_point>>(timestamp<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>(coordination))) { - return source.template lift<std::pair<rxu::value_type_t<rxu::decay_t<Observable>>, typename rxsc::scheduler::clock_type::time_point>>(timestamp<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>(coordination)); - } - -}; - } -template <class Coordination> -inline auto timestamp(Coordination ct) --> detail::timestamp_factory<Coordination> { - return detail::timestamp_factory<Coordination>(std::move(ct)); +/*! @copydoc rx-timestamp.hpp +*/ +template<class... AN> +auto timestamp(AN&&... an) + -> operator_factory<timestamp_tag, AN...> { + return operator_factory<timestamp_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); } -inline auto timestamp() --> detail::timestamp_factory<identity_one_worker> { - return detail::timestamp_factory<identity_one_worker>(identity_current_thread()); } -} +template<> +struct member_overload<timestamp_tag> +{ + template<class Observable, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class SourceValue = rxu::value_type_t<Observable>, + class Timestamp = rxo::detail::timestamp<SourceValue, identity_one_worker>, + class Clock = typename rxsc::scheduler::clock_type::time_point, + class Value = std::pair<SourceValue, Clock>> + static auto member(Observable&& o) + -> decltype(o.template lift<Value>(Timestamp(identity_current_thread()))) { + return o.template lift<Value>(Timestamp(identity_current_thread())); + } + + template<class Observable, class Coordination, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>, + is_coordination<Coordination>>, + class SourceValue = rxu::value_type_t<Observable>, + class Timestamp = rxo::detail::timestamp<SourceValue, rxu::decay_t<Coordination>>, + class Clock = typename rxsc::scheduler::clock_type::time_point, + class Value = std::pair<SourceValue, Clock>> + static auto member(Observable&& o, Coordination&& cn) + -> decltype(o.template lift<Value>(Timestamp(std::forward<Coordination>(cn)))) { + return o.template lift<Value>(Timestamp(std::forward<Coordination>(cn))); + } + + template<class... AN> + static operators::detail::timestamp_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "timestamp takes (optional Coordination)"); + } +}; } diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 07a160a..c88f7f5 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -203,6 +203,7 @@ #include "operators/rx-retry.hpp" #include "operators/rx-sequence_equal.hpp" #include "operators/rx-take_while.hpp" +#include "operators/rx-timestamp.hpp" #include "operators/rx-with_latest_from.hpp" #include "operators/rx-zip.hpp" #endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index e3d0e0f..bfd3391 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -961,45 +961,15 @@ public: return lift<T>(rxo::detail::timeout<T, Duration, identity_one_worker>(period, identity_current_thread())); } - /*! Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted. - - \tparam Coordination the type of the scheduler - - \param coordination the scheduler to manage timeout for each event - - \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }. - - \sample - \snippet timestamp.cpp timestamp sample - \snippet output.txt timestamp sample - */ - template<class Coordination> - auto timestamp(Coordination coordination) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, Coordination>{coordination})) - /// \endcond - { - return lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, Coordination>{coordination}); - } - - /*! Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted. - - \tparam ClockType the type of the clock to return a time_point. - - \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }. - - \sample - \snippet timestamp.cpp timestamp sample - \snippet output.txt timestamp sample - */ + /*! @copydoc rx-timestamp.hpp + */ template<class... AN> - auto timestamp(AN**...) const + auto timestamp(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, identity_one_worker>{identity_current_thread()})) + -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { - return lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, identity_one_worker>{identity_current_thread()}); - static_assert(sizeof...(AN) == 0, "timestamp() was passed too many arguments."); + return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...); } /*! @copydoc rx-finally.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 2d02fe2..c49c801 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -125,7 +125,6 @@ public: #include "operators/rx-tap.hpp" #include "operators/rx-time_interval.hpp" #include "operators/rx-timeout.hpp" -#include "operators/rx-timestamp.hpp" #include "operators/rx-window.hpp" #include "operators/rx-window_time.hpp" #include "operators/rx-window_time_count.hpp" @@ -298,6 +297,13 @@ struct sequence_equal_tag { }; }; +struct timestamp_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-timestamp.hpp>"); + }; +}; + struct with_latest_from_tag { template<class Included> struct include_header{ diff --git a/Rx/v2/test/operators/timestamp.cpp b/Rx/v2/test/operators/timestamp.cpp index 8b35d87..43b4512 100644 --- a/Rx/v2/test/operators/timestamp.cpp +++ b/Rx/v2/test/operators/timestamp.cpp @@ -1,4 +1,5 @@ #include "../test.h" +#include "rxcpp/operators/rx-timestamp.hpp" using namespace std::chrono; @@ -18,7 +19,8 @@ SCENARIO("should not emit timestamped items if the source never emits any items" auto res = w.start( [xs]() { - return xs.timestamp(); + return xs + | rxo::timestamp(); } ); |