aboutsummaryrefslogtreecommitdiffstats
path: root/Rx/v2
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-12-26 17:39:14 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2016-12-26 10:04:52 -0800
commitac66697bfe72539e6c3ef2f633ae5dfeb589234d (patch)
treeb67f77839907927826253394a4eb86319972e8fc /Rx/v2
parent4ce03456ca6dadc387874d134d6d987f9fc48550 (diff)
downloadplatform_external_Reactive-Extensions_RxCpp-ac66697bfe72539e6c3ef2f633ae5dfeb589234d.tar.gz
platform_external_Reactive-Extensions_RxCpp-ac66697bfe72539e6c3ef2f633ae5dfeb589234d.tar.bz2
platform_external_Reactive-Extensions_RxCpp-ac66697bfe72539e6c3ef2f633ae5dfeb589234d.zip
decouple timestamp from observable
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-timestamp.hpp96
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp40
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
-rw-r--r--Rx/v2/test/operators/timestamp.cpp4
5 files changed, 83 insertions, 66 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp b/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp
index 17aff82..d1a11bf 100644
--- a/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp
@@ -2,6 +2,21 @@
#pragma once
+/*! \file rx-timestamp.hpp
+
+ \brief Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted.
+
+ \tparam Coordination the type of the scheduler (optional).
+
+ \param coordination the scheduler to manage timeout for each event (optional).
+
+ \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }.
+
+ \sample
+ \snippet timestamp.cpp timestamp sample
+ \snippet output.txt timestamp sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_TIMESTAMP_HPP)
#define RXCPP_OPERATORS_RX_TIMESTAMP_HPP
@@ -13,11 +28,19 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct timestamp_invalid_arguments {};
+
+template<class... AN>
+struct timestamp_invalid : public rxo::operator_base<timestamp_invalid_arguments<AN...>> {
+ using type = observable<timestamp_invalid_arguments<AN...>, timestamp_invalid<AN...>>;
+};
+template<class... AN>
+using timestamp_invalid_t = typename timestamp_invalid<AN...>::type;
+
template<class T, class Coordination>
struct timestamp
{
- static_assert(is_coordination<Coordination>::value, "Coordination parameter must satisfy the requirements for a Coordination");
-
typedef rxu::decay_t<T> source_value_type;
typedef rxu::decay_t<Coordination> coordination_type;
@@ -62,7 +85,7 @@ struct timestamp
dest.on_completed();
}
- static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, timestamp_values v) {
+ static subscriber<value_type, observer_type> make(dest_type d, timestamp_values v) {
return make_subscriber<value_type>(d, this_type(d, v.coordination));
}
};
@@ -74,38 +97,53 @@ struct timestamp
}
};
-template <class Coordination>
-class timestamp_factory
-{
- typedef rxu::decay_t<Coordination> coordination_type;
-
- coordination_type coordination;
-public:
- timestamp_factory(coordination_type ct)
- : coordination(std::move(ct)) { }
-
- template<class Observable>
- auto operator()(Observable&& source)
- -> decltype(source.template lift<std::pair<rxu::value_type_t<rxu::decay_t<Observable>>, typename rxsc::scheduler::clock_type::time_point>>(timestamp<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>(coordination))) {
- return source.template lift<std::pair<rxu::value_type_t<rxu::decay_t<Observable>>, typename rxsc::scheduler::clock_type::time_point>>(timestamp<rxu::value_type_t<rxu::decay_t<Observable>>, Coordination>(coordination));
- }
-
-};
-
}
-template <class Coordination>
-inline auto timestamp(Coordination ct)
--> detail::timestamp_factory<Coordination> {
- return detail::timestamp_factory<Coordination>(std::move(ct));
+/*! @copydoc rx-timestamp.hpp
+*/
+template<class... AN>
+auto timestamp(AN&&... an)
+ -> operator_factory<timestamp_tag, AN...> {
+ return operator_factory<timestamp_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
-inline auto timestamp()
--> detail::timestamp_factory<identity_one_worker> {
- return detail::timestamp_factory<identity_one_worker>(identity_current_thread());
}
-}
+template<>
+struct member_overload<timestamp_tag>
+{
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Timestamp = rxo::detail::timestamp<SourceValue, identity_one_worker>,
+ class Clock = typename rxsc::scheduler::clock_type::time_point,
+ class Value = std::pair<SourceValue, Clock>>
+ static auto member(Observable&& o)
+ -> decltype(o.template lift<Value>(Timestamp(identity_current_thread()))) {
+ return o.template lift<Value>(Timestamp(identity_current_thread()));
+ }
+
+ template<class Observable, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Timestamp = rxo::detail::timestamp<SourceValue, rxu::decay_t<Coordination>>,
+ class Clock = typename rxsc::scheduler::clock_type::time_point,
+ class Value = std::pair<SourceValue, Clock>>
+ static auto member(Observable&& o, Coordination&& cn)
+ -> decltype(o.template lift<Value>(Timestamp(std::forward<Coordination>(cn)))) {
+ return o.template lift<Value>(Timestamp(std::forward<Coordination>(cn)));
+ }
+
+ template<class... AN>
+ static operators::detail::timestamp_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "timestamp takes (optional Coordination)");
+ }
+};
}
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 07a160a..c88f7f5 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -203,6 +203,7 @@
#include "operators/rx-retry.hpp"
#include "operators/rx-sequence_equal.hpp"
#include "operators/rx-take_while.hpp"
+#include "operators/rx-timestamp.hpp"
#include "operators/rx-with_latest_from.hpp"
#include "operators/rx-zip.hpp"
#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index e3d0e0f..bfd3391 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -961,45 +961,15 @@ public:
return lift<T>(rxo::detail::timeout<T, Duration, identity_one_worker>(period, identity_current_thread()));
}
- /*! Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted.
-
- \tparam Coordination the type of the scheduler
-
- \param coordination the scheduler to manage timeout for each event
-
- \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }.
-
- \sample
- \snippet timestamp.cpp timestamp sample
- \snippet output.txt timestamp sample
- */
- template<class Coordination>
- auto timestamp(Coordination coordination) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, Coordination>{coordination}))
- /// \endcond
- {
- return lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, Coordination>{coordination});
- }
-
- /*! Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted.
-
- \tparam ClockType the type of the clock to return a time_point.
-
- \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }.
-
- \sample
- \snippet timestamp.cpp timestamp sample
- \snippet output.txt timestamp sample
- */
+ /*! @copydoc rx-timestamp.hpp
+ */
template<class... AN>
- auto timestamp(AN**...) const
+ auto timestamp(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, identity_one_worker>{identity_current_thread()}))
+ -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return lift<std::pair<T, rxsc::scheduler::clock_type::time_point>>(rxo::detail::timestamp<T, identity_one_worker>{identity_current_thread()});
- static_assert(sizeof...(AN) == 0, "timestamp() was passed too many arguments.");
+ return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...);
}
/*! @copydoc rx-finally.hpp
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 2d02fe2..c49c801 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -125,7 +125,6 @@ public:
#include "operators/rx-tap.hpp"
#include "operators/rx-time_interval.hpp"
#include "operators/rx-timeout.hpp"
-#include "operators/rx-timestamp.hpp"
#include "operators/rx-window.hpp"
#include "operators/rx-window_time.hpp"
#include "operators/rx-window_time_count.hpp"
@@ -298,6 +297,13 @@ struct sequence_equal_tag {
};
};
+struct timestamp_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-timestamp.hpp>");
+ };
+};
+
struct with_latest_from_tag {
template<class Included>
struct include_header{
diff --git a/Rx/v2/test/operators/timestamp.cpp b/Rx/v2/test/operators/timestamp.cpp
index 8b35d87..43b4512 100644
--- a/Rx/v2/test/operators/timestamp.cpp
+++ b/Rx/v2/test/operators/timestamp.cpp
@@ -1,4 +1,5 @@
#include "../test.h"
+#include "rxcpp/operators/rx-timestamp.hpp"
using namespace std::chrono;
@@ -18,7 +19,8 @@ SCENARIO("should not emit timestamped items if the source never emits any items"
auto res = w.start(
[xs]() {
- return xs.timestamp();
+ return xs
+ | rxo::timestamp();
}
);