diff options
author | Valery Kopylov <v-valkop@microsoft.com> | 2015-06-01 18:06:04 +0300 |
---|---|---|
committer | Valery Kopylov <v-valkop@microsoft.com> | 2015-06-01 18:10:12 +0300 |
commit | 4992c73dbc8bbe3a7335b186f9b3e94da20ea127 (patch) | |
tree | 4f5b386b3cc364aa65c0b85216559abc266529e7 | |
parent | 1fe0081ab9866c2882bd0c24183bfa4b2de38d10 (diff) | |
download | platform_external_Reactive-Extensions_RxCpp-4992c73dbc8bbe3a7335b186f9b3e94da20ea127.tar.gz platform_external_Reactive-Extensions_RxCpp-4992c73dbc8bbe3a7335b186f9b3e94da20ea127.tar.bz2 platform_external_Reactive-Extensions_RxCpp-4992c73dbc8bbe3a7335b186f9b3e94da20ea127.zip |
Add description and examples for observable<T> members
33 files changed, 3120 insertions, 350 deletions
diff --git a/Rx/v2/examples/doxygen/amb.cpp b/Rx/v2/examples/doxygen/amb.cpp new file mode 100644 index 0000000..8f2ac7d --- /dev/null +++ b/Rx/v2/examples/doxygen/amb.cpp @@ -0,0 +1,84 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("amb sample"){ + printf("//! [amb sample]\n"); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}); + auto values = o1.amb(o2, o3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [amb sample]\n"); +} + +SCENARIO("implicit amb sample"){ + printf("//! [implicit amb sample]\n"); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}).as_dynamic(); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}).as_dynamic(); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}).as_dynamic(); + auto base = rxcpp::observable<>::from(o1, o2, o3); + auto values = base.amb(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [implicit amb sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded amb sample"){ + printf("//! [threaded amb sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) { + printf("[thread %s] Timer1 fired\n", get_pid().c_str()); + return 1; + }); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { + printf("[thread %s] Timer2 fired\n", get_pid().c_str()); + return 2; + }); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) { + printf("[thread %s] Timer3 fired\n", get_pid().c_str()); + return 3; + }); + auto values = o1.amb(rxcpp::observe_on_new_thread(), o2, o3); + values. + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded amb sample]\n"); +} + +SCENARIO("threaded implicit amb sample"){ + printf("//! [threaded implicit amb sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) { + printf("[thread %s] Timer1 fired\n", get_pid().c_str()); + return 1; + }).as_dynamic(); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { + printf("[thread %s] Timer2 fired\n", get_pid().c_str()); + return 2; + }).as_dynamic(); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) { + printf("[thread %s] Timer3 fired\n", get_pid().c_str()); + return 3; + }).as_dynamic(); + auto base = rxcpp::observable<>::from(o1, o2, o3); + auto values = base.amb(rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded implicit amb sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/as_dynamic.cpp b/Rx/v2/examples/doxygen/as_dynamic.cpp new file mode 100644 index 0000000..363ae57 --- /dev/null +++ b/Rx/v2/examples/doxygen/as_dynamic.cpp @@ -0,0 +1,21 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("as_dynamic sample"){ + printf("//! [as_dynamic sample]\n"); + auto o1 = rxcpp::observable<>::range(1, 3); + auto o2 = rxcpp::observable<>::just(4); + auto o3 = rxcpp::observable<>::empty<int>(); + auto values = o1.concat(o2, o3); + printf("type of o1: %s\n", typeid(o1).name()); + printf("type of o1.as_dynamic(): %s\n", typeid(o1.as_dynamic()).name()); + printf("type of o2: %s\n", typeid(o2).name()); + printf("type of o2.as_dynamic(): %s\n", typeid(o2.as_dynamic()).name()); + printf("type of o3: %s\n", typeid(o3).name()); + printf("type of o3.as_dynamic(): %s\n", typeid(o3.as_dynamic()).name()); + printf("type of values: %s\n", typeid(values).name()); + printf("type of values.as_dynamic(): %s\n", typeid(values.as_dynamic()).name()); + printf("//! [as_dynamic sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/buffer.cpp b/Rx/v2/examples/doxygen/buffer.cpp new file mode 100644 index 0000000..9a5d0b0 --- /dev/null +++ b/Rx/v2/examples/doxygen/buffer.cpp @@ -0,0 +1,204 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("buffer count sample"){ + printf("//! [buffer count sample]\n"); + auto values = rxcpp::observable<>::range(1, 5).buffer(2); + values. + subscribe( + [](std::vector<int> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](int a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer count sample]\n"); +} + +SCENARIO("buffer count+skip sample"){ + printf("//! [buffer count+skip sample]\n"); + auto values = rxcpp::observable<>::range(1, 7).buffer(2, 3); + values. + subscribe( + [](std::vector<int> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](int a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer count+skip sample]\n"); +} + +std::string get_pid(); + +SCENARIO("buffer period+skip+coordination sample"){ + printf("//! [buffer period+skip+coordination sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto period = std::chrono::milliseconds(4); + auto skip = std::chrono::milliseconds(6); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + map([](long v){ + printf("[thread %s] Interval OnNext: %d\n", get_pid().c_str(), v); + return v; + }). + take(7). + buffer_with_time(period, skip, rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](std::vector<long> v){ + printf("[thread %s] OnNext:", get_pid().c_str()); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [buffer period+skip+coordination sample]\n"); +} + +SCENARIO("buffer period+skip sample"){ + printf("//! [buffer period+skip sample]\n"); + auto period = std::chrono::milliseconds(4); + auto skip = std::chrono::milliseconds(6); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + buffer_with_time(period, skip); + values. + subscribe( + [](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+skip sample]\n"); +} + +SCENARIO("buffer period+skip overlapping sample"){ + printf("//! [buffer period+skip overlapping sample]\n"); + auto period = std::chrono::milliseconds(6); + auto skip = std::chrono::milliseconds(4); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + buffer_with_time(period, skip); + values. + subscribe( + [](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+skip overlapping sample]\n"); +} + +SCENARIO("buffer period+skip empty sample"){ + printf("//! [buffer period+skip empty sample]\n"); + auto period = std::chrono::milliseconds(2); + auto skip = std::chrono::milliseconds(4); + auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)). + buffer_with_time(period, skip); + values. + subscribe( + [](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+skip empty sample]\n"); +} + +SCENARIO("buffer period+coordination sample"){ + printf("//! [buffer period+coordination sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + buffer_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+coordination sample]\n"); +} + +SCENARIO("buffer period sample"){ + printf("//! [buffer period sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + buffer_with_time(std::chrono::milliseconds(4)); + values. + subscribe( + [](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period sample]\n"); +} + +SCENARIO("buffer period+count+coordination sample"){ + printf("//! [buffer period+count+coordination sample]\n"); + auto start = std::chrono::steady_clock::now(); + auto int1 = rxcpp::observable<>::range(1L, 3L); + auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); + auto values = int1. + concat(int2). + buffer_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop()); + values. + as_blocking(). + subscribe( + [start](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+count+coordination sample]\n"); +} + +SCENARIO("buffer period+count sample"){ + printf("//! [buffer period+count sample]\n"); + auto start = std::chrono::steady_clock::now(); + auto int1 = rxcpp::observable<>::range(1L, 3L); + auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); + auto values = int1. + concat(int2). + buffer_with_time_or_count(std::chrono::milliseconds(20), 2); + values. + subscribe( + [start](std::vector<long> v){ + printf("OnNext:"); + std::for_each(v.begin(), v.end(), [](long a){ + printf(" %d", a); + }); + printf("\n"); + }, + [](){printf("OnCompleted\n");}); + printf("//! [buffer period+count sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/combine_latest.cpp b/Rx/v2/examples/doxygen/combine_latest.cpp new file mode 100644 index 0000000..b220da4 --- /dev/null +++ b/Rx/v2/examples/doxygen/combine_latest.cpp @@ -0,0 +1,85 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("combine_latest sample"){ + printf("//! [combine_latest sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)); + auto values = o1.combine_latest(o2, o3); + values. + take(5). + subscribe( + [](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [combine_latest sample]\n"); +} + +std::string get_pid(); + +SCENARIO("Coordination combine_latest sample"){ + printf("//! [Coordination combine_latest sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto thr = rxcpp::synchronize_event_loop(); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) { + printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) { + printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)).map([](int v) { + printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto values = o1.combine_latest(thr, o2, o3); + values. + take(5). + as_blocking(). + subscribe( + [](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [Coordination combine_latest sample]\n"); +} + +SCENARIO("Selector combine_latest sample"){ + printf("//! [Selector combine_latest sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)); + auto values = o1.combine_latest( + [](int v1, int v2, int v3) { + return 100 * v1 + 10 * v2 + v3; + }, + o2, o3); + values. + take(5). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [Selector combine_latest sample]\n"); +} + +SCENARIO("Coordination+Selector combine_latest sample"){ + printf("//! [Coordination+Selector combine_latest sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)); + auto values = o1.combine_latest( + rxcpp::observe_on_new_thread(), + [](int v1, int v2, int v3) { + return 100 * v1 + 10 * v2 + v3; + }, + o2, o3); + values. + take(5). + as_blocking(). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [Coordination+Selector combine_latest sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/concat.cpp b/Rx/v2/examples/doxygen/concat.cpp new file mode 100644 index 0000000..bb86033 --- /dev/null +++ b/Rx/v2/examples/doxygen/concat.cpp @@ -0,0 +1,60 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("concat sample"){ + printf("//! [concat sample]\n"); + auto o1 = rxcpp::observable<>::range(1, 3); + auto o2 = rxcpp::observable<>::just(4); + auto o3 = rxcpp::observable<>::from(5, 6); + auto values = o1.concat(o2.as_dynamic(), o3.as_dynamic()); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [concat sample]\n"); +} + +SCENARIO("implicit concat sample"){ + printf("//! [implicit concat sample]\n"); + auto o1 = rxcpp::observable<>::range(1, 3); + auto o2 = rxcpp::observable<>::just(4); + auto o3 = rxcpp::observable<>::from(5, 6); + auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2.as_dynamic(), o3.as_dynamic()); + auto values = base.concat(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [implicit concat sample]\n"); +} + +SCENARIO("threaded concat sample"){ + printf("//! [threaded concat sample]\n"); + auto o1 = rxcpp::observable<>::range(1, 3); + auto o2 = rxcpp::observable<>::just(4); + auto o3 = rxcpp::observable<>::from(5, 6); + auto values = o1.concat(rxcpp::observe_on_new_thread(), o2.as_dynamic(), o3.as_dynamic()); + values. + as_blocking(). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [threaded concat sample]\n"); +} + +SCENARIO("threaded implicit concat sample"){ + printf("//! [threaded implicit concat sample]\n"); + auto o1 = rxcpp::observable<>::range(1, 3); + auto o2 = rxcpp::observable<>::just(4); + auto o3 = rxcpp::observable<>::from(5, 6); + auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2.as_dynamic(), o3.as_dynamic()); + auto values = base.concat(rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [threaded implicit concat sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/concat_map.cpp b/Rx/v2/examples/doxygen/concat_map.cpp new file mode 100644 index 0000000..70664e0 --- /dev/null +++ b/Rx/v2/examples/doxygen/concat_map.cpp @@ -0,0 +1,50 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("concat_map sample"){ + printf("//! [concat_map sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat_map( + [](int v){ + return + rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)). + take(3); + }, + [](int v_main, long v_sub){ + return std::make_tuple(v_main, v_sub); + }); + values. + subscribe( + [](std::tuple<int, long> v){printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [concat_map sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded concat_map sample"){ + printf("//! [threaded concat_map sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto values = rxcpp::observable<>::range(1, 3). + concat_map( + [](int v){ + printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v); + return + rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)). + take(3); + }, + [](int v_main, long v_sub){ + printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub); + return std::make_tuple(v_main, v_sub); + }, + rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded concat_map sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/distinct_until_changed.cpp b/Rx/v2/examples/doxygen/distinct_until_changed.cpp new file mode 100644 index 0000000..9422b21 --- /dev/null +++ b/Rx/v2/examples/doxygen/distinct_until_changed.cpp @@ -0,0 +1,15 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("distinct_until_changed sample"){ + printf("//! [distinct_until_changed sample]\n"); + auto values = rxcpp::observable<>::from(1, 2, 2, 3, 3, 3, 4, 5, 5).distinct_until_changed(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [distinct_until_changed sample]\n"); +} + diff --git a/Rx/v2/examples/doxygen/filter.cpp b/Rx/v2/examples/doxygen/filter.cpp new file mode 100644 index 0000000..36a1f49 --- /dev/null +++ b/Rx/v2/examples/doxygen/filter.cpp @@ -0,0 +1,17 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("filter sample"){ + printf("//! [filter sample]\n"); + auto values = rxcpp::observable<>::range(1, 6). + filter([](int v){ + return v % 2; + }); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [filter sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/finally.cpp b/Rx/v2/examples/doxygen/finally.cpp new file mode 100644 index 0000000..3f25196 --- /dev/null +++ b/Rx/v2/examples/doxygen/finally.cpp @@ -0,0 +1,37 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("finally sample"){ + printf("//! [finally sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + finally([](){ + printf("The final action\n"); + }); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [finally sample]\n"); +} + +SCENARIO("error finally sample"){ + printf("//! [error finally sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + finally([](){ + printf("The final action\n"); + }); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [error finally sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/flat_map.cpp b/Rx/v2/examples/doxygen/flat_map.cpp new file mode 100644 index 0000000..21db657 --- /dev/null +++ b/Rx/v2/examples/doxygen/flat_map.cpp @@ -0,0 +1,50 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("flat_map sample"){ + printf("//! [flat_map sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + flat_map( + [](int v){ + return + rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)). + take(3); + }, + [](int v_main, long v_sub){ + return std::make_tuple(v_main, v_sub); + }); + values. + subscribe( + [](std::tuple<int, long> v){printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [flat_map sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded flat_map sample"){ + printf("//! [threaded flat_map sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto values = rxcpp::observable<>::range(1, 3). + flat_map( + [](int v){ + printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v); + return + rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)). + take(3); + }, + [](int v_main, int v_sub){ + printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub); + return std::make_tuple(v_main, v_sub); + }, + rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded flat_map sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/from.cpp b/Rx/v2/examples/doxygen/from.cpp index 03288eb..15186ba 100644 --- a/Rx/v2/examples/doxygen/from.cpp +++ b/Rx/v2/examples/doxygen/from.cpp @@ -13,14 +13,21 @@ SCENARIO("from sample"){ printf("//! [from sample]\n"); } +std::string get_pid(); + SCENARIO("threaded from sample"){ printf("//! [threaded from sample]\n"); - auto values = rxcpp::observable<>::from(rxcpp::observe_on_event_loop(), 1, 2, 3); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto values = rxcpp::observable<>::from(rxcpp::observe_on_new_thread(), 1, 2, 3).map([](int v){ + printf("[thread %s] Emit value: %d\n", get_pid().c_str(), v); + return v; + }); values. as_blocking(). subscribe( - [](int v){printf("OnNext: %d\n", v);}, - [](){printf("OnCompleted\n");}); + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); printf("//! [threaded from sample]\n"); } diff --git a/Rx/v2/examples/doxygen/group_by.cpp b/Rx/v2/examples/doxygen/group_by.cpp new file mode 100644 index 0000000..d30f334 --- /dev/null +++ b/Rx/v2/examples/doxygen/group_by.cpp @@ -0,0 +1,54 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("group_by sample"){ + printf("//! [group_by sample]\n"); + auto values = rxcpp::observable<>::range(0, 8). + group_by( + [](int v){return v % 3;}, + [](int v){return 10 * v;}); + values. + subscribe( + [](rxcpp::grouped_observable<int, int> g){ + auto key = g.get_key(); + printf("OnNext: key = %d\n", key); + g.subscribe( + [key](int v){printf("[key %d] OnNext: %d\n", key, v);}, + [key](){printf("[key %d] OnCompleted\n", key);}); + }, + [](){printf("OnCompleted\n");}); + printf("//! [group_by sample]\n"); +} + +//! [group_by full intro] +bool less(int v1, int v2){ + return v1 < v2; +} +//! [group_by full intro] + +SCENARIO("group_by full sample"){ + printf("//! [group_by full sample]\n"); + auto data = rxcpp::observable<>::range(0, 8). + map([](int v){ + std::stringstream s; + s << "Value " << v; + return std::make_pair(v % 3, s.str()); + }); + auto values = data.group_by( + [](std::pair<int, std::string> v){return v.first;}, + [](std::pair<int, std::string> v){return v.second;}, + less); + values. + subscribe( + [](rxcpp::grouped_observable<int, std::string> g){ + auto key = g.get_key(); + printf("OnNext: key = %d\n", key); + g.subscribe( + [key](const std::string& v){printf("[key %d] OnNext: %s\n", key, v.c_str());}, + [key](){printf("[key %d] OnCompleted\n", key);}); + }, + [](){printf("OnCompleted\n");}); + printf("//! [group_by full sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/map.cpp b/Rx/v2/examples/doxygen/map.cpp new file mode 100644 index 0000000..8db5030 --- /dev/null +++ b/Rx/v2/examples/doxygen/map.cpp @@ -0,0 +1,17 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("map sample"){ + printf("//! [map sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + map([](int v){ + return 2 * v; + }); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [map sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/math.cpp b/Rx/v2/examples/doxygen/math.cpp new file mode 100644 index 0000000..dcf3a90 --- /dev/null +++ b/Rx/v2/examples/doxygen/math.cpp @@ -0,0 +1,89 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("first sample"){ + printf("//! [first sample]\n"); + auto values = rxcpp::observable<>::range(1, 3).first(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [first sample]\n"); +} + +//SCENARIO("first empty sample"){ +// printf("//! [first empty sample]\n"); +// auto values = rxcpp::observable<>::empty<int>().first(); +// values. +// subscribe( +// [](int v){printf("OnNext: %d\n", v);}, +// [](std::exception_ptr ep){ +// try {std::rethrow_exception(ep);} +// catch (const std::exception& ex) { +// printf("OnError: %s\n", ex.what()); +// } +// catch (...) { +// printf("OnError:\n"); +// } +// }, +// [](){printf("OnCompleted\n");}); +// printf("//! [first empty sample]\n"); +//} + +SCENARIO("last sample"){ + printf("//! [last sample]\n"); + auto values = rxcpp::observable<>::range(1, 3).last(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [last sample]\n"); +} + +//SCENARIO("last empty sample"){ +// printf("//! [last empty sample]\n"); +// auto values = rxcpp::observable<>::empty<int>().last(); +// values. +// subscribe( +// [](int v){printf("OnNext: %d\n", v);}, +// [](std::exception_ptr ep){ +// try {std::rethrow_exception(ep);} +// catch (const std::exception& ex) { +// printf("OnError: %s\n", ex.what()); +// } +// }, +// [](){printf("OnCompleted\n");}); +// printf("//! [last empty sample]\n"); +//} + +SCENARIO("count sample"){ + printf("//! [count sample]\n"); + auto values = rxcpp::observable<>::range(1, 3).count(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [count sample]\n"); +} + +SCENARIO("sum sample"){ + printf("//! [sum sample]\n"); + auto values = rxcpp::observable<>::range(1, 3).sum(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [sum sample]\n"); +} + +SCENARIO("average sample"){ + printf("//! [average sample]\n"); + auto values = rxcpp::observable<>::range(1, 4).average(); + values. + subscribe( + [](double v){printf("OnNext: %lf\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [average sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/merge.cpp b/Rx/v2/examples/doxygen/merge.cpp new file mode 100644 index 0000000..4b5ea9a --- /dev/null +++ b/Rx/v2/examples/doxygen/merge.cpp @@ -0,0 +1,84 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("merge sample"){ + printf("//! [merge sample]\n"); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}); + auto values = o1.merge(o2, o3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [merge sample]\n"); +} + +SCENARIO("implicit merge sample"){ + printf("//! [implicit merge sample]\n"); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}).as_dynamic(); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}).as_dynamic(); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}).as_dynamic(); + auto base = rxcpp::observable<>::from(o1, o2, o3); + auto values = base.merge(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [implicit merge sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded merge sample"){ + printf("//! [threaded merge sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { + printf("[thread %s] Timer1 fired\n", get_pid().c_str()); + return 1; + }); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) { + printf("[thread %s] Timer2 fired\n", get_pid().c_str()); + return 2; + }); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) { + printf("[thread %s] Timer3 fired\n", get_pid().c_str()); + return 3; + }); + auto values = o1.merge(rxcpp::observe_on_new_thread(), o2, o3); + values. + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded merge sample]\n"); +} + +SCENARIO("threaded implicit merge sample"){ + printf("//! [threaded implicit merge sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { + printf("[thread %s] Timer1 fired\n", get_pid().c_str()); + return 1; + }).as_dynamic(); + auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) { + printf("[thread %s] Timer2 fired\n", get_pid().c_str()); + return 2; + }).as_dynamic(); + auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) { + printf("[thread %s] Timer3 fired\n", get_pid().c_str()); + return 3; + }).as_dynamic(); + auto base = rxcpp::observable<>::from(o1, o2, o3); + auto values = base.merge(rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded implicit merge sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/observe_on.cpp b/Rx/v2/examples/doxygen/observe_on.cpp new file mode 100644 index 0000000..927c339 --- /dev/null +++ b/Rx/v2/examples/doxygen/observe_on.cpp @@ -0,0 +1,24 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +std::string get_pid(); + +SCENARIO("observe_on sample"){ + printf("//! [observe_on sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto values = rxcpp::observable<>::range(1, 3). + map([](int v){ + printf("[thread %s] Emit value %d\n", get_pid().c_str(), v); + return v; + }); + values. + observe_on(rxcpp::synchronize_new_thread()). + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [observe_on sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/pairwise.cpp b/Rx/v2/examples/doxygen/pairwise.cpp new file mode 100644 index 0000000..3dd8d34 --- /dev/null +++ b/Rx/v2/examples/doxygen/pairwise.cpp @@ -0,0 +1,51 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("pairwise sample"){ + printf("//! [pairwise sample]\n"); + auto values = rxcpp::observable<>::range(1, 5).pairwise(); + values. + subscribe( + [](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [pairwise sample]\n"); +} + +SCENARIO("pairwise short sample"){ + printf("//! [pairwise short sample]\n"); + auto values = rxcpp::observable<>::just(1).pairwise(); + values. + subscribe( + [](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [pairwise short sample]\n"); +} + +//std::string get_pid(); +// +//SCENARIO("threaded flat_map sample"){ +// printf("//! [threaded flat_map sample]\n"); +// printf("[thread %s] Start task\n", get_pid().c_str()); +// auto values = rxcpp::observable<>::range(1, 3). +// flat_map( +// [](int v){ +// printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v); +// return +// rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)). +// take(3); +// }, +// [](int v_main, int v_sub){ +// printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub); +// return std::make_tuple(v_main, v_sub); +// }, +// rxcpp::observe_on_new_thread()); +// values. +// as_blocking(). +// subscribe( +// [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));}, +// [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); +// printf("[thread %s] Finish task\n", get_pid().c_str()); +// printf("//! [threaded flat_map sample]\n"); +//} diff --git a/Rx/v2/examples/doxygen/publish.cpp b/Rx/v2/examples/doxygen/publish.cpp new file mode 100644 index 0000000..700fa89 --- /dev/null +++ b/Rx/v2/examples/doxygen/publish.cpp @@ -0,0 +1,97 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("publish_synchronized sample"){ + printf("//! [publish_synchronized sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). + take(5). + publish_synchronized(rxcpp::observe_on_new_thread()); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %d\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Another subscription from the beginning + values.subscribe( + [](long v){printf("[2] OnNext: %d\n", v);}, + [](){printf("[2] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ + values.subscribe( + [](long v){printf("[3] OnNext: %d\n", v);}, + [](){printf("[3] OnCompleted\n");}); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("//! [publish_synchronized sample]\n"); +} + +SCENARIO("publish subject sample"){ + printf("//! [publish subject sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). + take(5). + publish(); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %d\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Another subscription from the beginning + values.subscribe( + [](long v){printf("[2] OnNext: %d\n", v);}, + [](){printf("[2] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ + values.subscribe( + [](long v){printf("[3] OnNext: %d\n", v);}, + [](){printf("[3] OnCompleted\n");}); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("//! [publish subject sample]\n"); +} + +SCENARIO("publish behavior sample"){ + printf("//! [publish behavior sample]\n"); + auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). + take(5). + publish(0L); + + // Subscribe from the beginning + values.subscribe( + [](long v){printf("[1] OnNext: %d\n", v);}, + [](){printf("[1] OnCompleted\n");}); + + // Another subscription from the beginning + values.subscribe( + [](long v){printf("[2] OnNext: %d\n", v);}, + [](){printf("[2] OnCompleted\n");}); + + // Start emitting + values.connect(); + + // Wait before subscribing + rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ + values.subscribe( + [](long v){printf("[3] OnNext: %d\n", v);}, + [](){printf("[3] OnCompleted\n");}); + }); + + // Add blocking subscription to see results + values.as_blocking().subscribe(); + printf("//! [publish behavior sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/reduce.cpp b/Rx/v2/examples/doxygen/reduce.cpp new file mode 100644 index 0000000..0005bf6 --- /dev/null +++ b/Rx/v2/examples/doxygen/reduce.cpp @@ -0,0 +1,24 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("reduce sample"){ + printf("//! [reduce sample]\n"); + auto values = rxcpp::observable<>::range(1, 7). + reduce( + std::make_pair(0, 1.0), + [](std::pair<int, double> seed, int v){ + seed.first += 1; + seed.second *= v; + return seed; + }, + [](std::pair<int, double> res){ + return std::pow(res.second, 1.0 / res.first); + }); + values. + subscribe( + [](double v){printf("OnNext: %lf\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [reduce sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/repeat.cpp b/Rx/v2/examples/doxygen/repeat.cpp new file mode 100644 index 0000000..9cd8d43 --- /dev/null +++ b/Rx/v2/examples/doxygen/repeat.cpp @@ -0,0 +1,44 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("repeat sample"){ + printf("//! [repeat sample]\n"); + auto values = rxcpp::observable<>::from(1, 2). + repeat(). + take(5); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [repeat sample]\n"); +} + +SCENARIO("repeat count sample"){ + printf("//! [repeat count sample]\n"); + auto values = rxcpp::observable<>::from(1, 2).repeat(3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [repeat count sample]\n"); +} + +SCENARIO("repeat error sample"){ + printf("//! [repeat error sample]\n"); + auto values = rxcpp::observable<>::from(1, 2). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + repeat(); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [repeat error sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/retry.cpp b/Rx/v2/examples/doxygen/retry.cpp new file mode 100644 index 0000000..efcfb23 --- /dev/null +++ b/Rx/v2/examples/doxygen/retry.cpp @@ -0,0 +1,84 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("retry sample"){ + printf("//! [retry sample]\n"); + auto values = rxcpp::observable<>::from(1, 2). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + retry(). + take(5); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [retry sample]\n"); +} + +SCENARIO("retry count sample"){ + printf("//! [retry count sample]\n"); + auto source = rxcpp::observable<>::from(1, 2). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))); + auto values = source.retry(3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [retry count sample]\n"); +} + +//SCENARIO("retry hot sample"){ +// printf("//! [retry hot sample]\n"); +// auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)). +// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error1 from source"))). +// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))). +// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error2 from source"))). +// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))). +// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error3 from source"))). +// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))). +// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error4 from source"))). +// retry(3); +// values. +// subscribe( +// [](long v){printf("OnNext: %d\n", v);}, +// [](std::exception_ptr ep){ +// try {std::rethrow_exception(ep);} +// catch (const std::exception& ex) { +// printf("OnError: %s\n", ex.what()); +// } +// }, +// [](){printf("OnCompleted\n");}); +// printf("//! [retry hot sample]\n"); +//} +// +//SCENARIO("retry completed sample"){ +// printf("//! [retry completed sample <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<]\n"); +// auto source = rxcpp::observable<>::from(1, 2). +// concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). +// publish(); +// auto values = source.retry(); +// //auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)). +// // concat(rxcpp::observable<>::error<long>(std::runtime_error("Error1 from source"))). +// // concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))). +// // concat(rxcpp::observable<>::error<long>(std::runtime_error("Error2 from source"))). +// // concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))). +// // retry(3); +// values. +// subscribe( +// [](long v){printf("OnNext: %d\n", v);}, +// [](std::exception_ptr ep){ +// try {std::rethrow_exception(ep);} +// catch (const std::exception& ex) { +// printf("OnError: %s\n", ex.what()); +// } +// }, +// [](){printf("OnCompleted\n");}); +// printf("//! [retry completed sample]\n"); +//} diff --git a/Rx/v2/examples/doxygen/scan.cpp b/Rx/v2/examples/doxygen/scan.cpp new file mode 100644 index 0000000..2a49efd --- /dev/null +++ b/Rx/v2/examples/doxygen/scan.cpp @@ -0,0 +1,19 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("scan sample"){ + printf("//! [scan sample]\n"); + auto values = rxcpp::observable<>::range(1, 7). + scan( + 0, + [](int seed, int v){ + return seed + v; + }); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [scan sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/skip.cpp b/Rx/v2/examples/doxygen/skip.cpp new file mode 100644 index 0000000..e5b5979 --- /dev/null +++ b/Rx/v2/examples/doxygen/skip.cpp @@ -0,0 +1,14 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("skip sample"){ + printf("//! [skip sample]\n"); + auto values = rxcpp::observable<>::range(1, 7).skip(3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [skip sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/skip_until.cpp b/Rx/v2/examples/doxygen/skip_until.cpp new file mode 100644 index 0000000..3e0202f --- /dev/null +++ b/Rx/v2/examples/doxygen/skip_until.cpp @@ -0,0 +1,39 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("skip_until sample"){ + printf("//! [skip_until sample]\n"); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7); + auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)); + auto values = source.skip_until(trigger); + values. + subscribe( + [](long v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [skip_until sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded skip_until sample"){ + printf("//! [threaded skip_until sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){ + printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v); + return v; + }); + auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){ + printf("[thread %s] Trigger emits, value = %d\n", get_pid().c_str(), v); + return v; + }); + auto values = source.skip_until(trigger, rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded skip_until sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/subscribe.cpp b/Rx/v2/examples/doxygen/subscribe.cpp new file mode 100644 index 0000000..e7c3435 --- /dev/null +++ b/Rx/v2/examples/doxygen/subscribe.cpp @@ -0,0 +1,101 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("subscribe by subscriber"){ + printf("//! [subscribe by subscriber]\n"); + auto subscriber = rxcpp::make_subscriber<int>( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + auto values = rxcpp::observable<>::range(1, 3); + values.subscribe(subscriber); + printf("//! [subscribe by subscriber]\n"); +} + +SCENARIO("subscribe by observer"){ + printf("//! [subscribe by observer]\n"); + auto subscriber = rxcpp::make_subscriber<int>( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + auto values1 = rxcpp::observable<>::range(1, 3); + auto values2 = rxcpp::observable<>::range(4, 6); + values1.subscribe(subscriber.get_observer()); + values2.subscribe(subscriber.get_observer()); + printf("//! [subscribe by observer]\n"); +} + +SCENARIO("subscribe by on_next"){ + printf("//! [subscribe by on_next]\n"); + auto values = rxcpp::observable<>::range(1, 3); + values.subscribe( + [](int v){printf("OnNext: %d\n", v);}); + printf("//! [subscribe by on_next]\n"); +} + +SCENARIO("subscribe by on_next and on_error"){ + printf("//! [subscribe by on_next and on_error]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))); + values.subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }); + printf("//! [subscribe by on_next and on_error]\n"); +} + +SCENARIO("subscribe by on_next and on_completed"){ + printf("//! [subscribe by on_next and on_completed]\n"); + auto values = rxcpp::observable<>::range(1, 3); + values.subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [subscribe by on_next and on_completed]\n"); +} + +SCENARIO("subscribe by subscription, on_next, and on_completed"){ + printf("//! [subscribe by subscription, on_next, and on_completed]\n"); + auto subscription = rxcpp::composite_subscription(); + auto values = rxcpp::observable<>::range(1, 5); + values.subscribe( + subscription, + [&subscription](int v){ + printf("OnNext: %d\n", v); + if (v == 3) + subscription.unsubscribe(); + }, + [](){printf("OnCompleted\n");}); + printf("//! [subscribe by subscription, on_next, and on_completed]\n"); +} + +SCENARIO("subscribe by on_next, on_error, and on_completed"){ + printf("//! [subscribe by on_next, on_error, and on_completed]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))); + values.subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::exception& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [subscribe by on_next, on_error, and on_completed]\n"); +} + +SCENARIO("subscribe unsubscribe"){ + printf("//! [subscribe unsubscribe]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat(rxcpp::observable<>::never<int>()). + finally([](){printf("The final action\n");}); + auto subscription = values.subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + subscription.unsubscribe(); + printf("//! [subscribe unsubscribe]\n"); +} diff --git a/Rx/v2/examples/doxygen/subscribe_on.cpp b/Rx/v2/examples/doxygen/subscribe_on.cpp new file mode 100644 index 0000000..e2614bc --- /dev/null +++ b/Rx/v2/examples/doxygen/subscribe_on.cpp @@ -0,0 +1,24 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +std::string get_pid(); + +SCENARIO("subscribe_on sample"){ + printf("//! [subscribe_on sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto values = rxcpp::observable<>::range(1, 3). + map([](int v){ + printf("[thread %s] Emit value %d\n", get_pid().c_str(), v); + return v; + }); + values. + subscribe_on(rxcpp::synchronize_new_thread()). + as_blocking(). + subscribe( + [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [subscribe_on sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/switch_on_next.cpp b/Rx/v2/examples/doxygen/switch_on_next.cpp new file mode 100644 index 0000000..6e2da70 --- /dev/null +++ b/Rx/v2/examples/doxygen/switch_on_next.cpp @@ -0,0 +1,35 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("switch_on_next sample"){ + printf("//! [switch_on_next sample]\n"); + auto base = rxcpp::observable<>::interval(std::chrono::milliseconds(30)). + take(3). + map([](int){ + return rxcpp::observable<>::interval(std::chrono::milliseconds(10)).as_dynamic(); + }); + auto values = base.switch_on_next().take(10); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [switch_on_next sample]\n"); +} + +SCENARIO("threaded switch_on_next sample"){ + printf("//! [threaded switch_on_next sample]\n"); + auto base = rxcpp::observable<>::interval(std::chrono::milliseconds(30)). + take(3). + map([](long){ + return rxcpp::observable<>::interval(std::chrono::milliseconds(10), rxcpp::observe_on_event_loop()).as_dynamic(); + }); + auto values = base.switch_on_next(rxcpp::observe_on_new_thread()).take(10); + values. + as_blocking(). + subscribe( + [](long v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [threaded switch_on_next sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/take.cpp b/Rx/v2/examples/doxygen/take.cpp new file mode 100644 index 0000000..be3be24 --- /dev/null +++ b/Rx/v2/examples/doxygen/take.cpp @@ -0,0 +1,15 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + + +SCENARIO("take sample"){ + printf("//! [take sample]\n"); + auto values = rxcpp::observable<>::range(1, 7).take(3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [take sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/take_until.cpp b/Rx/v2/examples/doxygen/take_until.cpp new file mode 100644 index 0000000..098e7c1 --- /dev/null +++ b/Rx/v2/examples/doxygen/take_until.cpp @@ -0,0 +1,68 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("take_until sample"){ + printf("//! [take_until sample]\n"); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7); + auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)); + auto values = source.take_until(trigger); + values. + subscribe( + [](long v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [take_until sample]\n"); +} + +SCENARIO("take_until time sample"){ + printf("//! [take_until time sample]\n"); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7); + auto values = source.take_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(25)); + values. + subscribe( + [](long v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [take_until time sample]\n"); +} + +std::string get_pid(); + +SCENARIO("threaded take_until sample"){ + printf("//! [threaded take_until sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){ + printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v); + return v; + }); + auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){ + printf("[thread %s] Trigger emits, value = %d\n", get_pid().c_str(), v); + return v; + }); + auto values = source.take_until(trigger, rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded take_until sample]\n"); +} + +SCENARIO("threaded take_until time sample"){ + printf("//! [threaded take_until time sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){ + printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v); + return v; + }).as_dynamic(); + auto scheduler = rxcpp::observe_on_new_thread(); + auto values = source.take_until(scheduler.now() + std::chrono::milliseconds(25), scheduler); + values. + as_blocking(). + subscribe( + [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [threaded take_until time sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/window.cpp b/Rx/v2/examples/doxygen/window.cpp new file mode 100644 index 0000000..0a0ba63 --- /dev/null +++ b/Rx/v2/examples/doxygen/window.cpp @@ -0,0 +1,196 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("window count sample"){ + printf("//! [window count sample]\n"); + int counter = 0; + auto values = rxcpp::observable<>::range(1, 5).window(2); + values. + subscribe( + [&counter](rxcpp::observable<int> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](int v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window count sample]\n"); +} + +SCENARIO("window count+skip sample"){ + printf("//! [window count+skip sample]\n"); + int counter = 0; + auto values = rxcpp::observable<>::range(1, 7).window(2, 3); + values. + subscribe( + [&counter](rxcpp::observable<int> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](int v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window count+skip sample]\n"); +} + +SCENARIO("window period+skip+coordination sample"){ + printf("//! [window period+skip+coordination sample]\n"); + int counter = 0; + auto period = std::chrono::milliseconds(4); + auto skip = std::chrono::milliseconds(6); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_with_time(period, skip, rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+skip+coordination sample]\n"); +} + +SCENARIO("window period+skip sample"){ + printf("//! [window period+skip sample]\n"); + int counter = 0; + auto period = std::chrono::milliseconds(4); + auto skip = std::chrono::milliseconds(6); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_with_time(period, skip); + values. + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+skip sample]\n"); +} + +SCENARIO("window period+skip overlapping sample"){ + printf("//! [window period+skip overlapping sample]\n"); + int counter = 0; + auto period = std::chrono::milliseconds(6); + auto skip = std::chrono::milliseconds(4); + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_with_time(period, skip); + values. + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+skip overlapping sample]\n"); +} + +SCENARIO("window period+skip empty sample"){ + printf("//! [window period+skip empty sample]\n"); + int counter = 0; + auto period = std::chrono::milliseconds(2); + auto skip = std::chrono::milliseconds(4); + auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)). + window_with_time(period, skip); + values. + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+skip empty sample]\n"); +} + +SCENARIO("window period+coordination sample"){ + printf("//! [window period+coordination sample]\n"); + int counter = 0; + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread()); + values. + as_blocking(). + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+coordination sample]\n"); +} + +SCENARIO("window period sample"){ + printf("//! [window period sample]\n"); + int counter = 0; + auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)). + take(7). + window_with_time(std::chrono::milliseconds(4)); + values. + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period sample]\n"); +} + +SCENARIO("window period+count+coordination sample"){ + printf("//! [window period+count+coordination sample]\n"); + int counter = 0; + auto start = std::chrono::steady_clock::now(); + auto int1 = rxcpp::observable<>::range(1L, 3L); + auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); + auto values = int1. + concat(int2). + window_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop()); + values. + as_blocking(). + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+count+coordination sample]\n"); +} + +SCENARIO("window period+count sample"){ + printf("//! [window period+count sample]\n"); + int counter = 0; + auto start = std::chrono::steady_clock::now(); + auto int1 = rxcpp::observable<>::range(1L, 3L); + auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); + auto values = int1. + concat(int2). + window_with_time_or_count(std::chrono::milliseconds(20), 2); + values. + subscribe( + [&counter](rxcpp::observable<long> v){ + int id = counter++; + printf("[window %d] Create window\n", id); + v.subscribe( + [id](long v){printf("[window %d] OnNext: %d\n", id, v);}, + [id](){printf("[window %d] OnCompleted\n", id);}); + }); + printf("//! [window period+count sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/zip.cpp b/Rx/v2/examples/doxygen/zip.cpp new file mode 100644 index 0000000..9085110 --- /dev/null +++ b/Rx/v2/examples/doxygen/zip.cpp @@ -0,0 +1,85 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("zip sample"){ + printf("//! [zip sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto values = o1.zip(o2, o3); + values. + take(3). + subscribe( + [](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));}, + [](){printf("OnCompleted\n");}); + printf("//! [zip sample]\n"); +} + +std::string get_pid(); + +SCENARIO("Coordination zip sample"){ + printf("//! [Coordination zip sample]\n"); + printf("[thread %s] Start task\n", get_pid().c_str()); + auto thr = rxcpp::synchronize_event_loop(); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)).map([](int v) { + printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) { + printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) { + printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v); + return v; + }); + auto values = o1.zip(thr, o2, o3); + values. + take(3). + as_blocking(). + subscribe( + [](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));}, + [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); + printf("[thread %s] Finish task\n", get_pid().c_str()); + printf("//! [Coordination zip sample]\n"); +} + +SCENARIO("Selector zip sample"){ + printf("//! [Selector zip sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto values = o1.zip( + [](int v1, int v2, int v3) { + return 100 * v1 + 10 * v2 + v3; + }, + o2, o3); + values. + take(3). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [Selector zip sample]\n"); +} + +SCENARIO("Coordination+Selector zip sample"){ + printf("//! [Coordination+Selector zip sample]\n"); + auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)); + auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); + auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); + auto values = o1.zip( + rxcpp::observe_on_new_thread(), + [](int v1, int v2, int v3) { + return 100 * v1 + 10 * v2 + v3; + }, + o2, o3); + values. + take(3). + as_blocking(). + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [Coordination+Selector zip sample]\n"); +} diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index ed2f52f..16ec44b 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -417,20 +417,34 @@ public: } #endif - /// - /// returns a new observable that performs type-forgetting conversion of this observable - /// + /*! Return a new observable that performs type-forgetting conversion of this observable. + + \return The source observable converted to observable<T>. + + \note This operator could be useful to workaround lambda deduction bug on msvc 2013. + + \sample + \snippet concat.cpp concat sample + \snippet output.txt concat sample + */ observable<T> as_dynamic() const { return *this; } - /// - /// returns new observable that contains the blocking methods for this observable - /// + /*! Return a new observable that contains the blocking methods for this observable. + + \return An observable that contains the blocking methods for this observable. + + \sample + \snippet from.cpp threaded from sample + \snippet output.txt threaded from sample + */ blocking_observable<T, this_type> as_blocking() const { return blocking_observable<T, this_type>(*this); } + /// \cond SHOW_SERVICE_MEMBERS + /// /// takes any function that will take this observable and produce a result value. /// this is intended to allow externally defined operators, that use subscribe, @@ -479,195 +493,486 @@ public: decltype(rxs::from<ResultType>())>::type { return rxs::from<ResultType>(); } + /// \endcond - /// - /// subscribe will cause this observable to emit values to the provided subscriber. - /// callers must provide enough arguments to make a subscriber. - /// overrides are supported. thus - /// subscribe(thesubscriber, composite_subscription()) - /// will take thesubscriber.get_observer() and the provided - /// subscription and subscribe to the new subscriber. - /// the on_next, on_error, on_completed methods can be supplied instead of an observer - /// if a subscription or subscriber is not provided then a new subscription will be created. - /// + /*! Subscribe will cause this observable to emit values to the provided subscriber. + + \tparam ArgN types of the subscriber parameters + + \param an the parameters for making a subscriber + + \return A subscription with which the observer can stop receiving items before the observable has finished sending them. + + The arguments of subscribe are forwarded to rxcpp::make_subscriber function. Some possible alternatives are: + + - Pass an already composed rxcpp::subscriber: + \snippet subscribe.cpp subscribe by subscriber + \snippet output.txt subscribe by subscriber + + - Pass an rxcpp::observer. This allows subscribing the same subscriber to several observables: + \snippet subscribe.cpp subscribe by observer + \snippet output.txt subscribe by observer + + - Pass an `on_next` handler: + \snippet subscribe.cpp subscribe by on_next + \snippet output.txt subscribe by on_next + + - Pass `on_next` and `on_error` handlers: + \snippet subscribe.cpp subscribe by on_next and on_error + \snippet output.txt subscribe by on_next and on_error + + - Pass `on_next` and `on_completed` handlers: + \snippet subscribe.cpp subscribe by on_next and on_completed + \snippet output.txt subscribe by on_next and on_completed + + - Pass `on_next`, `on_error`, and `on_completed` handlers: + \snippet subscribe.cpp subscribe by on_next, on_error, and on_completed + \snippet output.txt subscribe by on_next, on_error, and on_completed + . + + All the alternatives above also support passing rxcpp::composite_subscription instance. For example: + \snippet subscribe.cpp subscribe by subscription, on_next, and on_completed + \snippet output.txt subscribe by subscription, on_next, and on_completed + + If neither subscription nor subscriber are provided, then a new subscription is created and returned as a result: + \snippet subscribe.cpp subscribe unsubscribe + \snippet output.txt subscribe unsubscribe + + For more details, see rxcpp::make_subscriber function description. + */ template<class... ArgN> auto subscribe(ArgN&&... an) const -> composite_subscription { return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...)); } - /// filter (AKA Where) -> - /// for each item from this observable use Predicate to select which items to emit from the new observable that is returned. - /// + /*! For each item from this observable use Predicate to select which items to emit from the new observable that is returned. + + \tparam Predicate the type of the filter function + + \param p the filter function + + \return Observable that emits only those items emitted by the source observable that the filter evaluates as true. + + \sample + \snippet filter.cpp filter sample + \snippet output.txt filter sample + */ template<class Predicate> auto filter(Predicate p) const - -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::filter<T, Predicate>(std::move(p)))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::filter<T, Predicate>(std::move(p)))) + /// \endcond + { return lift<T>(rxo::detail::filter<T, Predicate>(std::move(p))); } - /// finally () -> - /// + /*! Add a new action at the end of the new observable that is returned. + + \tparam LastCall the type of the action function + + \param lc the action function + + \return Observable that emits the same items as the source observable, then invokes the given action. + + \sample + \snippet finally.cpp finally sample + \snippet output.txt finally sample + + If the source observable generates an error, the final action is still being called: + \snippet finally.cpp error finally sample + \snippet output.txt error finally sample + */ template<class LastCall> auto finally(LastCall lc) const - -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)))) + /// \endcond + { return lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc))); } - /// map (AKA Select) -> - /// for each item from this observable use Selector to produce an item to emit from the new observable that is returned. - /// + /*! For each item from this observable use Selector to produce an item to emit from the new observable that is returned. + + \tparam Selector the type of the transforming function + + \param s the selector function + + \return Observable that emits the items from the source observable, transformed by the specified function. + + \sample + \snippet map.cpp map sample + \snippet output.txt map sample + */ template<class Selector> auto map(Selector s) const - -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s)))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s)))) + /// \endcond + { return lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s))); } - /// distinct_until_changed -> - /// for each item from this observable, filter out repeated values and emit only changes from the new observable that is returned. - /// + /*! For each item from this observable, filter out consequentially repeated values and emit only changes from the new observable that is returned. + + \return Observable that emits those items from the source observable that are distinct from their immediate predecessors. + + \sample + \snippet distinct_until_changed.cpp distinct_until_changed sample + \snippet output.txt distinct_until_changed sample + */ auto distinct_until_changed() const - -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::distinct_until_changed<T>())) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::distinct_until_changed<T>())) + /// \endcond + { return lift<T>(rxo::detail::distinct_until_changed<T>()); } - /// window -> - /// produce observables containing count items emitted by this observable - /// + /*! Rerurn an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable. + + \param count the maximum size of each window before it should be completed + + \return Observable that emits connected, non-overlapping windows, each containing at most count items from the source observable. + + \sample + \snippet window.cpp window count sample + \snippet output.txt window count sample + */ auto window(int count) const - -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, count))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, count))) + /// \endcond + { return lift<observable<T>>(rxo::detail::window<T>(count, count)); } - /// window -> - /// produce observables containing count items emitted by this observable - /// + /*! Rerurn an observable that emits windows every skip items containing at most count items from the source observable. + + \param count the maximum size of each window before it should be completed + \param skip how many items need to be skipped before starting a new window + + \return Observable that emits windows every skip items containing at most count items from the source observable. + + \sample + \snippet window.cpp window count+skip sample + \snippet output.txt window count+skip sample + */ auto window(int count, int skip) const - -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, skip))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, skip))) + /// \endcond + { return lift<observable<T>>(rxo::detail::window<T>(count, skip)); } - /// window_with_time -> - /// produce observables every skip time interval and collect items from this observable for period of time into each produced observable. - /// + /*! Rerurn an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. + + \tparam Duration the type of time intervals + \tparam Coordination the type of the scheduler + + \param period the period of time each window collects items before it is completed + \param skip the period of time after which a new window will be created + \param coordination the scheduler for the windows + + \return Observable that emits observables every skip time interval and collect items from this observable for period of time into each produced observable. + + \sample + \snippet window.cpp window period+skip+coordination sample + \snippet output.txt window period+skip+coordination sample + */ template<class Duration, class Coordination> auto window_with_time(Duration period, Duration skip, Coordination coordination) const - -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, skip, coordination))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, skip, coordination))) + /// \endcond + { return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, skip, coordination)); } - /// window_with_time -> - /// produce observables every skip time interval and collect items from this observable for period of time into each produced observable. - /// + /*! Rerurn an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable. + + \tparam Duration the type of time intervals + + \param period the period of time each window collects items before it is completed + \param skip the period of time after which a new window will be created + + \return Observable that emits observables every skip time interval and collect items from this observable for period of time into each produced observable. + + \sample + \snippet window.cpp window period+skip sample + \snippet output.txt window period+skip sample + */ template<class Duration> auto window_with_time(Duration period, Duration skip) const - -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, skip, identity_current_thread()))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, skip, identity_current_thread()))) + /// \endcond + { return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, skip, identity_current_thread())); } - /// window_with_time -> - /// produce observables every period time interval and collect items from this observable for period of time into each produced observable. - /// + /*! Rerurn an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler. + + \tparam Duration the type of time intervals + \tparam Coordination the type of the scheduler + + \param period the period of time each window collects items before it is completed and replaced with a new window + \param coordination the scheduler for the windows + + \return Observable that emits observables every period time interval and collect items from this observable for period of time into each produced observable. + + \sample + \snippet window.cpp window period+coordination sample + \snippet output.txt window period+coordination sample + */ template<class Duration, class Coordination> auto window_with_time(Duration period, Coordination coordination) const - -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, period, coordination))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, period, coordination))) + /// \endcond + { return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, period, coordination)); } - /// window_with_time -> - /// produce observables every period time interval and collect items from this observable for period of time into each produced observable. - /// + /*! Rerurn an observable that emits connected, non-overlapping windows represending items emitted by the source observable during fixed, consecutive durations. + + \tparam Duration the type of time intervals + + \param period the period of time each window collects items before it is completed and replaced with a new window + + \return Observable that emits connected, non-overlapping windows represending items emitted by the source observable during fixed, consecutive durations. + + \sample + \snippet window.cpp window period sample + \snippet output.txt window period sample + */ template<class Duration> auto window_with_time(Duration period) const - -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, period, identity_current_thread()))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, period, identity_current_thread()))) + /// \endcond + { return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, period, identity_current_thread())); } - /// window_with_time_or_count -> - /// produce observables every skip time interval and collect items from this observable for period of time into each produced observable. - /// + /*! Rerurn an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first), on the specified scheduler. + + \tparam Duration the type of time intervals + \tparam Coordination the type of the scheduler + + \param period the period of time each window collects items before it is completed and replaced with a new window + \param count the maximum size of each window before it is completed and new window is created + \param coordination the scheduler for the windows + + \return Observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first). + + \sample + \snippet window.cpp window period+count+coordination sample + \snippet output.txt window period+count+coordination sample + */ template<class Duration, class Coordination> auto window_with_time_or_count(Duration period, int count, Coordination coordination) const - -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, Coordination>(period, count, coordination))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, Coordination>(period, count, coordination))) + /// \endcond + { return lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, Coordination>(period, count, coordination)); } - /// window_with_time_or_count -> - /// produce observables every skip time interval and collect items from this observable for period of time into each produced observable. - /// + /*! Rerurn an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first). + + \tparam Duration the type of time intervals + + \param period the period of time each window collects items before it is completed and replaced with a new window + \param count the maximum size of each window before it is completed and new window is created + + \return Observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first). + + \sample + \snippet window.cpp window period+count sample + \snippet output.txt window period+count sample + */ template<class Duration> auto window_with_time_or_count(Duration period, int count) const - -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread()))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread()))) + /// \endcond + { return lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread())); } - /// buffer -> - /// collect count items from this observable and produce a vector of them to emit from the new observable that is returned. - /// + /*! Rerurn an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable. + + \param count the maximum size of each buffer before it should be emitted + + \return Observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable. + + \sample + \snippet buffer.cpp buffer count sample + \snippet output.txt buffer count sample + */ auto buffer(int count) const - -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, count))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, count))) + /// \endcond + { return lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, count)); } - /// buffer -> - /// start a new vector every skip items and collect count items from this observable into each vector to emit from the new observable that is returned. - /// + /*! Rerurn an observable that emits buffers every skip items containing at most count items from the source observable. + + \param count the maximum size of each buffers before it should be emitted + \param skip how many items need to be skipped before starting a new buffers + + \return Observable that emits buffers every skip items containing at most count items from the source observable. + + \sample + \snippet buffer.cpp buffer count+skip sample + \snippet output.txt buffer count+skip sample + */ auto buffer(int count, int skip) const - -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip))) + /// \endcond + { return lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip)); } - /// buffer_with_time -> - /// start a new vector every skip time interval and collect items into it from this observable for period of time. - /// + /*! Rerurn an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler. + + \tparam Coordination the type of the scheduler + + \param period the period of time each buffer collects items before it is emitted + \param skip the period of time after which a new buffer will be created + \param coordination the scheduler for the buffers + + \return Observable that emits buffers every skip time interval and collect items from this observable for period of time into each produced buffer. + + \sample + \snippet buffer.cpp buffer period+skip+coordination sample + \snippet output.txt buffer period+skip+coordination sample + */ template<class Coordination> auto buffer_with_time(rxsc::scheduler::clock_type::duration period, rxsc::scheduler::clock_type::duration skip, Coordination coordination) const - -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, skip, coordination))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, skip, coordination))) + /// \endcond + { return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, skip, coordination)); } - /// buffer_with_time -> - /// start a new vector every skip time interval and collect items into it from this observable for period of time. - /// + /*! Rerurn an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer. + + \param period the period of time each buffer collects items before it is emitted + \param skip the period of time after which a new buffer will be created + + \return Observable that emits buffers every skip time interval and collect items from this observable for period of time into each produced buffer. + + \sample + \snippet buffer.cpp buffer period+skip sample + \snippet output.txt buffer period+skip sample + + Overlapping buffers are allowed: + \snippet buffer.cpp buffer period+skip overlapping sample + \snippet output.txt buffer period+skip overlapping sample + + If no items are emitted, an empty buffer is returned: + \snippet buffer.cpp buffer period+skip empty sample + \snippet output.txt buffer period+skip empty sample + */ auto buffer_with_time(rxsc::scheduler::clock_type::duration period, rxsc::scheduler::clock_type::duration skip) const - -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, skip, identity_current_thread()))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, skip, identity_current_thread()))) + /// \endcond + { return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, skip, identity_current_thread())); } - /// buffer_with_time -> - /// start a new vector every period time interval and collect items into it from this observable for period of time. - /// + /*! Rerurn an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler. + + \tparam Coordination the type of the scheduler + + \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer + \param coordination the scheduler for the buffers + + \return Observable that emits buffers every period time interval and collect items from this observable for period of time into each produced buffer. + + \sample + \snippet buffer.cpp buffer period+coordination sample + \snippet output.txt buffer period+coordination sample + */ template<class Coordination, class Requires = typename std::enable_if<is_coordination<Coordination>::value, rxu::types_checked>::type> auto buffer_with_time(rxsc::scheduler::clock_type::duration period, Coordination coordination) const - -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, period, coordination))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, period, coordination))) + /// \endcond + { return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, period, coordination)); } - /// buffer_with_time -> - /// start a new vector every period time interval and collect items into it from this observable for period of time. - /// + /*! Rerurn an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer. + + \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer + + \return Observable that emits buffers every period time interval and collect items from this observable for period of time into each produced buffer. + + \sample + \snippet buffer.cpp buffer period sample + \snippet output.txt buffer period sample + */ auto buffer_with_time(rxsc::scheduler::clock_type::duration period) const - -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, period, identity_current_thread()))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, period, identity_current_thread()))) + /// \endcond + { return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, period, identity_current_thread())); } - /// buffer_with_time_or_count -> - /// start a new vector every skip time interval and collect items into it from this observable for period of time. - /// + /*! Rerurn an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler. + + \tparam Coordination the type of the scheduler + + \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer + \param count the maximum size of each buffer before it is emitted and new buffer is created + \param coordination the scheduler for the buffers + + \return Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first). + + \sample + \snippet buffer.cpp buffer period+count+coordination sample + \snippet output.txt buffer period+count+coordination sample + */ template<class Coordination> auto buffer_with_time_or_count(rxsc::scheduler::clock_type::duration period, int count, Coordination coordination) const - -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, Coordination>(period, count, coordination))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, Coordination>(period, count, coordination))) + /// \endcond + { return lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, Coordination>(period, count, coordination)); } - /// buffer_with_time_or_count -> - /// start a new vector every skip time interval and collect items into it from this observable for period of time. - /// + /*! Rerurn an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first). + + \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer + \param count the maximum size of each buffer before it is emitted and new buffer is created + + \return Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first). + + \sample + \snippet buffer.cpp buffer period+count sample + \snippet output.txt buffer period+count sample + */ auto buffer_with_time_or_count(rxsc::scheduler::clock_type::duration period, int count) const - -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, count, identity_current_thread()))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, count, identity_current_thread()))) + /// \endcond + { return lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, count, identity_current_thread())); } + /// \cond SHOW_SERVICE_MEMBERS template<class Coordination> struct defer_switch_on_next : public defer_observable< is_observable<value_type>, @@ -675,28 +980,50 @@ public: rxo::detail::switch_on_next, value_type, observable<value_type>, Coordination> { }; + /// \endcond - /// switch_on_next (AKA Switch) -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// for each item from this observable subscribe to that observable after unsubscribing from any previous subscription. - /// + /*! Return observable that emits the items emitted by the observable most recently emitted by the source observable. + + \return Observable that emits the items emitted by the observable most recently emitted by the source observable. + + \note All sources must be synchronized! This means that calls across all the subscribers must be serial. + + \sample + \snippet switch_on_next.cpp switch_on_next sample + \snippet output.txt switch_on_next sample + */ auto switch_on_next() const - -> typename defer_switch_on_next<identity_one_worker>::observable_type { + /// \cond SHOW_SERVICE_MEMBERS + -> typename defer_switch_on_next<identity_one_worker>::observable_type + /// \endcond + { return defer_switch_on_next<identity_one_worker>::make(*this, *this, identity_current_thread()); } - /// switch_on_next (AKA Switch) -> - /// The coodination is used to synchronize sources from different contexts. - /// for each item from this observable subscribe to that observable after unsubscribing from any previous subscription. - /// + /*! Return observable that emits the items emitted by the observable most recently emitted by the source observable, on the specified scheduler. + + \tparam Coordination the type of the scheduler + + \param cn the scheduler to synchronize sources from different contexts + + \return Observable that emits the items emitted by the observable most recently emitted by the source observable. + + \sample + \snippet switch_on_next.cpp threaded switch_on_next sample + \snippet output.txt threaded switch_on_next sample + */ template<class Coordination> auto switch_on_next(Coordination cn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if< defer_switch_on_next<Coordination>::value, - typename defer_switch_on_next<Coordination>::observable_type>::type { + typename defer_switch_on_next<Coordination>::observable_type>::type + /// \endcond + { return defer_switch_on_next<Coordination>::make(*this, *this, std::move(cn)); } + /// \cond SHOW_SERVICE_MEMBERS template<class Coordination> struct defer_merge : public defer_observable< is_observable<value_type>, @@ -704,30 +1031,52 @@ public: rxo::detail::merge, value_type, observable<value_type>, Coordination> { }; + /// \endcond - /// merge -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// for each item from this observable subscribe. - /// for each item from all of the nested observables deliver from the new observable that is returned. - /// + /*! For each item from this observable subscribe. + For each item from all of the nested observables deliver from the new observable that is returned. + + \return Observable that emits items that are the result of flattening the observables emitted by the source observable. + + \note All sources must be synchronized! This means that calls across all the subscribers must be serial. + + \sample + \snippet merge.cpp implicit merge sample + \snippet output.txt implicit merge sample + */ auto merge() const - -> typename defer_merge<identity_one_worker>::observable_type { + /// \cond SHOW_SERVICE_MEMBERS + -> typename defer_merge<identity_one_worker>::observable_type + /// \endcond + { return defer_merge<identity_one_worker>::make(*this, *this, identity_current_thread()); } - /// merge -> - /// The coordination is used to synchronize sources from different contexts. - /// for each item from this observable subscribe. - /// for each item from all of the nested observables deliver from the new observable that is returned. - /// + /*! For each item from this observable subscribe. + For each item from all of the nested observables deliver from the new observable that is returned. + + \tparam Coordination the type of the scheduler + + \param cn the scheduler to synchronize sources from different contexts. + + \return Observable that emits items that are the result of flattening the observables emitted by the source observable. + + \sample + \snippet merge.cpp threaded implicit merge sample + \snippet output.txt threaded implicit merge sample + */ template<class Coordination> auto merge(Coordination cn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if< defer_merge<Coordination>::value, - typename defer_merge<Coordination>::observable_type>::type { + typename defer_merge<Coordination>::observable_type>::type + /// \endcond + { return defer_merge<Coordination>::make(*this, *this, std::move(cn)); } + /// \cond SHOW_SERVICE_MEMBERS template<class Coordination, class Value0> struct defer_merge_from : public defer_observable< rxu::all_true< @@ -737,33 +1086,65 @@ public: rxo::detail::merge, observable<value_type>, observable<observable<value_type>>, Coordination> { }; + /// \endcond - /// merge -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// for each item from this observable subscribe. - /// for each item from all of the nested observables deliver from the new observable that is returned. - /// + /*! For each given observable subscribe. + For each emitted item deliver from the new observable that is returned. + + \tparam Value0 ... + \tparam ValueN types of source observables + + \param v0 ... + \param vn source observables + + \return Observable that emits items that are the result of flattening the observables emitted by the source observable. + + \note All sources must be synchronized! This means that calls across all the subscribers must be serial. + + \sample + \snippet merge.cpp merge sample + \snippet output.txt merge sample + */ template<class Value0, class... ValueN> auto merge(Value0 v0, ValueN... vn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if< defer_merge_from<identity_one_worker, Value0>::value, - typename defer_merge_from<identity_one_worker, Value0>::observable_type>::type { + typename defer_merge_from<identity_one_worker, Value0>::observable_type>::type + /// \endcond + { return defer_merge_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()); } - /// merge -> - /// The coordination is used to synchronize sources from different contexts. - /// for each item from this observable subscribe. - /// for each item from all of the nested observables deliver from the new observable that is returned. - /// + /*! For each given observable subscribe. + For each emitted item deliver from the new observable that is returned. + + \tparam Coordination the type of the scheduler + \tparam Value0 ... + \tparam ValueN types of source observables + + \param cn the scheduler to synchronize sources from different contexts. + \param v0 ... + \param vn source observables + + \return Observable that emits items that are the result of flattening the observables emitted by the source observable. + + \sample + \snippet merge.cpp threaded merge sample + \snippet output.txt threaded merge sample + */ template<class Coordination, class Value0, class... ValueN> auto merge(Coordination cn, Value0 v0, ValueN... vn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if< defer_merge_from<Coordination, Value0>::value, - typename defer_merge_from<Coordination, Value0>::observable_type>::type { + typename defer_merge_from<Coordination, Value0>::observable_type>::type + /// \endcond + { return defer_merge_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn)); } + /// \cond SHOW_SERVICE_MEMBERS template<class Coordination> struct defer_amb : public defer_observable< is_observable<value_type>, @@ -771,30 +1152,50 @@ public: rxo::detail::amb, value_type, observable<value_type>, Coordination> { }; + /// \endcond - /// amb -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// for each item from this observable subscribe. - /// for each item from only the first of the nested observables deliver from the new observable that is returned. - /// + /*! For each item from only the first of the nested observables deliver from the new observable that is returned. + + \return Observable that emits the same sequence as whichever of the observables emitted from this observable that first emitted an item or sent a termination notification. + + \note All sources must be synchronized! This means that calls across all the subscribers must be serial. + + \sample + \snippet amb.cpp implicit amb sample + \snippet output.txt implicit amb sample + */ auto amb() const - -> typename defer_amb<identity_one_worker>::observable_type { + /// \cond SHOW_SERVICE_MEMBERS + -> typename defer_amb<identity_one_worker>::observable_type + /// \endcond + { return defer_amb<identity_one_worker>::make(*this, *this, identity_current_thread()); } - /// amb -> - /// The coordination is used to synchronize sources from different contexts. - /// for each item from this observable subscribe. - /// for each item from only the first of the nested observables deliver from the new observable that is returned. - /// + /*! For each item from only the first of the nested observables deliver from the new observable that is returned, on the specified scheduler. + + \tparam Coordination the type of the scheduler + + \param cn the scheduler to synchronize sources from different contexts. + + \return Observable that emits the same sequence as whichever of the observables emitted from this observable that first emitted an item or sent a termination notification. + + \sample + \snippet amb.cpp threaded implicit amb sample + \snippet output.txt threaded implicit amb sample + */ template<class Coordination> auto amb(Coordination cn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if< defer_amb<Coordination>::value, - typename defer_amb<Coordination>::observable_type>::type { + typename defer_amb<Coordination>::observable_type>::type + /// \endcond + { return defer_amb<Coordination>::make(*this, *this, std::move(cn)); } + /// \cond SHOW_SERVICE_MEMBERS template<class Coordination, class Value0> struct defer_amb_from : public defer_observable< rxu::all_true< @@ -804,57 +1205,119 @@ public: rxo::detail::amb, observable<value_type>, observable<observable<value_type>>, Coordination> { }; + /// \endcond - /// amb -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// for each item from this observable subscribe. - /// for each item from only the first of the nested observables deliver from the new observable that is returned. - /// + /*! For each item from only the first of the given observables deliver from the new observable that is returned. + + \tparam Value0 ... + \tparam ValueN types of source observables + + \param v0 ... + \param vn source observables + + \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification. + + \note All sources must be synchronized! This means that calls across all the subscribers must be serial. + + \sample + \snippet amb.cpp amb sample + \snippet output.txt amb sample + */ template<class Value0, class... ValueN> auto amb(Value0 v0, ValueN... vn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if< defer_amb_from<identity_one_worker, Value0>::value, - typename defer_amb_from<identity_one_worker, Value0>::observable_type>::type { + typename defer_amb_from<identity_one_worker, Value0>::observable_type>::type + /// \endcond + { return defer_amb_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()); } - /// amb -> - /// The coordination is used to synchronize sources from different contexts. - /// for each item from this observable subscribe. - /// for each item from only the first of the nested observables deliver from the new observable that is returned. - /// + /*! For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler. + + \tparam Coordination the type of the scheduler + \tparam Value0 ... + \tparam ValueN types of source observables + + \param cn the scheduler to synchronize sources from different contexts. + \param v0 ... + \param vn source observables + + \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification. + + \sample + \snippet amb.cpp threaded amb sample + \snippet output.txt threaded amb sample + */ template<class Coordination, class Value0, class... ValueN> auto amb(Coordination cn, Value0 v0, ValueN... vn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if< defer_amb_from<Coordination, Value0>::value, - typename defer_amb_from<Coordination, Value0>::observable_type>::type { + typename defer_amb_from<Coordination, Value0>::observable_type>::type + /// \endcond + { return defer_amb_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn)); } - /// flat_map (AKA SelectMany) -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable. - /// for each item from all of the selected observables use the ResultSelector to select a value to emit from the new observable that is returned. - /// + /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. + For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. + + \tparam CollectionSelector the type of the observable producing function + \tparam ResultSelector the type of the aggregation function + + \param s a function that returns an observable for each item emitted by the source observable + \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable + + \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. + + Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::concat_map that works similar but concatenates the observables. + + \sample + \snippet flat_map.cpp flat_map sample + \snippet output.txt flat_map sample + */ template<class CollectionSelector, class ResultSelector> auto flat_map(CollectionSelector&& s, ResultSelector&& rs) const - -> observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>> { + /// \cond SHOW_SERVICE_MEMBERS + -> observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>> + /// \endcond + { return observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>( rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread())); } - /// flat_map (AKA SelectMany) -> - /// The coodination is used to synchronize sources from different contexts. - /// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable. - /// for each item from all of the selected observables use the ResultSelector to select a value to emit from the new observable that is returned. - /// + /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. + For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. + + \tparam CollectionSelector the type of the observable producing function + \tparam ResultSelector the type of the aggregation function + \tparam Coordination the type of the scheduler + + \param s a function that returns an observable for each item emitted by the source observable + \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable + \param cn the scheduler to synchronize sources from different contexts. + + \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. + + Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::concat_map that works similar but concatenates the observables. + + \sample + \snippet flat_map.cpp threaded flat_map sample + \snippet output.txt threaded flat_map sample + */ template<class CollectionSelector, class ResultSelector, class Coordination> - auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf) const - -> observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>> { + auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) const + /// \cond SHOW_SERVICE_MEMBERS + -> observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>> + /// \endcond + { return observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>( - rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf))); + rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn))); } + /// \cond SHOW_SERVICE_MEMBERS template<class Coordination> struct defer_concat : public defer_observable< is_observable<value_type>, @@ -862,30 +1325,54 @@ public: rxo::detail::concat, value_type, observable<value_type>, Coordination> { }; + /// \endcond - /// concat -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// for each item from this observable subscribe to one at a time. in the order received. - /// for each item from all of the nested observables deliver from the new observable that is returned. - /// + /*! For each item from this observable subscribe to one at a time, in the order received. + For each item from all of the nested observables deliver from the new observable that is returned. + + \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them. + + \note All sources must be synchronized! This means that calls across all the subscribers must be serial. + + \sample + \snippet concat.cpp implicit concat sample + \snippet output.txt implicit concat sample + */ auto concat() const - -> typename defer_concat<identity_one_worker>::observable_type { + /// \cond SHOW_SERVICE_MEMBERS + -> typename defer_concat<identity_one_worker>::observable_type + /// \endcond + { return defer_concat<identity_one_worker>::make(*this, *this, identity_current_thread()); } - /// concat -> - /// The coordination is used to synchronize sources from different contexts. - /// for each item from this observable subscribe to one at a time. in the order received. - /// for each item from all of the nested observables deliver from the new observable that is returned. - /// + /*! For each item from this observable subscribe to one at a time, in the order received. + For each item from all of the nested observables deliver from the new observable that is returned. + + \tparam Coordination the type of the scheduler + + \param cn the scheduler to synchronize sources from different contexts. + + \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them. + + \note All sources must be synchronized! This means that calls across all the subscribers must be serial. + + \sample + \snippet concat.cpp threaded implicit concat sample + \snippet output.txt threaded implicit concat sample + */ template<class Coordination> auto concat(Coordination cn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if< defer_concat<Coordination>::value, - typename defer_concat<Coordination>::observable_type>::type { + typename defer_concat<Coordination>::observable_type>::type + /// \endcond + { return defer_concat<Coordination>::make(*this, *this, std::move(cn)); } + /// \cond SHOW_SERVICE_MEMBERS template<class Coordination, class Value0> struct defer_concat_from : public defer_observable< rxu::all_true< @@ -895,57 +1382,119 @@ public: rxo::detail::concat, observable<value_type>, observable<observable<value_type>>, Coordination> { }; + /// \endcond - /// concat -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// for each item from this observable subscribe to one at a time. in the order received. - /// for each item from all of the nested observables deliver from the new observable that is returned. - /// + /*! For each given observable subscribe to one at a time, in the order received. + For each emitted item deliver from the new observable that is returned. + + \tparam Value0 ... + \tparam ValueN types of source observables + + \param v0 ... + \param vn source observables + + \return Observable that emits items emitted by the source observables, one after the other, without interleaving them. + + \sample + \snippet concat.cpp concat sample + \snippet output.txt concat sample + */ template<class Value0, class... ValueN> auto concat(Value0 v0, ValueN... vn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if< defer_concat_from<identity_one_worker, Value0>::value, - typename defer_concat_from<identity_one_worker, Value0>::observable_type>::type { + typename defer_concat_from<identity_one_worker, Value0>::observable_type>::type + /// \endcond + { return defer_concat_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()); } - /// concat -> - /// The coordination is used to synchronize sources from different contexts. - /// for each item from this observable subscribe to one at a time. in the order received. - /// for each item from all of the nested observables deliver from the new observable that is returned. - /// + /*! For each given observable subscribe to one at a time, in the order received. + For each emitted item deliver from the new observable that is returned. + + \tparam Coordination the type of the scheduler + \tparam Value0 ... + \tparam ValueN types of source observables + + \param cn the scheduler to synchronize sources from different contexts. + \param v0 ... + \param vn source observables + + \return Observable that emits items emitted by the source observables, one after the other, without interleaving them. + + \sample + \snippet concat.cpp threaded concat sample + \snippet output.txt threaded concat sample + */ template<class Coordination, class Value0, class... ValueN> auto concat(Coordination cn, Value0 v0, ValueN... vn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if< defer_concat_from<Coordination, Value0>::value, - typename defer_concat_from<Coordination, Value0>::observable_type>::type { + typename defer_concat_from<Coordination, Value0>::observable_type>::type + /// \endcond + { return defer_concat_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn)); } - /// concat_map -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable. - /// for each item from all of the selected observables use the ResultSelector to select a value to emit from the new observable that is returned. - /// + /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. + For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. + + \tparam CollectionSelector the type of the observable producing function + \tparam ResultSelector the type of the aggregation function + + \param s a function that returns an observable for each item emitted by the source observable + \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable + + \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. + + Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables. + + \sample + \snippet concat_map.cpp concat_map sample + \snippet output.txt concat_map sample + */ template<class CollectionSelector, class ResultSelector> auto concat_map(CollectionSelector&& s, ResultSelector&& rs) const - -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>> { + /// \cond SHOW_SERVICE_MEMBERS + -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>> + /// \endcond + { return observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>( rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread())); } - /// concat_map -> - /// The coordination is used to synchronize sources from different contexts. - /// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable. - /// for each item from all of the selected observables use the ResultSelector to select a value to emit from the new observable that is returned. - /// + /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. + For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. + + \tparam CollectionSelector the type of the observable producing function + \tparam ResultSelector the type of the aggregation function + \tparam Coordination the type of the scheduler + + \param s a function that returns an observable for each item emitted by the source observable + \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable + \param cn the scheduler to synchronize sources from different contexts. + + \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. + + Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables. + + \sample + \snippet concat_map.cpp threaded concat_map sample + \snippet output.txt threaded concat_map sample + */ template<class CollectionSelector, class ResultSelector, class Coordination> - auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf) const - -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>> { + auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) const + /// \cond SHOW_SERVICE_MEMBERS + -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>> + /// \endcond + { return observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>( - rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf))); + rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn))); } + /// \cond SHOW_SERVICE_MEMBERS template<class Source, class Coordination, class TS, class C = rxu::types_checked> struct select_combine_latest_cn : public std::false_type {}; @@ -1020,16 +1569,48 @@ public: return observable_type(operator_type(identity_current_thread(), rxu::pack(), std::make_tuple(src, std::move(on)...))); } }; + /// \endcond - /// combine_latest -> - /// for each item from all of the observables use the Selector to select a value to emit from the new observable that is returned. - /// + /*! For each item from all of the observables select a value to emit from the new observable that is returned. + + \tparam AN types of scheduler (optional), aggregate function (optional), and source observables + + \param an scheduler (optional), aggregation function (optional), and source observables + + \return Observable that emits items that are the result of combining the items emitted by the source observables. + + If scheduler is omitted, identity_current_thread is used. + + If aggregation function is omitted, the resulting observable returns tuples of emitted items. + + \sample + + Neither scheduler nor aggregation function are present: + \snippet combine_latest.cpp combine_latest sample + \snippet output.txt combine_latest sample + + Only scheduler is present: + \snippet combine_latest.cpp Coordination combine_latest sample + \snippet output.txt Coordination combine_latest sample + + Only aggregation function is present: + \snippet combine_latest.cpp Selector combine_latest sample + \snippet output.txt Selector combine_latest sample + + Both scheduler and aggregation function are present: + \snippet combine_latest.cpp Coordination+Selector combine_latest sample + \snippet output.txt Coordination+Selector combine_latest sample + */ template<class... AN> auto combine_latest(AN... an) const - -> decltype(select_combine_latest<this_type, rxu::types<decltype(an)...>>{}(*(this_type*)nullptr, std::move(an)...)) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(select_combine_latest<this_type, rxu::types<decltype(an)...>>{}(*(this_type*)nullptr, std::move(an)...)) + /// \endcond + { return select_combine_latest<this_type, rxu::types<decltype(an)...>>{}(*this, std::move(an)...); } + /// \cond SHOW_SERVICE_MEMBERS template<class Source, class Coordination, class TS, class C = rxu::types_checked> struct select_zip_cn : public std::false_type {}; @@ -1104,32 +1685,97 @@ public: return observable_type(operator_type(identity_current_thread(), rxu::pack(), std::make_tuple(src, std::move(on)...))); } }; + /// \endcond - /// zip -> - /// bring by one item from all given observables and use the Selector to select a value to emit from the new observable that is returned. - /// + /*! Bring by one item from all given observables and select a value to emit from the new observable that is returned. + + \tparam AN types of scheduler (optional), aggregate function (optional), and source observables + + \param an scheduler (optional), aggregation function (optional), and source observables + + \return Observable that emits the result of combining the items emitted and brought by one from each of the source observables. + + If scheduler is omitted, identity_current_thread is used. + + If aggregation function is omitted, the resulting observable returns tuples of emitted items. + + \sample + + Neither scheduler nor aggregation function are present: + \snippet zip.cpp zip sample + \snippet output.txt zip sample + + Only scheduler is present: + \snippet zip.cpp Coordination zip sample + \snippet output.txt Coordination zip sample + + Only aggregation function is present: + \snippet zip.cpp Selector zip sample + \snippet output.txt Selector zip sample + + Both scheduler and aggregation function are present: + \snippet zip.cpp Coordination+Selector zip sample + \snippet output.txt Coordination+Selector zip sample + */ template<class... AN> auto zip(AN... an) const - -> decltype(select_zip<this_type, rxu::types<decltype(an)...>>{}(*(this_type*)nullptr, std::move(an)...)) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(select_zip<this_type, rxu::types<decltype(an)...>>{}(*(this_type*)nullptr, std::move(an)...)) + /// \endcond + { return select_zip<this_type, rxu::types<decltype(an)...>>{}(*this, std::move(an)...); } - /// group_by -> - /// + /*! Return an observable that emits grouped_observables, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value. + + \tparam KeySelector the type of the key extracting function + \tparam MarbleSelector the type of the element extracting function + \tparam BinaryPredicate the type of the key comparing function + + \param ks a function that extracts the key for each item + \param ms a function that extracts the return element for each item + \param p a function that implements comparison of two keys + + \return Observable that emits values of grouped_observable type, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value. + + \sample + \snippet group_by.cpp group_by full intro + \snippet group_by.cpp group_by full sample + \snippet output.txt group_by full sample + */ template<class KeySelector, class MarbleSelector, class BinaryPredicate> inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p) const - -> decltype(EXPLICIT_THIS lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p)))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p)))) + /// \endcond + { return lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p))); } - /// group_by -> - /// + /*! Return an observable that emits grouped_observables, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value. + + \tparam KeySelector the type of the key extracting function + \tparam MarbleSelector the type of the element extracting function + + \param ks a function that extracts the key for each item + \param ms a function that extracts the return element for each item + + \return Observable that emits values of grouped_observable type, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value. + + \sample + \snippet group_by.cpp group_by sample + \snippet output.txt group_by sample + */ template<class KeySelector, class MarbleSelector> inline auto group_by(KeySelector ks, MarbleSelector ms) const - -> decltype(EXPLICIT_THIS lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, rxu::less>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less()))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, rxu::less>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less()))) + /// \endcond + { return lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, rxu::less>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less())); } + /// \cond SHOW_SERVICE_MEMBERS /// multicast -> /// allows connections to the source to be independent of subscriptions /// @@ -1139,64 +1785,151 @@ public: return connectable_observable<T, rxo::detail::multicast<T, this_type, Subject>>( rxo::detail::multicast<T, this_type, Subject>(*this, std::move(sub))); } + /// \endcond - /// publish_synchronized -> - /// turns a cold observable hot and allows connections to the source to be independent of subscriptions. - /// all values are queued and delivered using the scheduler from the supplied coordination - /// + /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions. + + \tparam Coordination the type of the scheduler + + \param cn a scheduler all values are queued and delivered on + \param cs the subscription to control lifetime + + \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler. + + \sample + \snippet publish.cpp publish_synchronized sample + \snippet output.txt publish_synchronized sample + */ template<class Coordination> auto publish_synchronized(Coordination cn, composite_subscription cs = composite_subscription()) const - -> decltype(EXPLICIT_THIS multicast(rxsub::synchronize<T, Coordination>(std::move(cn), cs))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS multicast(rxsub::synchronize<T, Coordination>(std::move(cn), cs))) + /// \endcond + { return multicast(rxsub::synchronize<T, Coordination>(std::move(cn), cs)); } - /// publish -> - /// turns a cold observable hot and allows connections to the source to be independent of subscriptions - /// NOTE: multicast of a subject - /// + /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions. + + \param cs the subscription to control lifetime + + \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. + + \sample + \snippet publish.cpp publish subject sample + \snippet output.txt publish subject sample + */ auto publish(composite_subscription cs = composite_subscription()) const - -> decltype(EXPLICIT_THIS multicast(rxsub::subject<T>(cs))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS multicast(rxsub::subject<T>(cs))) + /// \endcond + { return multicast(rxsub::subject<T>(cs)); } - /// publish -> - /// turns a cold observable hot, sends the most recent value to any new subscriber and allows connections to the source to be independent of subscriptions - /// NOTE: multicast of a behavior - /// + /*! Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions. + + \tparam T the type of the emitted item + + \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection + \param cs the subscription to control lifetime + + \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers. + + \sample + \snippet publish.cpp publish behavior sample + \snippet output.txt publish behavior sample + */ auto publish(T first, composite_subscription cs = composite_subscription()) const - -> decltype(EXPLICIT_THIS multicast(rxsub::behavior<T>(first, cs))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS multicast(rxsub::behavior<T>(first, cs))) + /// \endcond + { return multicast(rxsub::behavior<T>(first, cs)); } - /// subscribe_on -> - /// subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination - /// + /*! Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination. + + \tparam Coordination the type of the scheduler + + \param cn the scheduler to perform subscription actions on + + \return The source observable modified so that its subscriptions happen on the specified scheduler. + + \sample + \snippet subscribe_on.cpp subscribe_on sample + \snippet output.txt subscribe_on sample + + Invoking rxcpp::observable::observe_on operator, instead of subscribe_on, gives following results: + \snippet output.txt observe_on sample + */ template<class Coordination> auto subscribe_on(Coordination cn) const - -> observable<rxu::value_type_t<rxo::detail::subscribe_on<T, this_type, Coordination>>, rxo::detail::subscribe_on<T, this_type, Coordination>> { + /// \cond SHOW_SERVICE_MEMBERS + -> observable<rxu::value_type_t<rxo::detail::subscribe_on<T, this_type, Coordination>>, rxo::detail::subscribe_on<T, this_type, Coordination>> + /// \endcond + { return observable<rxu::value_type_t<rxo::detail::subscribe_on<T, this_type, Coordination>>, rxo::detail::subscribe_on<T, this_type, Coordination>>( rxo::detail::subscribe_on<T, this_type, Coordination>(*this, std::move(cn))); } - /// observe_on -> - /// all values are queued and delivered using the scheduler from the supplied coordination - /// + /*! All values are queued and delivered using the scheduler from the supplied coordination. + + \tparam Coordination the type of the scheduler + + \param cn the scheduler to notify observers on + + \return The source observable modified so that its observers are notified on the specified scheduler. + + \sample + \snippet observe_on.cpp observe_on sample + \snippet output.txt subscribe_on sample + + Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results: + \snippet output.txt observe_on sample + */ template<class Coordination> auto observe_on(Coordination cn) const - -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) + /// \endcond + { return lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn))); } - /// reduce -> - /// for each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned. - /// + /*! For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned. + + \tparam Seed the type of the initial value for the accumulator + \tparam Accumulator the type of the data accumulating function + \tparam ResultSelector the type of the result producing function + + \param seed the initial value for the accumulator + \param a an accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call + \param rs a result producing function that makes the final value from the last accumulator call result + + \return An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable. + + Some basic reduce-type operators have already been implemented: + - rxcpp::observable::count + - rxcpp::observable::sum + - rxcpp::observable::average + + \sample + Geometric mean of source values: + \snippet reduce.cpp reduce sample + \snippet output.txt reduce sample + */ template<class Seed, class Accumulator, class ResultSelector> auto reduce(Seed seed, Accumulator&& a, ResultSelector&& rs) const - -> observable<rxu::value_type_t<rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>>, rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>> { + /// \cond SHOW_SERVICE_MEMBERS + -> observable<rxu::value_type_t<rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>>, rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>> + /// \endcond + { return observable<rxu::value_type_t<rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>>, rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>>( rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>(source_operator, std::forward<Accumulator>(a), std::forward<ResultSelector>(rs), seed)); } + /// \cond SHOW_SERVICE_MEMBERS template<class Seed, class Accumulator, class ResultSelector> struct defer_reduce : public defer_observable< rxu::all_true< @@ -1206,207 +1939,420 @@ public: rxo::detail::reduce, T, source_operator_type, Accumulator, ResultSelector, Seed> { }; + /// \endcond - /// first -> - /// for each item from this observable reduce it by sending only the first item. - /// + /*! For each item from this observable reduce it by sending only the first item. + + \return An observable that emits only the very first item emitted by the source observable. + + \sample + \snippet math.cpp first sample + \snippet output.txt first sample + */ auto first() const -> observable<T>; - /// last -> - /// for each item from this observable reduce it by sending only the last item. - /// + /*! For each item from this observable reduce it by sending only the last item. + + \return An observable that emits only the very last item emitted by the source observable. + + \sample + \snippet math.cpp last sample + \snippet output.txt last sample + */ auto last() const -> observable<T>; - /// count -> - /// for each item from this observable reduce it by incrementing a count. - /// + /*! For each item from this observable reduce it by incrementing a count. + + \return An observable that emits a single item: the number of elements emitted by the source observable. + + \sample + \snippet math.cpp count sample + \snippet output.txt count sample + */ auto count() const -> observable<int>; - /// sum -> - /// for each item from this observable reduce it by adding to the previous items. - /// + /*! For each item from this observable reduce it by adding to the previous items. + + \return An observable that emits a single item: the sum of elements emitted by the source observable. + + \sample + \snippet math.cpp sum sample + \snippet output.txt sum sample + */ auto sum() const - -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::initialize_seeder, T>, rxu::plus, rxu::defer_type<identity_for, T>>::observable_type { + /// \cond SHOW_SERVICE_MEMBERS + -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::initialize_seeder, T>, rxu::plus, rxu::defer_type<identity_for, T>>::observable_type + /// \endcond + { return defer_reduce<rxu::defer_seed_type<rxo::detail::initialize_seeder, T>, rxu::plus, rxu::defer_type<identity_for, T>>::make(source_operator, rxu::plus(), identity_for<T>(), rxo::detail::initialize_seeder<T>().seed()); } - /// average -> - /// for each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end. - /// + /*! For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end. + + \return An observable that emits a single item: the average of elements emitted by the source observable. + + \sample + \snippet math.cpp average sample + \snippet output.txt average sample + */ auto average() const - -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::observable_type { + /// \cond SHOW_SERVICE_MEMBERS + -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::observable_type + /// \endcond + { return defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::make(source_operator, rxo::detail::average<T>(), rxo::detail::average<T>(), rxo::detail::average<T>().seed()); } - /// scan -> - /// for each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned. - /// + /*! For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned. + + \tparam Seed the type of the initial value for the accumulator + \tparam Accumulator the type of the data accumulating function + + \param seed the initial value for the accumulator + \param a an accumulator function to be invoked on each item emitted by the source observable, whose result will be emitted and used in the next accumulator call + + \return An observable that emits the results of each call to the accumulator function. + + \sample + \snippet scan.cpp scan sample + \snippet output.txt scan sample + */ template<class Seed, class Accumulator> auto scan(Seed seed, Accumulator&& a) const - -> observable<Seed, rxo::detail::scan<T, this_type, Accumulator, Seed>> { + /// \cond SHOW_SERVICE_MEMBERS + -> observable<Seed, rxo::detail::scan<T, this_type, Accumulator, Seed>> + /// \endcond + { return observable<Seed, rxo::detail::scan<T, this_type, Accumulator, Seed>>( rxo::detail::scan<T, this_type, Accumulator, Seed>(*this, std::forward<Accumulator>(a), seed)); } - /// skip -> - /// make new observable with skipped first count items from this observable - /// - /// + /*! Make new observable with skipped first count items from this observable. + + \tparam Count the type of the items counter + + \param t the number of items to skip + + \return An observable that is identical to the source observable except that it does not emit the first t items that the source observable emits. + + \sample + \snippet skip.cpp skip sample + \snippet output.txt skip sample + */ template<class Count> auto skip(Count t) const - -> observable<T, rxo::detail::skip<T, this_type, Count>> { + /// \cond SHOW_SERVICE_MEMBERS + -> observable<T, rxo::detail::skip<T, this_type, Count>> + /// \endcond + { return observable<T, rxo::detail::skip<T, this_type, Count>>( rxo::detail::skip<T, this_type, Count>(*this, t)); } - /// skip_until -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// make new observable with items skipped until on_next occurs on the TriggerSource - /// - /// + /*! Make new observable with items skipped until on_next occurs on the trigger observable + + \tparam TriggerSource the type of the trigger observable + + \param t an observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable + + \return An observable that skips items from the source observable until the second observable emits an item, then emits the remaining items. + + \note All sources must be synchronized! This means that calls across all the subscribers must be serial. + + \sample + \snippet skip_until.cpp skip_until sample + \snippet output.txt skip_until sample + */ template<class TriggerSource> auto skip_until(TriggerSource&& t) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if<is_observable<TriggerSource>::value, - observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>>>::type { + observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>>>::type + /// \endcond + { return observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>>( rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>(*this, std::forward<TriggerSource>(t), identity_one_worker(rxsc::make_current_thread()))); } - /// skip_until -> - /// The coordination is used to synchronize sources from different contexts. - /// make new observable with items skipped until on_next occurs on the TriggerSource - /// - /// + /*! Make new observable with items skipped until on_next occurs on the trigger observable + + \tparam TriggerSource the type of the trigger observable + \tparam Coordination the type of the scheduler + + \param t an observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable + \param cn the scheduler to use for scheduling the items + + \return An observable that skips items from the source observable until the second observable emits an item, then emits the remaining items. + + \sample + \snippet skip_until.cpp threaded skip_until sample + \snippet output.txt threaded skip_until sample + */ template<class TriggerSource, class Coordination> - auto skip_until(TriggerSource&& t, Coordination&& sf) const + auto skip_until(TriggerSource&& t, Coordination&& cn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if<is_observable<TriggerSource>::value && is_coordination<Coordination>::value, - observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>>>::type { + observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>>>::type + /// \endcond + { return observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>>( - rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>(*this, std::forward<TriggerSource>(t), std::forward<Coordination>(sf))); + rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>(*this, std::forward<TriggerSource>(t), std::forward<Coordination>(cn))); } - /// take -> - /// for the first count items from this observable emit them from the new observable that is returned. - /// - /// + /*! For the first count items from this observable emit them from the new observable that is returned. + + \tparam Count the type of the items counter + + \param t the number of items to take + + \return An observable that emits only the first t items emitted by the source Observable, or all of the items from the source observable if that observable emits fewer than t items. + + \sample + \snippet take.cpp take sample + \snippet output.txt take sample + */ template<class Count> auto take(Count t) const - -> observable<T, rxo::detail::take<T, this_type, Count>> { + /// \cond SHOW_SERVICE_MEMBERS + -> observable<T, rxo::detail::take<T, this_type, Count>> + /// \endcond + { return observable<T, rxo::detail::take<T, this_type, Count>>( rxo::detail::take<T, this_type, Count>(*this, t)); } - /// take_until -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// for each item from this observable until on_next occurs on the TriggerSource, emit them from the new observable that is returned. - /// - /// + /*! For each item from this observable until on_next occurs on the trigger observable, emit them from the new observable that is returned. + + \tparam TriggerSource the type of the trigger observable + + \param t an observable whose first emitted item will stop emitting items from the source observable + + \return An observable that emits the items emitted by the source observable until such time as other emits its first item. + + \note All sources must be synchronized! This means that calls across all the subscribers must be serial. + + \sample + \snippet take_until.cpp take_until sample + \snippet output.txt take_until sample + */ template<class TriggerSource> auto take_until(TriggerSource t) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if<is_observable<TriggerSource>::value, - observable<T, rxo::detail::take_until<T, this_type, TriggerSource, identity_one_worker>>>::type { + observable<T, rxo::detail::take_until<T, this_type, TriggerSource, identity_one_worker>>>::type + /// \endcond + { return observable<T, rxo::detail::take_until<T, this_type, TriggerSource, identity_one_worker>>( rxo::detail::take_until<T, this_type, TriggerSource, identity_one_worker>(*this, std::move(t), identity_current_thread())); } - /// take_until -> - /// The coordination is used to synchronize sources from different contexts. - /// for each item from this observable until on_next occurs on the TriggerSource, emit them from the new observable that is returned. - /// - /// + /*! For each item from this observable until on_next occurs on the trigger observable, emit them from the new observable that is returned. + + \tparam TriggerSource the type of the trigger observable + \tparam Coordination the type of the scheduler + + \param t an observable whose first emitted item will stop emitting items from the source observable + \param cn the scheduler to use for scheduling the items + + \return An observable that emits the items emitted by the source observable until such time as other emits its first item. + + \sample + \snippet take_until.cpp threaded take_until sample + \snippet output.txt threaded take_until sample + */ template<class TriggerSource, class Coordination> - auto take_until(TriggerSource t, Coordination sf) const + auto take_until(TriggerSource t, Coordination cn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if<is_observable<TriggerSource>::value && is_coordination<Coordination>::value, - observable<T, rxo::detail::take_until<T, this_type, TriggerSource, Coordination>>>::type { + observable<T, rxo::detail::take_until<T, this_type, TriggerSource, Coordination>>>::type + /// \endcond + { return observable<T, rxo::detail::take_until<T, this_type, TriggerSource, Coordination>>( - rxo::detail::take_until<T, this_type, TriggerSource, Coordination>(*this, std::move(t), std::move(sf))); + rxo::detail::take_until<T, this_type, TriggerSource, Coordination>(*this, std::move(t), std::move(cn))); } - /// take_until -> - /// All sources must be synchronized! This means that calls across all the subscribers must be serial. - /// for each item from this observable until the specified time, emit them from the new observable that is returned. - /// - /// + /*! For each item from this observable until the specified time, emit them from the new observable that is returned. + + \tparam TimePoint the type of the time interval + + \param when an observable whose first emitted item will stop emitting items from the source observable + + \return An observable that emits those items emitted by the source observable before the time runs out. + + \note All sources must be synchronized! This means that calls across all the subscribers must be serial. + + \sample + \snippet take_until.cpp take_until time sample + \snippet output.txt take_until time sample + */ template<class TimePoint> auto take_until(TimePoint when) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if<std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>::value, - observable<T, rxo::detail::take_until<T, this_type, decltype(rxs::timer(when, identity_current_thread())), identity_one_worker>>>::type { + observable<T, rxo::detail::take_until<T, this_type, decltype(rxs::timer(when, identity_current_thread())), identity_one_worker>>>::type + /// \endcond + { auto cn = identity_current_thread(); return take_until(rxs::timer(when, cn), cn); } - /// take_until -> - /// The coordination is used to synchronize sources from different contexts. - /// for each item from this observable until the specified time, emit them from the new observable that is returned. - /// - /// + /*! For each item from this observable until the specified time, emit them from the new observable that is returned. + + \tparam TimePoint the type of the time interval + \tparam Coordination the type of the scheduler + + \param when an observable whose first emitted item will stop emitting items from the source observable + \param cn the scheduler to use for scheduling the items + + \return An observable that emits those items emitted by the source observable before the time runs out. + + \sample + \snippet take_until.cpp threaded take_until time sample + \snippet output.txt threaded take_until time sample + */ template<class Coordination> auto take_until(rxsc::scheduler::clock_type::time_point when, Coordination cn) const + /// \cond SHOW_SERVICE_MEMBERS -> typename std::enable_if<is_coordination<Coordination>::value, - observable<T, rxo::detail::take_until<T, this_type, decltype(rxs::interval(when, cn)), Coordination>>>::type { - return take_until(rxs::interval(when, cn), cn); + observable<T, rxo::detail::take_until<T, this_type, decltype(rxs::timer(when, cn)), Coordination>>>::type + /// \endcond + { + return take_until(rxs::timer(when, cn), cn); } - /// repeat -> - /// infinitely repeats this observable - /// - /// + /*! Infinitely repeat this observable. + + \return An observable that emits the items emitted by the source observable repeatedly and in sequence. + + \sample + \snippet repeat.cpp repeat sample + \snippet output.txt repeat sample + + If the source observable calls on_error, repeat stops: + \snippet repeat.cpp repeat error sample + \snippet output.txt repeat error sample + */ auto repeat() const - -> observable<T, rxo::detail::repeat<T, this_type, int>> { + /// \cond SHOW_SERVICE_MEMBERS + -> observable<T, rxo::detail::repeat<T, this_type, int>> + /// \endcond + { return observable<T, rxo::detail::repeat<T, this_type, int>>( rxo::detail::repeat<T, this_type, int>(*this, 0)); } - /// repeat -> - /// repeats this observable for given number of times - /// - /// + /*! Repeat this observable for the given number of times. + + \tparam Count the type of the counter + + \param t the number of times the source observable items are repeated + + \return An observable that repeats the sequence of items emitted by the source observable for t times. + + Call to repeat(0) infinitely repeats the source observable. + + \sample + \snippet repeat.cpp repeat count sample + \snippet output.txt repeat count sample + */ template<class Count> auto repeat(Count t) const - -> observable<T, rxo::detail::repeat<T, this_type, Count>> { + /// \cond SHOW_SERVICE_MEMBERS + -> observable<T, rxo::detail::repeat<T, this_type, Count>> + /// \endcond + { return observable<T, rxo::detail::repeat<T, this_type, Count>>( rxo::detail::repeat<T, this_type, Count>(*this, t)); } - /// retry -> - /// infinitely retrys this observable - /// - /// + /*! Infinitely retry this observable. + + \return An observable that mirrors the source observable, resubscribing to it if it calls on_error. + + \sample + \snippet retry.cpp retry sample + \snippet output.txt retry sample + */ auto retry() const - -> observable<T, rxo::detail::retry<T, this_type, int>> { + /// \cond SHOW_SERVICE_MEMBERS + -> observable<T, rxo::detail::retry<T, this_type, int>> + /// \endcond + { return observable<T, rxo::detail::retry<T, this_type, int>>( rxo::detail::retry<T, this_type, int>(*this, 0)); } - /// retry -> - /// retrys this observable for given number of times - /// - /// + /*! Retry this observable for the given number of times. + + \tparam Count the type of the counter + + \param t the number of retries + + \return An observable that mirrors the source observable, resubscribing to it if it calls on_error up to a specified number of retries. + + Call to retry(0) infinitely retries the source observable. + + \sample + \snippet retry.cpp retry count sample + \snippet output.txt retry count sample + */ template<class Count> auto retry(Count t) const - -> observable<T, rxo::detail::retry<T, this_type, Count>> { + /// \cond SHOW_SERVICE_MEMBERS + -> observable<T, rxo::detail::retry<T, this_type, Count>> + /// \endcond + { return observable<T, rxo::detail::retry<T, this_type, Count>>( rxo::detail::retry<T, this_type, Count>(*this, t)); } - /// start_with -> - /// start with the supplied values, then concatenate this observable - /// - /// + /*! Start with the supplied values, then concatenate this observable. + + \tparam Value0 ... + \tparam ValueN the type of sending values + + \param v0 ... + \param vn values to send + + \return Observable that emits the specified items and then emits the items emitted by the source observable. + + \sample + \snippet start_with.cpp short start_with sample + \snippet output.txt short start_with sample + + Another form of this operator, rxcpp::observable<void, void>::start_with, gets the source observable as a parameter: + \snippet start_with.cpp full start_with sample + \snippet output.txt full start_with sample + */ template<class Value0, class... ValueN> auto start_with(Value0 v0, ValueN... vn) const - -> decltype(rxo::start_with(*(this_type*)nullptr, std::move(v0), std::move(vn)...)) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(rxo::start_with(*(this_type*)nullptr, std::move(v0), std::move(vn)...)) + /// \endcond + { return rxo::start_with(*this, std::move(v0), std::move(vn)...); } + + /*! Take values pairwise from this observable. - /// pairwise -> - /// take values pairwise from the observable - /// + \return Observable that emits tuples of two the most recent items emitted by the source observable. + + \sample + \snippet pairwise.cpp pairwise sample + \snippet output.txt pairwise sample + + If the source observable emits less than two items, no pairs are emitted by the source observable: + \snippet pairwise.cpp pairwise short sample + \snippet output.txt pairwise short sample + */ auto pairwise() const - -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::pairwise<T>>>(rxo::detail::pairwise<T>())) { + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::pairwise<T>>>(rxo::detail::pairwise<T>())) + /// \endcond + { return lift<rxu::value_type_t<rxo::detail::pairwise<T>>>(rxo::detail::pairwise<T>()); } }; diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt index 0515b55..76ef1ab 100644 --- a/projects/CMake/CMakeLists.txt +++ b/projects/CMake/CMakeLists.txt @@ -156,19 +156,49 @@ if(DOXYGEN_FOUND) # Target to build examples set(DOXY_EXAMPLE_SRC_LIST ${DOXY_EXAMPLES_SRC_DIR}/main.cpp + ${DOXY_EXAMPLES_SRC_DIR}/amb.cpp + ${DOXY_EXAMPLES_SRC_DIR}/as_dynamic.cpp + ${DOXY_EXAMPLES_SRC_DIR}/buffer.cpp + ${DOXY_EXAMPLES_SRC_DIR}/combine_latest.cpp + ${DOXY_EXAMPLES_SRC_DIR}/concat.cpp + ${DOXY_EXAMPLES_SRC_DIR}/concat_map.cpp ${DOXY_EXAMPLES_SRC_DIR}/create.cpp ${DOXY_EXAMPLES_SRC_DIR}/defer.cpp + ${DOXY_EXAMPLES_SRC_DIR}/distinct_until_changed.cpp ${DOXY_EXAMPLES_SRC_DIR}/empty.cpp ${DOXY_EXAMPLES_SRC_DIR}/error.cpp + ${DOXY_EXAMPLES_SRC_DIR}/filter.cpp + ${DOXY_EXAMPLES_SRC_DIR}/finally.cpp + ${DOXY_EXAMPLES_SRC_DIR}/flat_map.cpp ${DOXY_EXAMPLES_SRC_DIR}/from.cpp + ${DOXY_EXAMPLES_SRC_DIR}/group_by.cpp ${DOXY_EXAMPLES_SRC_DIR}/interval.cpp ${DOXY_EXAMPLES_SRC_DIR}/iterate.cpp ${DOXY_EXAMPLES_SRC_DIR}/just.cpp + ${DOXY_EXAMPLES_SRC_DIR}/map.cpp + ${DOXY_EXAMPLES_SRC_DIR}/math.cpp + ${DOXY_EXAMPLES_SRC_DIR}/merge.cpp ${DOXY_EXAMPLES_SRC_DIR}/never.cpp + ${DOXY_EXAMPLES_SRC_DIR}/observe_on.cpp + ${DOXY_EXAMPLES_SRC_DIR}/pairwise.cpp + ${DOXY_EXAMPLES_SRC_DIR}/publish.cpp ${DOXY_EXAMPLES_SRC_DIR}/range.cpp + ${DOXY_EXAMPLES_SRC_DIR}/reduce.cpp + ${DOXY_EXAMPLES_SRC_DIR}/repeat.cpp + ${DOXY_EXAMPLES_SRC_DIR}/retry.cpp + ${DOXY_EXAMPLES_SRC_DIR}/scan.cpp ${DOXY_EXAMPLES_SRC_DIR}/scope.cpp + ${DOXY_EXAMPLES_SRC_DIR}/skip.cpp + ${DOXY_EXAMPLES_SRC_DIR}/skip_until.cpp ${DOXY_EXAMPLES_SRC_DIR}/start_with.cpp + ${DOXY_EXAMPLES_SRC_DIR}/subscribe.cpp + ${DOXY_EXAMPLES_SRC_DIR}/subscribe_on.cpp + ${DOXY_EXAMPLES_SRC_DIR}/switch_on_next.cpp + ${DOXY_EXAMPLES_SRC_DIR}/take.cpp + ${DOXY_EXAMPLES_SRC_DIR}/take_until.cpp ${DOXY_EXAMPLES_SRC_DIR}/timer.cpp + ${DOXY_EXAMPLES_SRC_DIR}/window.cpp + ${DOXY_EXAMPLES_SRC_DIR}/zip.cpp ) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${DOXY_EXAMPLES_BIN_DIR}) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_DEBUG ${DOXY_EXAMPLES_BIN_DIR}) |