aboutsummaryrefslogtreecommitdiffstats
path: root/Rx
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2016-04-23 22:18:33 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2016-04-24 00:45:30 -0700
commitce867895c4c2348e21f3dabfe5dca13c67fd990b (patch)
treebde1df1873b1c98e05e8b991b5071dd125f2dc75 /Rx
parent7154876bada9f1c86039086b187c8be5ad987895 (diff)
downloadplatform_external_Reactive-Extensions_RxCpp-ce867895c4c2348e21f3dabfe5dca13c67fd990b.tar.gz
platform_external_Reactive-Extensions_RxCpp-ce867895c4c2348e21f3dabfe5dca13c67fd990b.tar.bz2
platform_external_Reactive-Extensions_RxCpp-ce867895c4c2348e21f3dabfe5dca13c67fd990b.zip
added window_toggle
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/examples/doxygen/window.cpp46
-rw-r--r--Rx/v2/examples/linesfrombytes/main.cpp35
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp16
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-map.hpp7
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-ref_count.hpp16
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-start_with.hpp30
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp296
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp14
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-zip.hpp15
-rw-r--r--Rx/v2/src/rxcpp/rx-connectable_observable.hpp12
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp52
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/group_by.cpp4
-rw-r--r--Rx/v2/test/operators/window_toggle.cpp318
15 files changed, 827 insertions, 36 deletions
diff --git a/Rx/v2/examples/doxygen/window.cpp b/Rx/v2/examples/doxygen/window.cpp
index 0b2850c..9443ecb 100644
--- a/Rx/v2/examples/doxygen/window.cpp
+++ b/Rx/v2/examples/doxygen/window.cpp
@@ -192,3 +192,49 @@ SCENARIO("window period+count sample"){
});
printf("//! [window period+count sample]\n");
}
+
+SCENARIO("window toggle+coordination sample"){
+ printf("//! [window toggle+coordination sample]\n");
+ int counter = 0;
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_toggle(
+ rxcpp::observable<>::interval(std::chrono::milliseconds(4)),
+ [](long){
+ return rxcpp::observable<>::interval(std::chrono::milliseconds(4)).skip(1);
+ },
+ rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window toggle+coordination sample]\n");
+}
+
+SCENARIO("window toggle sample"){
+ printf("//! [window toggle sample]\n");
+ int counter = 0;
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_toggle(
+ rxcpp::observable<>::interval(std::chrono::milliseconds(4)),
+ [](long){
+ return rxcpp::observable<>::interval(std::chrono::milliseconds(4)).skip(1);
+ });
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %ld\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window toggle sample]\n");
+}
diff --git a/Rx/v2/examples/linesfrombytes/main.cpp b/Rx/v2/examples/linesfrombytes/main.cpp
index b027d13..4847169 100644
--- a/Rx/v2/examples/linesfrombytes/main.cpp
+++ b/Rx/v2/examples/linesfrombytes/main.cpp
@@ -1,13 +1,17 @@
#include "rxcpp/rx.hpp"
+namespace Rx {
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::operators;
using namespace rxcpp::util;
+}
+using namespace Rx;
#include <regex>
#include <random>
using namespace std;
+using namespace std::chrono;
int main()
{
@@ -16,7 +20,7 @@ int main()
uniform_int_distribution<> dist(4, 18);
// for testing purposes, produce byte stream that from lines of text
- auto bytes = range(1, 10) |
+ auto bytes = range(0, 10) |
flat_map([&](int i){
auto body = from((uint8_t)('A' + i)) |
repeat(dist(gen)) |
@@ -44,6 +48,11 @@ int main()
//
// recover lines of text from byte stream
//
+
+ auto removespaces = [](string s){
+ s.erase(remove_if(s.begin(), s.end(), ::isspace), s.end());
+ return s;
+ };
// create strings split on \r
auto strings = bytes |
@@ -55,22 +64,28 @@ int main()
vector<string> splits(cursor, end);
return iterate(move(splits));
}) |
- filter([](string& s){
+ filter([](const string& s){
return !s.empty();
- });
+ }) |
+ publish() |
+ ref_count();
+
+ // filter to last string in each line
+ auto closes = strings |
+ filter(
+ [](const string& s){
+ return s.back() == '\r';
+ }) |
+ Rx::map([](const string&){return 0;});
// group strings by line
- int group = 0;
auto linewindows = strings |
- group_by(
- [=](string& s) mutable {
- return s.back() == '\r' ? group++ : group;
- });
+ window_toggle(closes | start_with(0), [=](int){return closes;});
// reduce the strings for a line into one string
auto lines = linewindows |
- flat_map([](grouped_observable<int, string> w) {
- return w | sum();
+ flat_map([&](observable<string> w) {
+ return w | start_with<string>("") | sum() | Rx::map(removespaces);
});
// print result
diff --git a/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp b/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp
index 7713c2c..1117a48 100644
--- a/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp
@@ -36,11 +36,17 @@ class connect_forever_factory
{
public:
connect_forever_factory() {}
- template<class Observable>
- auto operator()(Observable&& source)
- -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, connect_forever<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>> {
- return observable<rxu::value_type_t<rxu::decay_t<Observable>>, connect_forever<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>>(
- connect_forever<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>(std::forward<Observable>(source)));
+ template<class... TN>
+ auto operator()(connectable_observable<TN...>&& source)
+ -> observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> {
+ return observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>(
+ connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(std::move(source)));
+ }
+ template<class... TN>
+ auto operator()(const connectable_observable<TN...>& source)
+ -> observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> {
+ return observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>(
+ connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(source));
}
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-map.hpp b/Rx/v2/src/rxcpp/operators/rx-map.hpp
index 855f8eb..6db251f 100644
--- a/Rx/v2/src/rxcpp/operators/rx-map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-map.hpp
@@ -35,17 +35,18 @@ struct map
typedef rxu::decay_t<Subscriber> dest_type;
typedef observer<T, this_type> observer_type;
dest_type dest;
- select_type selector;
+ mutable select_type selector;
map_observer(dest_type d, select_type s)
: dest(std::move(d))
, selector(std::move(s))
{
}
- void on_next(source_value_type v) const {
+ template<class Value>
+ void on_next(Value&& v) const {
auto selected = on_exception(
[&](){
- return this->selector(std::move(v));},
+ return this->selector(std::forward<Value>(v));},
dest);
if (selected.empty()) {
return;
diff --git a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
index 2d41ef2..886c605 100644
--- a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
@@ -63,11 +63,17 @@ class ref_count_factory
{
public:
ref_count_factory() {}
- template<class Observable>
- auto operator()(Observable&& source)
- -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, ref_count<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>> {
- return observable<rxu::value_type_t<rxu::decay_t<Observable>>, ref_count<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>>(
- ref_count<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>(std::forward<Observable>(source)));
+ template<class... TN>
+ auto operator()(connectable_observable<TN...>&& source)
+ -> observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> {
+ return observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>(
+ ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(std::move(source)));
+ }
+ template<class... TN>
+ auto operator()(const connectable_observable<TN...>& source)
+ -> observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> {
+ return observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>(
+ ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(source));
}
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-start_with.hpp b/Rx/v2/src/rxcpp/operators/rx-start_with.hpp
index 16e909f..42bf388 100644
--- a/Rx/v2/src/rxcpp/operators/rx-start_with.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-start_with.hpp
@@ -11,10 +11,32 @@ namespace rxcpp {
namespace operators {
-template<class Observable, class Value0, class... ValueN>
-auto start_with(Observable o, Value0 v0, ValueN... vn)
- -> decltype(rxs::from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o)) {
- return rxs::from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o);
+namespace detail {
+
+template<class StartObservable>
+class start_with_factory
+{
+public:
+ using start_type = rxu::decay_t<StartObservable>;
+
+ start_type start;
+
+ explicit start_with_factory(start_type s) : start(s) {}
+
+ template<class Observable>
+ auto operator()(Observable source)
+ -> decltype(start.concat(source)) {
+ return start.concat(source);
+ }
+};
+
+}
+
+template<class Value0, class... ValueN>
+auto start_with(Value0 v0, ValueN... vn)
+ -> detail::start_with_factory<decltype(rxs::from(rxu::decay_t<Value0>(v0), rxu::decay_t<Value0>(vn)...))> {
+ return detail::start_with_factory<decltype(rxs::from(rxu::decay_t<Value0>(v0), rxu::decay_t<Value0>(vn)...))>(
+ rxs::from(rxu::decay_t<Value0>(v0), rxu::decay_t<Value0>(vn)...));
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp b/Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp
new file mode 100644
index 0000000..c0195d8
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp
@@ -0,0 +1,296 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP)
+#define RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Openings, class ClosingSelector, class Coordination>
+struct window_toggle_traits {
+
+ using source_value_type = rxu::decay_t<T>;
+ using coordination_type = rxu::decay_t<Coordination>;
+ using openings_type = rxu::decay_t<Openings>;
+ using openings_value_type = typename openings_type::value_type;
+ using closing_selector_type = rxu::decay_t<ClosingSelector>;
+
+ static_assert(is_observable<openings_type>::value, "window_toggle Openings must be an observable");
+
+ struct tag_not_valid {};
+ template<class CS, class CV>
+ static auto check(int) -> decltype((*(CS*)nullptr)((*(CV*)nullptr)));
+ template<class CS, class CV>
+ static tag_not_valid check(...);
+
+ static_assert(is_observable<decltype(check<closing_selector_type, openings_value_type>(0))>::value, "window_toggle ClosingSelector must be a function with the signature observable<U>(Openings::value_type)");
+
+ using closings_type = rxu::decay_t<decltype(check<closing_selector_type, openings_value_type>(0))>;
+ using closings_value_type = typename closings_type::value_type;
+};
+
+template<class T, class Openings, class ClosingSelector, class Coordination>
+struct window_toggle
+{
+ typedef window_toggle<T, Openings, ClosingSelector, Coordination> this_type;
+
+ typedef window_toggle_traits<T, Openings, ClosingSelector, Coordination> traits;
+
+ using source_value_type = typename traits::source_value_type;
+ using coordination_type = typename traits::coordination_type;
+ using coordinator_type = typename coordination_type::coordinator_type;
+ using openings_type = typename traits::openings_type;
+ using openings_value_type = typename traits::openings_value_type;
+ using closing_selector_type = typename traits::closing_selector_type;
+ using closings_value_type = typename traits::closings_value_type;
+
+ struct window_toggle_values
+ {
+ window_toggle_values(openings_type opens, closing_selector_type closes, coordination_type c)
+ : openings(opens)
+ , closingSelector(closes)
+ , coordination(c)
+ {
+ }
+ openings_type openings;
+ mutable closing_selector_type closingSelector;
+ coordination_type coordination;
+ };
+ window_toggle_values initial;
+
+ window_toggle(openings_type opens, closing_selector_type closes, coordination_type coordination)
+ : initial(opens, closes, coordination)
+ {
+ }
+
+ template<class Subscriber>
+ struct window_toggle_observer
+ {
+ typedef window_toggle_observer<Subscriber> this_type;
+ typedef rxu::decay_t<T> value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<T, this_type> observer_type;
+
+ struct window_toggle_subscriber_values : public window_toggle_values
+ {
+ window_toggle_subscriber_values(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
+ : window_toggle_values(v)
+ , cs(std::move(cs))
+ , dest(std::move(d))
+ , coordinator(std::move(c))
+ , worker(coordinator.get_worker())
+ {
+ }
+ composite_subscription cs;
+ dest_type dest;
+ coordinator_type coordinator;
+ rxsc::worker worker;
+ mutable std::list<rxcpp::subjects::subject<T>> subj;
+ };
+ std::shared_ptr<window_toggle_subscriber_values> state;
+
+ window_toggle_observer(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
+ : state(std::make_shared<window_toggle_subscriber_values>(window_toggle_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
+ {
+ auto localState = state;
+
+ composite_subscription innercs;
+
+ // when the out observer is unsubscribed all the
+ // inner subscriptions are unsubscribed as well
+ auto innerscope = localState->dest.add(innercs);
+
+ innercs.add([=](){
+ localState->dest.remove(innerscope);
+ });
+
+ auto source = on_exception(
+ [&](){return localState->coordinator.in(localState->openings);},
+ localState->dest);
+ if (source.empty()) {
+ return;
+ }
+
+ // this subscribe does not share the observer subscription
+ // so that when it is unsubscribed the observer can be called
+ // until the inner subscriptions have finished
+ auto sink = make_subscriber<openings_value_type>(
+ localState->dest,
+ innercs,
+ // on_next
+ [localState](const openings_value_type& ov) {
+ auto closer = localState->closingSelector(ov);
+
+ auto it = localState->subj.insert(localState->subj.end(), rxcpp::subjects::subject<T>());
+ localState->dest.on_next(it->get_observable().as_dynamic());
+
+ composite_subscription innercs;
+
+ // when the out observer is unsubscribed all the
+ // inner subscriptions are unsubscribed as well
+ auto innerscope = localState->dest.add(innercs);
+
+ innercs.add([=](){
+ localState->dest.remove(innerscope);
+ });
+
+ auto source = localState->coordinator.in(closer);
+
+ auto sit = std::make_shared<decltype(it)>(it);
+ auto close = [localState, sit]() {
+ auto it = *sit;
+ *sit = localState->subj.end();
+ if (it != localState->subj.end()) {
+ it->get_subscriber().on_completed();
+ localState->subj.erase(it);
+ }
+ };
+
+ // this subscribe does not share the observer subscription
+ // so that when it is unsubscribed the observer can be called
+ // until the inner subscriptions have finished
+ auto sink = make_subscriber<closings_value_type>(
+ localState->dest,
+ innercs,
+ // on_next
+ [close, innercs](closings_value_type) {
+ close();
+ innercs.unsubscribe();
+ },
+ // on_error
+ [localState](std::exception_ptr e) {
+ localState->dest.on_error(e);
+ },
+ // on_completed
+ close
+ );
+ auto selectedSink = localState->coordinator.out(sink);
+ source.subscribe(std::move(selectedSink));
+ },
+ // on_error
+ [localState](std::exception_ptr e) {
+ localState->dest.on_error(e);
+ },
+ // on_completed
+ []() {
+ }
+ );
+ auto selectedSink = on_exception(
+ [&](){return localState->coordinator.out(sink);},
+ localState->dest);
+ if (selectedSink.empty()) {
+ return;
+ }
+ source->subscribe(std::move(selectedSink.get()));
+ }
+
+ void on_next(T v) const {
+ auto localState = state;
+ auto work = [v, localState](const rxsc::schedulable&){
+ for (auto s : localState->subj) {
+ s.get_subscriber().on_next(v);
+ }
+ };
+ auto selectedWork = on_exception(
+ [&](){return localState->coordinator.act(work);},
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ void on_error(std::exception_ptr e) const {
+ auto localState = state;
+ auto work = [e, localState](const rxsc::schedulable&){
+ for (auto s : localState->subj) {
+ s.get_subscriber().on_error(e);
+ }
+ localState->dest.on_error(e);
+ };
+ auto selectedWork = on_exception(
+ [&](){return localState->coordinator.act(work);},
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ void on_completed() const {
+ auto localState = state;
+ auto work = [localState](const rxsc::schedulable&){
+ for (auto s : localState->subj) {
+ s.get_subscriber().on_completed();
+ }
+ localState->dest.on_completed();
+ };
+ auto selectedWork = on_exception(
+ [&](){return localState->coordinator.act(work);},
+ localState->dest);
+ if (selectedWork.empty()) {
+ return;
+ }
+ localState->worker.schedule(selectedWork.get());
+ }
+
+ static subscriber<T, observer_type> make(dest_type d, window_toggle_values v) {
+ auto cs = composite_subscription();
+ auto coordinator = v.coordination.create_coordinator();
+
+ return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(window_toggle_observer<Subscriber>::make(std::move(dest), initial)) {
+ return window_toggle_observer<Subscriber>::make(std::move(dest), initial);
+ }
+};
+
+template<class Openings, class ClosingSelector, class Coordination>
+class window_toggle_factory
+{
+ typedef rxu::decay_t<Openings> openings_type;
+ typedef rxu::decay_t<ClosingSelector> closing_selector_type;
+ typedef rxu::decay_t<Coordination> coordination_type;
+
+ openings_type openings;
+ closing_selector_type closingSelector;
+ coordination_type coordination;
+public:
+ window_toggle_factory(openings_type opens, closing_selector_type closes, coordination_type c) : openings(opens), closingSelector(closes), coordination(c) {}
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<observable<rxu::value_type_t<rxu::decay_t<Observable>>>>(window_toggle<rxu::value_type_t<rxu::decay_t<Observable>>, Openings, ClosingSelector, Coordination>(openings, closingSelector, coordination))) {
+ return source.template lift<observable<rxu::value_type_t<rxu::decay_t<Observable>>>>(window_toggle<rxu::value_type_t<rxu::decay_t<Observable>>, Openings, ClosingSelector, Coordination>(openings, closingSelector, coordination));
+ }
+};
+
+}
+
+template<class Openings, class ClosingSelector, class Coordination>
+inline auto window_toggle(Openings openings, ClosingSelector closingSelector, Coordination coordination)
+ -> detail::window_toggle_factory<Openings, ClosingSelector, Coordination> {
+ return detail::window_toggle_factory<Openings, ClosingSelector, Coordination>(openings, closingSelector, coordination);
+}
+
+template<class Openings, class ClosingSelector>
+inline auto window_toggle(Openings openings, ClosingSelector closingSelector)
+ -> detail::window_toggle_factory<Openings, ClosingSelector, identity_one_worker> {
+ return detail::window_toggle_factory<Openings, ClosingSelector, identity_one_worker>(openings, closingSelector, identity_immediate());
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp b/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp
index 71b4975..b786d37 100644
--- a/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp
@@ -172,6 +172,8 @@ struct with_latest_from : public operator_base<rxu::value_type_t<with_latest_fro
template<class Coordination, class Selector, class... ObservableN>
class with_latest_from_factory
{
+ using this_type = with_latest_from_factory<Coordination, Selector, ObservableN...>;
+
typedef rxu::decay_t<Coordination> coordination_type;
typedef rxu::decay_t<Selector> selector_type;
typedef std::tuple<ObservableN...> tuple_source_type;
@@ -187,6 +189,8 @@ class with_latest_from_factory
with_latest_from<Coordination, Selector, YObservableN...>(coordination, selector, std::move(source)));
}
public:
+ using checked_type = std::enable_if<is_coordination<coordination_type>::value, this_type>;
+
with_latest_from_factory(coordination_type sf, selector_type s, ObservableN... on)
: coordination(std::move(sf))
, selector(std::move(s))
@@ -205,8 +209,14 @@ public:
template<class Coordination, class Selector, class... ObservableN>
auto with_latest_from(Coordination sf, Selector s, ObservableN... on)
- -> detail::with_latest_from_factory<Coordination, Selector, ObservableN...> {
- return detail::with_latest_from_factory<Coordination, Selector, ObservableN...>(std::move(sf), std::move(s), std::move(on)...);
+ -> typename detail::with_latest_from_factory<Coordination, Selector, ObservableN...>::checked_type::type {
+ return detail::with_latest_from_factory<Coordination, Selector, ObservableN...>(std::move(sf), std::move(s), std::move(on)...);
+}
+
+template<class Selector, class... ObservableN>
+auto with_latest_from(Selector s, ObservableN... on)
+ -> typename detail::with_latest_from_factory<identity_one_worker, Selector, ObservableN...>::checked_type::type {
+ return detail::with_latest_from_factory<identity_one_worker, Selector, ObservableN...>(identity_current_thread(), std::move(s), std::move(on)...);
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-zip.hpp b/Rx/v2/src/rxcpp/operators/rx-zip.hpp
index 36f4915..e436762 100644
--- a/Rx/v2/src/rxcpp/operators/rx-zip.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-zip.hpp
@@ -164,10 +164,11 @@ struct zip : public operator_base<rxu::value_type_t<zip_traits<Coordination, Sel
template<class Coordination, class Selector, class... ObservableN>
class zip_factory
{
+ using this_type = zip_factory<Coordination, Selector, ObservableN...>;
typedef rxu::decay_t<Coordination> coordination_type;
typedef rxu::decay_t<Selector> selector_type;
typedef std::tuple<ObservableN...> tuple_source_type;
-
+
coordination_type coordination;
selector_type selector;
tuple_source_type sourcen;
@@ -179,6 +180,8 @@ class zip_factory
zip<Coordination, Selector, YObservableN...>(coordination, selector, std::move(source)));
}
public:
+ using checked_type = std::enable_if<is_coordination<coordination_type>::value, this_type>;
+
zip_factory(coordination_type sf, selector_type s, ObservableN... on)
: coordination(std::move(sf))
, selector(std::move(s))
@@ -197,8 +200,14 @@ public:
template<class Coordination, class Selector, class... ObservableN>
auto zip(Coordination sf, Selector s, ObservableN... on)
- -> detail::zip_factory<Coordination, Selector, ObservableN...> {
- return detail::zip_factory<Coordination, Selector, ObservableN...>(std::move(sf), std::move(s), std::move(on)...);
+ -> typename detail::zip_factory<Coordination, Selector, ObservableN...>::checked_type::type {
+ return detail::zip_factory<Coordination, Selector, ObservableN...>(std::move(sf), std::move(s), std::move(on)...);
+}
+
+template<class Selector, class... ObservableN>
+auto zip(Selector s, ObservableN... on)
+ -> typename detail::zip_factory<identity_one_worker, Selector, ObservableN...>::checked_type::type {
+ return detail::zip_factory<identity_one_worker, Selector, ObservableN...>(identity_current_thread(), std::move(s), std::move(on)...);
}
}
diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
index 665522c..5603c3a 100644
--- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
@@ -139,6 +139,18 @@ public:
{}
///
+ /// takes any function that will take this observable and produce a result value.
+ /// this is intended to allow externally defined operators, that use subscribe,
+ /// to be connected into the expression.
+ ///
+ template<class OperatorFactory>
+ auto op(OperatorFactory&& of) const
+ -> decltype(of(*(const this_type*)nullptr)) {
+ return of(*this);
+ static_assert(detail::is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
+ }
+
+ ///
/// performs type-forgetting conversion to a new composite_observable
///
connectable_observable<T> as_dynamic() {
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 678db71..e032afa 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1297,6 +1297,54 @@ public:
return lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread()));
}
+ /*! Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
+
+ \tparam Openings observable<OT>
+ \tparam ClosingSelector a function of type observable<CT>(OT)
+ \tparam Coordination the type of the scheduler
+
+ \param opens each value from this observable opens a new window.
+ \param closes this function is called for each opened window and returns an observable. the first value from the returned observable will close the window
+ \param coordination the scheduler for the windows
+
+ \return Observable that emits an observable for each opened window.
+
+ \sample
+ \snippet window.cpp window toggle+coordination sample
+ \snippet output.txt window toggle+coordination sample
+ */
+ template<class Openings, class ClosingSelector, class Coordination, class Reqiures = typename rxu::types_checked_from<typename Coordination::coordination_tag>::type>
+ auto window_toggle(Openings opens, ClosingSelector closes, Coordination coordination) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_toggle<T, Openings, ClosingSelector, Coordination>(opens, closes, coordination)))
+ /// \endcond
+ {
+ return lift<observable<T>>(rxo::detail::window_toggle<T, Openings, ClosingSelector, Coordination>(opens, closes, coordination));
+ }
+
+ /*! Return an observable that emits connected, non-overlapping windows represending items emitted by the source observable during fixed, consecutive durations.
+
+ \tparam Openings observable<OT>
+ \tparam ClosingSelector a function of type observable<CT>(OT)
+
+ \param opens each value from this observable opens a new window.
+ \param closes this function is called for each opened window and returns an observable. the first value from the returned observable will close the window
+
+ \return Observable that emits an observable for each opened window.
+
+ \sample
+ \snippet window.cpp window toggle sample
+ \snippet output.txt window toggle sample
+ */
+ template<class Openings, class ClosingSelector>
+ auto window_toggle(Openings opens, ClosingSelector closes) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_toggle<T, Openings, ClosingSelector, identity_one_worker>(opens, closes, identity_current_thread())))
+ /// \endcond
+ {
+ return lift<observable<T>>(rxo::detail::window_toggle<T, Openings, ClosingSelector, identity_one_worker>(opens, closes, identity_current_thread()));
+ }
+
/*! Return an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable.
\param count the maximum size of each buffer before it should be emitted
@@ -3265,10 +3313,10 @@ public:
template<class Value0, class... ValueN>
auto start_with(Value0 v0, ValueN... vn) const
/// \cond SHOW_SERVICE_MEMBERS
- -> decltype(rxo::start_with(*(this_type*)nullptr, std::move(v0), std::move(vn)...))
+ -> decltype(rxo::start_with(std::move(v0), std::move(vn)...)(*(this_type*)nullptr))
/// \endcond
{
- return rxo::start_with(*this, std::move(v0), std::move(vn)...);
+ return rxo::start_with(std::move(v0), std::move(vn)...)(*this);
}
/*! Take values pairwise from this observable.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index f23d710..4bbc6ef 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -84,5 +84,6 @@ namespace rxo=operators;
#include "operators/rx-window.hpp"
#include "operators/rx-window_time.hpp"
#include "operators/rx-window_time_count.hpp"
+#include "operators/rx-window_toggle.hpp"
#include "operators/rx-zip.hpp"
#endif
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index ee3f01a..2dac8bf 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -69,6 +69,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/timestamp.cpp
${TEST_DIR}/operators/with_latest_from.cpp
${TEST_DIR}/operators/window.cpp
+ ${TEST_DIR}/operators/window_toggle.cpp
${TEST_DIR}/operators/zip.cpp
)
diff --git a/Rx/v2/test/operators/group_by.cpp b/Rx/v2/test/operators/group_by.cpp
index 1584b1a..145a706 100644
--- a/Rx/v2/test/operators/group_by.cpp
+++ b/Rx/v2/test/operators/group_by.cpp
@@ -58,7 +58,7 @@ SCENARIO("range partitioned by group_by across hardware threads to derive pi", "
message << key << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v;
return std::make_tuple(message.str(), v);
}).
- start_with(std::make_tuple(message.str(), 0)).
+ start_with(std::make_tuple(message.str(), 0.0L)).
as_dynamic();
}).
concat(). // only subscribe to one range at a time in this group.
@@ -129,7 +129,7 @@ SCENARIO("range partitioned by dividing work across hardware threads to derive p
message << w.index << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v;
return std::make_tuple(message.str(), v);
}).
- start_with(std::make_tuple(message.str(), 0)).
+ start_with(std::make_tuple(message.str(), 0.0L)).
as_dynamic();
}).
merge(rxcpp::observe_on_new_thread()).
diff --git a/Rx/v2/test/operators/window_toggle.cpp b/Rx/v2/test/operators/window_toggle.cpp
new file mode 100644
index 0000000..0c1cc9f
--- /dev/null
+++ b/Rx/v2/test/operators/window_toggle.cpp
@@ -0,0 +1,318 @@
+#include "../test.h"
+
+SCENARIO("window toggle, basic", "[window_toggle][operators]"){
+ GIVEN("1 hot observable of ints and hot observable of opens."){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<std::string> o_on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(90, 1),
+ on.next(180, 2),
+ on.next(250, 3),
+ on.next(260, 4),
+ on.next(310, 5),
+ on.next(340, 6),
+ on.next(410, 7),
+ on.next(420, 8),
+ on.next(470, 9),
+ on.next(550, 10),
+ on.completed(590)
+ });
+
+ auto ys = sc.make_hot_observable({
+ on.next(255, 50),
+ on.next(330, 100),
+ on.next(350, 50),
+ on.next(400, 90),
+ on.completed(900)
+ });
+
+ WHEN("ints are split into windows"){
+ using namespace std::chrono;
+
+ int wi = 0;
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .window_toggle(ys, [&](int y){
+ return rx::observable<>::timer(milliseconds(y), so);
+ }, so)
+ .map([wi](rxcpp::observable<int> w) mutable {
+ auto ti = wi++;
+ return w
+ .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ })
+ .merge()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints assigned to windows"){
+ auto required = rxu::to_vector({
+ o_on.next(261, "0 4"),
+ o_on.next(341, "1 6"),
+ o_on.next(411, "1 7"),
+ o_on.next(411, "3 7"),
+ o_on.next(421, "1 8"),
+ o_on.next(421, "3 8"),
+ o_on.next(471, "3 9"),
+ o_on.completed(591)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the observable"){
+ auto required = rxu::to_vector({
+ o_on.subscribe(200, 590)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("window toggle, basic same", "[window_toggle][operators]"){
+ GIVEN("1 hot observable of ints and hot observable of opens."){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<std::string> o_on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(90, 1),
+ on.next(180, 2),
+ on.next(250, 3),
+ on.next(260, 4),
+ on.next(310, 5),
+ on.next(340, 6),
+ on.next(410, 7),
+ on.next(420, 8),
+ on.next(470, 9),
+ on.next(550, 10),
+ on.completed(590)
+ });
+
+ auto ys = sc.make_hot_observable({
+ on.next(255, 50),
+ on.next(330, 100),
+ on.next(350, 50),
+ on.next(400, 90),
+ on.completed(900)
+ });
+
+ WHEN("ints are split into windows"){
+ using namespace std::chrono;
+
+ int wi = 0;
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .window_toggle(ys, [&](int){
+ return ys;
+ }, so)
+ .map([wi](rxcpp::observable<int> w) mutable {
+ auto ti = wi++;
+ return w
+ .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ })
+ .merge()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints assigned to windows"){
+ auto required = rxu::to_vector({
+ o_on.next(261, "0 4"),
+ o_on.next(311, "0 5"),
+ o_on.next(341, "1 6"),
+ o_on.next(411, "3 7"),
+ o_on.next(421, "3 8"),
+ o_on.next(471, "3 9"),
+ o_on.next(551, "3 10"),
+ o_on.completed(591)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the observable"){
+ auto required = rxu::to_vector({
+ o_on.subscribe(200, 590)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("window toggle, error", "[window_toggle][operators]"){
+ GIVEN("1 hot observable of ints and hot observable of opens."){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<std::string> o_on;
+
+ std::runtime_error ex("window_toggle on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(90, 1),
+ on.next(180, 2),
+ on.next(250, 3),
+ on.next(260, 4),
+ on.next(310, 5),
+ on.next(340, 6),
+ on.next(410, 7),
+ on.error(420, ex),
+ on.next(470, 9),
+ on.next(550, 10),
+ on.completed(590)
+ });
+
+ auto ys = sc.make_hot_observable({
+ on.next(255, 50),
+ on.next(330, 100),
+ on.next(350, 50),
+ on.next(400, 90),
+ on.completed(900)
+ });
+
+ WHEN("ints are split into windows"){
+ using namespace std::chrono;
+
+ int wi = 0;
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .window_toggle(ys, [&](int){
+ return ys;
+ }, so)
+ .map([wi](rxcpp::observable<int> w) mutable {
+ auto ti = wi++;
+ return w
+ .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ })
+ .merge()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains ints assigned to windows"){
+ auto required = rxu::to_vector({
+ o_on.next(261, "0 4"),
+ o_on.next(311, "0 5"),
+ o_on.next(341, "1 6"),
+ o_on.next(411, "3 7"),
+ o_on.error(421, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the observable"){
+ auto required = rxu::to_vector({
+ o_on.subscribe(200, 420)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("window toggle, disposed", "[window_toggle][operators]"){
+ GIVEN("1 hot observable of ints and hot observable of opens."){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<std::string> o_on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(90, 1),
+ on.next(180, 2),
+ on.next(250, 3),
+ on.next(260, 4),
+ on.next(310, 5),
+ on.next(340, 6),
+ on.next(410, 7),
+ on.next(420, 8),
+ on.next(470, 9),
+ on.next(550, 10),
+ on.completed(590)
+ });
+
+ auto ys = sc.make_hot_observable({
+ on.next(255, 50),
+ on.next(330, 100),
+ on.next(350, 50),
+ on.next(400, 90),
+ on.completed(900)
+ });
+
+ WHEN("ints are split into windows"){
+ using namespace std::chrono;
+
+ int wi = 0;
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .window_toggle(ys, [&](int){
+ return ys;
+ }, so)
+ .map([wi](rxcpp::observable<int> w) mutable {
+ auto ti = wi++;
+ return w
+ .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ })
+ .merge()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ },
+ 420
+ );
+
+ THEN("the output contains ints assigned to windows"){
+ auto required = rxu::to_vector({
+ o_on.next(261, "0 4"),
+ o_on.next(311, "0 5"),
+ o_on.next(341, "1 6"),
+ o_on.next(411, "3 7")
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the observable"){
+ auto required = rxu::to_vector({
+ o_on.subscribe(200, 590)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+