diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-04-23 22:18:33 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-04-24 00:45:30 -0700 |
commit | ce867895c4c2348e21f3dabfe5dca13c67fd990b (patch) | |
tree | bde1df1873b1c98e05e8b991b5071dd125f2dc75 /Rx | |
parent | 7154876bada9f1c86039086b187c8be5ad987895 (diff) | |
download | platform_external_Reactive-Extensions_RxCpp-ce867895c4c2348e21f3dabfe5dca13c67fd990b.tar.gz platform_external_Reactive-Extensions_RxCpp-ce867895c4c2348e21f3dabfe5dca13c67fd990b.tar.bz2 platform_external_Reactive-Extensions_RxCpp-ce867895c4c2348e21f3dabfe5dca13c67fd990b.zip |
added window_toggle
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/examples/doxygen/window.cpp | 46 | ||||
-rw-r--r-- | Rx/v2/examples/linesfrombytes/main.cpp | 35 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp | 16 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-map.hpp | 7 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-ref_count.hpp | 16 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-start_with.hpp | 30 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp | 296 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp | 14 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-zip.hpp | 15 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-connectable_observable.hpp | 12 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 52 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/group_by.cpp | 4 | ||||
-rw-r--r-- | Rx/v2/test/operators/window_toggle.cpp | 318 |
15 files changed, 827 insertions, 36 deletions
diff --git a/Rx/v2/examples/doxygen/window.cpp b/Rx/v2/examples/doxygen/window.cpp index 0b2850c..9443ecb 100644 --- a/Rx/v2/examples/doxygen/window.cpp +++ b/Rx/v2/examples/doxygen/window.cpp @@ -192,3 +192,49 @@ SCENARIO("window period+count sample"){ }); printf("//! [window period+count sample]\n"); } + +SCENARIO("window toggle+coordination sample"){ + printf("//! [window toggle+coordination sample]\n"); + int counter = 0; + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_toggle( + rxcpp::observable<>::interval(std::chrono::milliseconds(4)), + [](long){ + return rxcpp::observable<>::interval(std::chrono::milliseconds(4)).skip(1); + }, + rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %ld\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window toggle+coordination sample]\n"); +} + +SCENARIO("window toggle sample"){ + printf("//! [window toggle sample]\n"); + int counter = 0; + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_toggle( + rxcpp::observable<>::interval(std::chrono::milliseconds(4)), + [](long){ + return rxcpp::observable<>::interval(std::chrono::milliseconds(4)).skip(1); + }); + values. + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %ld\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window toggle sample]\n"); +} diff --git a/Rx/v2/examples/linesfrombytes/main.cpp b/Rx/v2/examples/linesfrombytes/main.cpp index b027d13..4847169 100644 --- a/Rx/v2/examples/linesfrombytes/main.cpp +++ b/Rx/v2/examples/linesfrombytes/main.cpp @@ -1,13 +1,17 @@ #include "rxcpp/rx.hpp" +namespace Rx { using namespace rxcpp; using namespace rxcpp::sources; using namespace rxcpp::operators; using namespace rxcpp::util; +} +using namespace Rx; #include <regex> #include <random> using namespace std; +using namespace std::chrono; int main() { @@ -16,7 +20,7 @@ int main() uniform_int_distribution<> dist(4, 18); // for testing purposes, produce byte stream that from lines of text - auto bytes = range(1, 10) | + auto bytes = range(0, 10) | flat_map([&](int i){ auto body = from((uint8_t)('A' + i)) | repeat(dist(gen)) | @@ -44,6 +48,11 @@ int main() // // recover lines of text from byte stream // + + auto removespaces = [](string s){ + s.erase(remove_if(s.begin(), s.end(), ::isspace), s.end()); + return s; + }; // create strings split on \r auto strings = bytes | @@ -55,22 +64,28 @@ int main() vector<string> splits(cursor, end); return iterate(move(splits)); }) | - filter([](string& s){ + filter([](const string& s){ return !s.empty(); - }); + }) | + publish() | + ref_count(); + + // filter to last string in each line + auto closes = strings | + filter( + [](const string& s){ + return s.back() == '\r'; + }) | + Rx::map([](const string&){return 0;}); // group strings by line - int group = 0; auto linewindows = strings | - group_by( - [=](string& s) mutable { - return s.back() == '\r' ? group++ : group; - }); + window_toggle(closes | start_with(0), [=](int){return closes;}); // reduce the strings for a line into one string auto lines = linewindows | - flat_map([](grouped_observable<int, string> w) { - return w | sum(); + flat_map([&](observable<string> w) { + return w | start_with<string>("") | sum() | Rx::map(removespaces); }); // print result diff --git a/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp b/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp index 7713c2c..1117a48 100644 --- a/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp @@ -36,11 +36,17 @@ class connect_forever_factory { public: connect_forever_factory() {} - template<class Observable> - auto operator()(Observable&& source) - -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, connect_forever<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>> { - return observable<rxu::value_type_t<rxu::decay_t<Observable>>, connect_forever<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>>( - connect_forever<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>(std::forward<Observable>(source))); + template<class... TN> + auto operator()(connectable_observable<TN...>&& source) + -> observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> { + return observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>( + connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(std::move(source))); + } + template<class... TN> + auto operator()(const connectable_observable<TN...>& source) + -> observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> { + return observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>( + connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(source)); } }; diff --git a/Rx/v2/src/rxcpp/operators/rx-map.hpp b/Rx/v2/src/rxcpp/operators/rx-map.hpp index 855f8eb..6db251f 100644 --- a/Rx/v2/src/rxcpp/operators/rx-map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-map.hpp @@ -35,17 +35,18 @@ struct map typedef rxu::decay_t<Subscriber> dest_type; typedef observer<T, this_type> observer_type; dest_type dest; - select_type selector; + mutable select_type selector; map_observer(dest_type d, select_type s) : dest(std::move(d)) , selector(std::move(s)) { } - void on_next(source_value_type v) const { + template<class Value> + void on_next(Value&& v) const { auto selected = on_exception( [&](){ - return this->selector(std::move(v));}, + return this->selector(std::forward<Value>(v));}, dest); if (selected.empty()) { return; diff --git a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp index 2d41ef2..886c605 100644 --- a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp @@ -63,11 +63,17 @@ class ref_count_factory { public: ref_count_factory() {} - template<class Observable> - auto operator()(Observable&& source) - -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, ref_count<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>> { - return observable<rxu::value_type_t<rxu::decay_t<Observable>>, ref_count<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>>( - ref_count<rxu::value_type_t<rxu::decay_t<Observable>>, Observable>(std::forward<Observable>(source))); + template<class... TN> + auto operator()(connectable_observable<TN...>&& source) + -> observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> { + return observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>( + ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(std::move(source))); + } + template<class... TN> + auto operator()(const connectable_observable<TN...>& source) + -> observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> { + return observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>( + ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(source)); } }; diff --git a/Rx/v2/src/rxcpp/operators/rx-start_with.hpp b/Rx/v2/src/rxcpp/operators/rx-start_with.hpp index 16e909f..42bf388 100644 --- a/Rx/v2/src/rxcpp/operators/rx-start_with.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-start_with.hpp @@ -11,10 +11,32 @@ namespace rxcpp { namespace operators { -template<class Observable, class Value0, class... ValueN> -auto start_with(Observable o, Value0 v0, ValueN... vn) - -> decltype(rxs::from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o)) { - return rxs::from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o); +namespace detail { + +template<class StartObservable> +class start_with_factory +{ +public: + using start_type = rxu::decay_t<StartObservable>; + + start_type start; + + explicit start_with_factory(start_type s) : start(s) {} + + template<class Observable> + auto operator()(Observable source) + -> decltype(start.concat(source)) { + return start.concat(source); + } +}; + +} + +template<class Value0, class... ValueN> +auto start_with(Value0 v0, ValueN... vn) + -> detail::start_with_factory<decltype(rxs::from(rxu::decay_t<Value0>(v0), rxu::decay_t<Value0>(vn)...))> { + return detail::start_with_factory<decltype(rxs::from(rxu::decay_t<Value0>(v0), rxu::decay_t<Value0>(vn)...))>( + rxs::from(rxu::decay_t<Value0>(v0), rxu::decay_t<Value0>(vn)...)); } } diff --git a/Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp b/Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp new file mode 100644 index 0000000..c0195d8 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp @@ -0,0 +1,296 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP) +#define RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T, class Openings, class ClosingSelector, class Coordination> +struct window_toggle_traits { + + using source_value_type = rxu::decay_t<T>; + using coordination_type = rxu::decay_t<Coordination>; + using openings_type = rxu::decay_t<Openings>; + using openings_value_type = typename openings_type::value_type; + using closing_selector_type = rxu::decay_t<ClosingSelector>; + + static_assert(is_observable<openings_type>::value, "window_toggle Openings must be an observable"); + + struct tag_not_valid {}; + template<class CS, class CV> + static auto check(int) -> decltype((*(CS*)nullptr)((*(CV*)nullptr))); + template<class CS, class CV> + static tag_not_valid check(...); + + static_assert(is_observable<decltype(check<closing_selector_type, openings_value_type>(0))>::value, "window_toggle ClosingSelector must be a function with the signature observable<U>(Openings::value_type)"); + + using closings_type = rxu::decay_t<decltype(check<closing_selector_type, openings_value_type>(0))>; + using closings_value_type = typename closings_type::value_type; +}; + +template<class T, class Openings, class ClosingSelector, class Coordination> +struct window_toggle +{ + typedef window_toggle<T, Openings, ClosingSelector, Coordination> this_type; + + typedef window_toggle_traits<T, Openings, ClosingSelector, Coordination> traits; + + using source_value_type = typename traits::source_value_type; + using coordination_type = typename traits::coordination_type; + using coordinator_type = typename coordination_type::coordinator_type; + using openings_type = typename traits::openings_type; + using openings_value_type = typename traits::openings_value_type; + using closing_selector_type = typename traits::closing_selector_type; + using closings_value_type = typename traits::closings_value_type; + + struct window_toggle_values + { + window_toggle_values(openings_type opens, closing_selector_type closes, coordination_type c) + : openings(opens) + , closingSelector(closes) + , coordination(c) + { + } + openings_type openings; + mutable closing_selector_type closingSelector; + coordination_type coordination; + }; + window_toggle_values initial; + + window_toggle(openings_type opens, closing_selector_type closes, coordination_type coordination) + : initial(opens, closes, coordination) + { + } + + template<class Subscriber> + struct window_toggle_observer + { + typedef window_toggle_observer<Subscriber> this_type; + typedef rxu::decay_t<T> value_type; + typedef rxu::decay_t<Subscriber> dest_type; + typedef observer<T, this_type> observer_type; + + struct window_toggle_subscriber_values : public window_toggle_values + { + window_toggle_subscriber_values(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c) + : window_toggle_values(v) + , cs(std::move(cs)) + , dest(std::move(d)) + , coordinator(std::move(c)) + , worker(coordinator.get_worker()) + { + } + composite_subscription cs; + dest_type dest; + coordinator_type coordinator; + rxsc::worker worker; + mutable std::list<rxcpp::subjects::subject<T>> subj; + }; + std::shared_ptr<window_toggle_subscriber_values> state; + + window_toggle_observer(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c) + : state(std::make_shared<window_toggle_subscriber_values>(window_toggle_subscriber_values(std::move(cs), std::move(d), v, std::move(c)))) + { + auto localState = state; + + composite_subscription innercs; + + // when the out observer is unsubscribed all the + // inner subscriptions are unsubscribed as well + auto innerscope = localState->dest.add(innercs); + + innercs.add([=](){ + localState->dest.remove(innerscope); + }); + + auto source = on_exception( + [&](){return localState->coordinator.in(localState->openings);}, + localState->dest); + if (source.empty()) { + return; + } + + // this subscribe does not share the observer subscription + // so that when it is unsubscribed the observer can be called + // until the inner subscriptions have finished + auto sink = make_subscriber<openings_value_type>( + localState->dest, + innercs, + // on_next + [localState](const openings_value_type& ov) { + auto closer = localState->closingSelector(ov); + + auto it = localState->subj.insert(localState->subj.end(), rxcpp::subjects::subject<T>()); + localState->dest.on_next(it->get_observable().as_dynamic()); + + composite_subscription innercs; + + // when the out observer is unsubscribed all the + // inner subscriptions are unsubscribed as well + auto innerscope = localState->dest.add(innercs); + + innercs.add([=](){ + localState->dest.remove(innerscope); + }); + + auto source = localState->coordinator.in(closer); + + auto sit = std::make_shared<decltype(it)>(it); + auto close = [localState, sit]() { + auto it = *sit; + *sit = localState->subj.end(); + if (it != localState->subj.end()) { + it->get_subscriber().on_completed(); + localState->subj.erase(it); + } + }; + + // this subscribe does not share the observer subscription + // so that when it is unsubscribed the observer can be called + // until the inner subscriptions have finished + auto sink = make_subscriber<closings_value_type>( + localState->dest, + innercs, + // on_next + [close, innercs](closings_value_type) { + close(); + innercs.unsubscribe(); + }, + // on_error + [localState](std::exception_ptr e) { + localState->dest.on_error(e); + }, + // on_completed + close + ); + auto selectedSink = localState->coordinator.out(sink); + source.subscribe(std::move(selectedSink)); + }, + // on_error + [localState](std::exception_ptr e) { + localState->dest.on_error(e); + }, + // on_completed + []() { + } + ); + auto selectedSink = on_exception( + [&](){return localState->coordinator.out(sink);}, + localState->dest); + if (selectedSink.empty()) { + return; + } + source->subscribe(std::move(selectedSink.get())); + } + + void on_next(T v) const { + auto localState = state; + auto work = [v, localState](const rxsc::schedulable&){ + for (auto s : localState->subj) { + s.get_subscriber().on_next(v); + } + }; + auto selectedWork = on_exception( + [&](){return localState->coordinator.act(work);}, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + void on_error(std::exception_ptr e) const { + auto localState = state; + auto work = [e, localState](const rxsc::schedulable&){ + for (auto s : localState->subj) { + s.get_subscriber().on_error(e); + } + localState->dest.on_error(e); + }; + auto selectedWork = on_exception( + [&](){return localState->coordinator.act(work);}, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + void on_completed() const { + auto localState = state; + auto work = [localState](const rxsc::schedulable&){ + for (auto s : localState->subj) { + s.get_subscriber().on_completed(); + } + localState->dest.on_completed(); + }; + auto selectedWork = on_exception( + [&](){return localState->coordinator.act(work);}, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + static subscriber<T, observer_type> make(dest_type d, window_toggle_values v) { + auto cs = composite_subscription(); + auto coordinator = v.coordination.create_coordinator(); + + return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator)))); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(window_toggle_observer<Subscriber>::make(std::move(dest), initial)) { + return window_toggle_observer<Subscriber>::make(std::move(dest), initial); + } +}; + +template<class Openings, class ClosingSelector, class Coordination> +class window_toggle_factory +{ + typedef rxu::decay_t<Openings> openings_type; + typedef rxu::decay_t<ClosingSelector> closing_selector_type; + typedef rxu::decay_t<Coordination> coordination_type; + + openings_type openings; + closing_selector_type closingSelector; + coordination_type coordination; +public: + window_toggle_factory(openings_type opens, closing_selector_type closes, coordination_type c) : openings(opens), closingSelector(closes), coordination(c) {} + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<observable<rxu::value_type_t<rxu::decay_t<Observable>>>>(window_toggle<rxu::value_type_t<rxu::decay_t<Observable>>, Openings, ClosingSelector, Coordination>(openings, closingSelector, coordination))) { + return source.template lift<observable<rxu::value_type_t<rxu::decay_t<Observable>>>>(window_toggle<rxu::value_type_t<rxu::decay_t<Observable>>, Openings, ClosingSelector, Coordination>(openings, closingSelector, coordination)); + } +}; + +} + +template<class Openings, class ClosingSelector, class Coordination> +inline auto window_toggle(Openings openings, ClosingSelector closingSelector, Coordination coordination) + -> detail::window_toggle_factory<Openings, ClosingSelector, Coordination> { + return detail::window_toggle_factory<Openings, ClosingSelector, Coordination>(openings, closingSelector, coordination); +} + +template<class Openings, class ClosingSelector> +inline auto window_toggle(Openings openings, ClosingSelector closingSelector) + -> detail::window_toggle_factory<Openings, ClosingSelector, identity_one_worker> { + return detail::window_toggle_factory<Openings, ClosingSelector, identity_one_worker>(openings, closingSelector, identity_immediate()); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp b/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp index 71b4975..b786d37 100644 --- a/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp @@ -172,6 +172,8 @@ struct with_latest_from : public operator_base<rxu::value_type_t<with_latest_fro template<class Coordination, class Selector, class... ObservableN> class with_latest_from_factory { + using this_type = with_latest_from_factory<Coordination, Selector, ObservableN...>; + typedef rxu::decay_t<Coordination> coordination_type; typedef rxu::decay_t<Selector> selector_type; typedef std::tuple<ObservableN...> tuple_source_type; @@ -187,6 +189,8 @@ class with_latest_from_factory with_latest_from<Coordination, Selector, YObservableN...>(coordination, selector, std::move(source))); } public: + using checked_type = std::enable_if<is_coordination<coordination_type>::value, this_type>; + with_latest_from_factory(coordination_type sf, selector_type s, ObservableN... on) : coordination(std::move(sf)) , selector(std::move(s)) @@ -205,8 +209,14 @@ public: template<class Coordination, class Selector, class... ObservableN> auto with_latest_from(Coordination sf, Selector s, ObservableN... on) - -> detail::with_latest_from_factory<Coordination, Selector, ObservableN...> { - return detail::with_latest_from_factory<Coordination, Selector, ObservableN...>(std::move(sf), std::move(s), std::move(on)...); + -> typename detail::with_latest_from_factory<Coordination, Selector, ObservableN...>::checked_type::type { + return detail::with_latest_from_factory<Coordination, Selector, ObservableN...>(std::move(sf), std::move(s), std::move(on)...); +} + +template<class Selector, class... ObservableN> +auto with_latest_from(Selector s, ObservableN... on) + -> typename detail::with_latest_from_factory<identity_one_worker, Selector, ObservableN...>::checked_type::type { + return detail::with_latest_from_factory<identity_one_worker, Selector, ObservableN...>(identity_current_thread(), std::move(s), std::move(on)...); } } diff --git a/Rx/v2/src/rxcpp/operators/rx-zip.hpp b/Rx/v2/src/rxcpp/operators/rx-zip.hpp index 36f4915..e436762 100644 --- a/Rx/v2/src/rxcpp/operators/rx-zip.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-zip.hpp @@ -164,10 +164,11 @@ struct zip : public operator_base<rxu::value_type_t<zip_traits<Coordination, Sel template<class Coordination, class Selector, class... ObservableN> class zip_factory { + using this_type = zip_factory<Coordination, Selector, ObservableN...>; typedef rxu::decay_t<Coordination> coordination_type; typedef rxu::decay_t<Selector> selector_type; typedef std::tuple<ObservableN...> tuple_source_type; - + coordination_type coordination; selector_type selector; tuple_source_type sourcen; @@ -179,6 +180,8 @@ class zip_factory zip<Coordination, Selector, YObservableN...>(coordination, selector, std::move(source))); } public: + using checked_type = std::enable_if<is_coordination<coordination_type>::value, this_type>; + zip_factory(coordination_type sf, selector_type s, ObservableN... on) : coordination(std::move(sf)) , selector(std::move(s)) @@ -197,8 +200,14 @@ public: template<class Coordination, class Selector, class... ObservableN> auto zip(Coordination sf, Selector s, ObservableN... on) - -> detail::zip_factory<Coordination, Selector, ObservableN...> { - return detail::zip_factory<Coordination, Selector, ObservableN...>(std::move(sf), std::move(s), std::move(on)...); + -> typename detail::zip_factory<Coordination, Selector, ObservableN...>::checked_type::type { + return detail::zip_factory<Coordination, Selector, ObservableN...>(std::move(sf), std::move(s), std::move(on)...); +} + +template<class Selector, class... ObservableN> +auto zip(Selector s, ObservableN... on) + -> typename detail::zip_factory<identity_one_worker, Selector, ObservableN...>::checked_type::type { + return detail::zip_factory<identity_one_worker, Selector, ObservableN...>(identity_current_thread(), std::move(s), std::move(on)...); } } diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp index 665522c..5603c3a 100644 --- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp +++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp @@ -139,6 +139,18 @@ public: {} /// + /// takes any function that will take this observable and produce a result value. + /// this is intended to allow externally defined operators, that use subscribe, + /// to be connected into the expression. + /// + template<class OperatorFactory> + auto op(OperatorFactory&& of) const + -> decltype(of(*(const this_type*)nullptr)) { + return of(*this); + static_assert(detail::is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)"); + } + + /// /// performs type-forgetting conversion to a new composite_observable /// connectable_observable<T> as_dynamic() { diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 678db71..e032afa 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1297,6 +1297,54 @@ public: return lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread())); } + /*! Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. + + \tparam Openings observable<OT> + \tparam ClosingSelector a function of type observable<CT>(OT) + \tparam Coordination the type of the scheduler + + \param opens each value from this observable opens a new window. + \param closes this function is called for each opened window and returns an observable. the first value from the returned observable will close the window + \param coordination the scheduler for the windows + + \return Observable that emits an observable for each opened window. + + \sample + \snippet window.cpp window toggle+coordination sample + \snippet output.txt window toggle+coordination sample + */ + template<class Openings, class ClosingSelector, class Coordination, class Reqiures = typename rxu::types_checked_from<typename Coordination::coordination_tag>::type> + auto window_toggle(Openings opens, ClosingSelector closes, Coordination coordination) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_toggle<T, Openings, ClosingSelector, Coordination>(opens, closes, coordination))) + /// \endcond + { + return lift<observable<T>>(rxo::detail::window_toggle<T, Openings, ClosingSelector, Coordination>(opens, closes, coordination)); + } + + /*! Return an observable that emits connected, non-overlapping windows represending items emitted by the source observable during fixed, consecutive durations. + + \tparam Openings observable<OT> + \tparam ClosingSelector a function of type observable<CT>(OT) + + \param opens each value from this observable opens a new window. + \param closes this function is called for each opened window and returns an observable. the first value from the returned observable will close the window + + \return Observable that emits an observable for each opened window. + + \sample + \snippet window.cpp window toggle sample + \snippet output.txt window toggle sample + */ + template<class Openings, class ClosingSelector> + auto window_toggle(Openings opens, ClosingSelector closes) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_toggle<T, Openings, ClosingSelector, identity_one_worker>(opens, closes, identity_current_thread()))) + /// \endcond + { + return lift<observable<T>>(rxo::detail::window_toggle<T, Openings, ClosingSelector, identity_one_worker>(opens, closes, identity_current_thread())); + } + /*! Return an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable. \param count the maximum size of each buffer before it should be emitted @@ -3265,10 +3313,10 @@ public: template<class Value0, class... ValueN> auto start_with(Value0 v0, ValueN... vn) const /// \cond SHOW_SERVICE_MEMBERS - -> decltype(rxo::start_with(*(this_type*)nullptr, std::move(v0), std::move(vn)...)) + -> decltype(rxo::start_with(std::move(v0), std::move(vn)...)(*(this_type*)nullptr)) /// \endcond { - return rxo::start_with(*this, std::move(v0), std::move(vn)...); + return rxo::start_with(std::move(v0), std::move(vn)...)(*this); } /*! Take values pairwise from this observable. diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index f23d710..4bbc6ef 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -84,5 +84,6 @@ namespace rxo=operators; #include "operators/rx-window.hpp" #include "operators/rx-window_time.hpp" #include "operators/rx-window_time_count.hpp" +#include "operators/rx-window_toggle.hpp" #include "operators/rx-zip.hpp" #endif diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index ee3f01a..2dac8bf 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -69,6 +69,7 @@ set(TEST_SOURCES ${TEST_DIR}/operators/timestamp.cpp ${TEST_DIR}/operators/with_latest_from.cpp ${TEST_DIR}/operators/window.cpp + ${TEST_DIR}/operators/window_toggle.cpp ${TEST_DIR}/operators/zip.cpp ) diff --git a/Rx/v2/test/operators/group_by.cpp b/Rx/v2/test/operators/group_by.cpp index 1584b1a..145a706 100644 --- a/Rx/v2/test/operators/group_by.cpp +++ b/Rx/v2/test/operators/group_by.cpp @@ -58,7 +58,7 @@ SCENARIO("range partitioned by group_by across hardware threads to derive pi", " message << key << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v; return std::make_tuple(message.str(), v); }). - start_with(std::make_tuple(message.str(), 0)). + start_with(std::make_tuple(message.str(), 0.0L)). as_dynamic(); }). concat(). // only subscribe to one range at a time in this group. @@ -129,7 +129,7 @@ SCENARIO("range partitioned by dividing work across hardware threads to derive p message << w.index << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v; return std::make_tuple(message.str(), v); }). - start_with(std::make_tuple(message.str(), 0)). + start_with(std::make_tuple(message.str(), 0.0L)). as_dynamic(); }). merge(rxcpp::observe_on_new_thread()). diff --git a/Rx/v2/test/operators/window_toggle.cpp b/Rx/v2/test/operators/window_toggle.cpp new file mode 100644 index 0000000..0c1cc9f --- /dev/null +++ b/Rx/v2/test/operators/window_toggle.cpp @@ -0,0 +1,318 @@ +#include "../test.h" + +SCENARIO("window toggle, basic", "[window_toggle][operators]"){ + GIVEN("1 hot observable of ints and hot observable of opens."){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<std::string> o_on; + + auto xs = sc.make_hot_observable({ + on.next(90, 1), + on.next(180, 2), + on.next(250, 3), + on.next(260, 4), + on.next(310, 5), + on.next(340, 6), + on.next(410, 7), + on.next(420, 8), + on.next(470, 9), + on.next(550, 10), + on.completed(590) + }); + + auto ys = sc.make_hot_observable({ + on.next(255, 50), + on.next(330, 100), + on.next(350, 50), + on.next(400, 90), + on.completed(900) + }); + + WHEN("ints are split into windows"){ + using namespace std::chrono; + + int wi = 0; + + auto res = w.start( + [&]() { + return xs + .window_toggle(ys, [&](int y){ + return rx::observable<>::timer(milliseconds(y), so); + }, so) + .map([wi](rxcpp::observable<int> w) mutable { + auto ti = wi++; + return w + .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);}) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + }) + .merge() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints assigned to windows"){ + auto required = rxu::to_vector({ + o_on.next(261, "0 4"), + o_on.next(341, "1 6"), + o_on.next(411, "1 7"), + o_on.next(411, "3 7"), + o_on.next(421, "1 8"), + o_on.next(421, "3 8"), + o_on.next(471, "3 9"), + o_on.completed(591) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the observable"){ + auto required = rxu::to_vector({ + o_on.subscribe(200, 590) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("window toggle, basic same", "[window_toggle][operators]"){ + GIVEN("1 hot observable of ints and hot observable of opens."){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<std::string> o_on; + + auto xs = sc.make_hot_observable({ + on.next(90, 1), + on.next(180, 2), + on.next(250, 3), + on.next(260, 4), + on.next(310, 5), + on.next(340, 6), + on.next(410, 7), + on.next(420, 8), + on.next(470, 9), + on.next(550, 10), + on.completed(590) + }); + + auto ys = sc.make_hot_observable({ + on.next(255, 50), + on.next(330, 100), + on.next(350, 50), + on.next(400, 90), + on.completed(900) + }); + + WHEN("ints are split into windows"){ + using namespace std::chrono; + + int wi = 0; + + auto res = w.start( + [&]() { + return xs + .window_toggle(ys, [&](int){ + return ys; + }, so) + .map([wi](rxcpp::observable<int> w) mutable { + auto ti = wi++; + return w + .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);}) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + }) + .merge() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints assigned to windows"){ + auto required = rxu::to_vector({ + o_on.next(261, "0 4"), + o_on.next(311, "0 5"), + o_on.next(341, "1 6"), + o_on.next(411, "3 7"), + o_on.next(421, "3 8"), + o_on.next(471, "3 9"), + o_on.next(551, "3 10"), + o_on.completed(591) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the observable"){ + auto required = rxu::to_vector({ + o_on.subscribe(200, 590) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("window toggle, error", "[window_toggle][operators]"){ + GIVEN("1 hot observable of ints and hot observable of opens."){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<std::string> o_on; + + std::runtime_error ex("window_toggle on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(90, 1), + on.next(180, 2), + on.next(250, 3), + on.next(260, 4), + on.next(310, 5), + on.next(340, 6), + on.next(410, 7), + on.error(420, ex), + on.next(470, 9), + on.next(550, 10), + on.completed(590) + }); + + auto ys = sc.make_hot_observable({ + on.next(255, 50), + on.next(330, 100), + on.next(350, 50), + on.next(400, 90), + on.completed(900) + }); + + WHEN("ints are split into windows"){ + using namespace std::chrono; + + int wi = 0; + + auto res = w.start( + [&]() { + return xs + .window_toggle(ys, [&](int){ + return ys; + }, so) + .map([wi](rxcpp::observable<int> w) mutable { + auto ti = wi++; + return w + .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);}) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + }) + .merge() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains ints assigned to windows"){ + auto required = rxu::to_vector({ + o_on.next(261, "0 4"), + o_on.next(311, "0 5"), + o_on.next(341, "1 6"), + o_on.next(411, "3 7"), + o_on.error(421, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the observable"){ + auto required = rxu::to_vector({ + o_on.subscribe(200, 420) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("window toggle, disposed", "[window_toggle][operators]"){ + GIVEN("1 hot observable of ints and hot observable of opens."){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<std::string> o_on; + + auto xs = sc.make_hot_observable({ + on.next(90, 1), + on.next(180, 2), + on.next(250, 3), + on.next(260, 4), + on.next(310, 5), + on.next(340, 6), + on.next(410, 7), + on.next(420, 8), + on.next(470, 9), + on.next(550, 10), + on.completed(590) + }); + + auto ys = sc.make_hot_observable({ + on.next(255, 50), + on.next(330, 100), + on.next(350, 50), + on.next(400, 90), + on.completed(900) + }); + + WHEN("ints are split into windows"){ + using namespace std::chrono; + + int wi = 0; + + auto res = w.start( + [&]() { + return xs + .window_toggle(ys, [&](int){ + return ys; + }, so) + .map([wi](rxcpp::observable<int> w) mutable { + auto ti = wi++; + return w + .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);}) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + }) + .merge() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + }, + 420 + ); + + THEN("the output contains ints assigned to windows"){ + auto required = rxu::to_vector({ + o_on.next(261, "0 4"), + o_on.next(311, "0 5"), + o_on.next(341, "1 6"), + o_on.next(411, "3 7") + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the observable"){ + auto required = rxu::to_vector({ + o_on.subscribe(200, 590) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + |