aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorValery Kopylov <v-valkop@microsoft.com>2015-06-01 18:06:04 +0300
committerValery Kopylov <v-valkop@microsoft.com>2015-06-01 18:10:12 +0300
commit4992c73dbc8bbe3a7335b186f9b3e94da20ea127 (patch)
tree4f5b386b3cc364aa65c0b85216559abc266529e7
parent1fe0081ab9866c2882bd0c24183bfa4b2de38d10 (diff)
downloadplatform_external_Reactive-Extensions_RxCpp-4992c73dbc8bbe3a7335b186f9b3e94da20ea127.tar.gz
platform_external_Reactive-Extensions_RxCpp-4992c73dbc8bbe3a7335b186f9b3e94da20ea127.tar.bz2
platform_external_Reactive-Extensions_RxCpp-4992c73dbc8bbe3a7335b186f9b3e94da20ea127.zip
Add description and examples for observable<T> members
-rw-r--r--Rx/v2/examples/doxygen/amb.cpp84
-rw-r--r--Rx/v2/examples/doxygen/as_dynamic.cpp21
-rw-r--r--Rx/v2/examples/doxygen/buffer.cpp204
-rw-r--r--Rx/v2/examples/doxygen/combine_latest.cpp85
-rw-r--r--Rx/v2/examples/doxygen/concat.cpp60
-rw-r--r--Rx/v2/examples/doxygen/concat_map.cpp50
-rw-r--r--Rx/v2/examples/doxygen/distinct_until_changed.cpp15
-rw-r--r--Rx/v2/examples/doxygen/filter.cpp17
-rw-r--r--Rx/v2/examples/doxygen/finally.cpp37
-rw-r--r--Rx/v2/examples/doxygen/flat_map.cpp50
-rw-r--r--Rx/v2/examples/doxygen/from.cpp13
-rw-r--r--Rx/v2/examples/doxygen/group_by.cpp54
-rw-r--r--Rx/v2/examples/doxygen/map.cpp17
-rw-r--r--Rx/v2/examples/doxygen/math.cpp89
-rw-r--r--Rx/v2/examples/doxygen/merge.cpp84
-rw-r--r--Rx/v2/examples/doxygen/observe_on.cpp24
-rw-r--r--Rx/v2/examples/doxygen/pairwise.cpp51
-rw-r--r--Rx/v2/examples/doxygen/publish.cpp97
-rw-r--r--Rx/v2/examples/doxygen/reduce.cpp24
-rw-r--r--Rx/v2/examples/doxygen/repeat.cpp44
-rw-r--r--Rx/v2/examples/doxygen/retry.cpp84
-rw-r--r--Rx/v2/examples/doxygen/scan.cpp19
-rw-r--r--Rx/v2/examples/doxygen/skip.cpp14
-rw-r--r--Rx/v2/examples/doxygen/skip_until.cpp39
-rw-r--r--Rx/v2/examples/doxygen/subscribe.cpp101
-rw-r--r--Rx/v2/examples/doxygen/subscribe_on.cpp24
-rw-r--r--Rx/v2/examples/doxygen/switch_on_next.cpp35
-rw-r--r--Rx/v2/examples/doxygen/take.cpp15
-rw-r--r--Rx/v2/examples/doxygen/take_until.cpp68
-rw-r--r--Rx/v2/examples/doxygen/window.cpp196
-rw-r--r--Rx/v2/examples/doxygen/zip.cpp85
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp1640
-rw-r--r--projects/CMake/CMakeLists.txt30
33 files changed, 3120 insertions, 350 deletions
diff --git a/Rx/v2/examples/doxygen/amb.cpp b/Rx/v2/examples/doxygen/amb.cpp
new file mode 100644
index 0000000..8f2ac7d
--- /dev/null
+++ b/Rx/v2/examples/doxygen/amb.cpp
@@ -0,0 +1,84 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("amb sample"){
+ printf("//! [amb sample]\n");
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
+ auto values = o1.amb(o2, o3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [amb sample]\n");
+}
+
+SCENARIO("implicit amb sample"){
+ printf("//! [implicit amb sample]\n");
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}).as_dynamic();
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}).as_dynamic();
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}).as_dynamic();
+ auto base = rxcpp::observable<>::from(o1, o2, o3);
+ auto values = base.amb();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [implicit amb sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded amb sample"){
+ printf("//! [threaded amb sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {
+ printf("[thread %s] Timer1 fired\n", get_pid().c_str());
+ return 1;
+ });
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+ printf("[thread %s] Timer2 fired\n", get_pid().c_str());
+ return 2;
+ });
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {
+ printf("[thread %s] Timer3 fired\n", get_pid().c_str());
+ return 3;
+ });
+ auto values = o1.amb(rxcpp::observe_on_new_thread(), o2, o3);
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded amb sample]\n");
+}
+
+SCENARIO("threaded implicit amb sample"){
+ printf("//! [threaded implicit amb sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {
+ printf("[thread %s] Timer1 fired\n", get_pid().c_str());
+ return 1;
+ }).as_dynamic();
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+ printf("[thread %s] Timer2 fired\n", get_pid().c_str());
+ return 2;
+ }).as_dynamic();
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {
+ printf("[thread %s] Timer3 fired\n", get_pid().c_str());
+ return 3;
+ }).as_dynamic();
+ auto base = rxcpp::observable<>::from(o1, o2, o3);
+ auto values = base.amb(rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded implicit amb sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/as_dynamic.cpp b/Rx/v2/examples/doxygen/as_dynamic.cpp
new file mode 100644
index 0000000..363ae57
--- /dev/null
+++ b/Rx/v2/examples/doxygen/as_dynamic.cpp
@@ -0,0 +1,21 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("as_dynamic sample"){
+ printf("//! [as_dynamic sample]\n");
+ auto o1 = rxcpp::observable<>::range(1, 3);
+ auto o2 = rxcpp::observable<>::just(4);
+ auto o3 = rxcpp::observable<>::empty<int>();
+ auto values = o1.concat(o2, o3);
+ printf("type of o1: %s\n", typeid(o1).name());
+ printf("type of o1.as_dynamic(): %s\n", typeid(o1.as_dynamic()).name());
+ printf("type of o2: %s\n", typeid(o2).name());
+ printf("type of o2.as_dynamic(): %s\n", typeid(o2.as_dynamic()).name());
+ printf("type of o3: %s\n", typeid(o3).name());
+ printf("type of o3.as_dynamic(): %s\n", typeid(o3.as_dynamic()).name());
+ printf("type of values: %s\n", typeid(values).name());
+ printf("type of values.as_dynamic(): %s\n", typeid(values.as_dynamic()).name());
+ printf("//! [as_dynamic sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/buffer.cpp b/Rx/v2/examples/doxygen/buffer.cpp
new file mode 100644
index 0000000..9a5d0b0
--- /dev/null
+++ b/Rx/v2/examples/doxygen/buffer.cpp
@@ -0,0 +1,204 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("buffer count sample"){
+ printf("//! [buffer count sample]\n");
+ auto values = rxcpp::observable<>::range(1, 5).buffer(2);
+ values.
+ subscribe(
+ [](std::vector<int> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](int a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer count sample]\n");
+}
+
+SCENARIO("buffer count+skip sample"){
+ printf("//! [buffer count+skip sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).buffer(2, 3);
+ values.
+ subscribe(
+ [](std::vector<int> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](int a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer count+skip sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("buffer period+skip+coordination sample"){
+ printf("//! [buffer period+skip+coordination sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto period = std::chrono::milliseconds(4);
+ auto skip = std::chrono::milliseconds(6);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ map([](long v){
+ printf("[thread %s] Interval OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ }).
+ take(7).
+ buffer_with_time(period, skip, rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](std::vector<long> v){
+ printf("[thread %s] OnNext:", get_pid().c_str());
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [buffer period+skip+coordination sample]\n");
+}
+
+SCENARIO("buffer period+skip sample"){
+ printf("//! [buffer period+skip sample]\n");
+ auto period = std::chrono::milliseconds(4);
+ auto skip = std::chrono::milliseconds(6);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ buffer_with_time(period, skip);
+ values.
+ subscribe(
+ [](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+skip sample]\n");
+}
+
+SCENARIO("buffer period+skip overlapping sample"){
+ printf("//! [buffer period+skip overlapping sample]\n");
+ auto period = std::chrono::milliseconds(6);
+ auto skip = std::chrono::milliseconds(4);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ buffer_with_time(period, skip);
+ values.
+ subscribe(
+ [](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+skip overlapping sample]\n");
+}
+
+SCENARIO("buffer period+skip empty sample"){
+ printf("//! [buffer period+skip empty sample]\n");
+ auto period = std::chrono::milliseconds(2);
+ auto skip = std::chrono::milliseconds(4);
+ auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
+ buffer_with_time(period, skip);
+ values.
+ subscribe(
+ [](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+skip empty sample]\n");
+}
+
+SCENARIO("buffer period+coordination sample"){
+ printf("//! [buffer period+coordination sample]\n");
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ buffer_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+coordination sample]\n");
+}
+
+SCENARIO("buffer period sample"){
+ printf("//! [buffer period sample]\n");
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ buffer_with_time(std::chrono::milliseconds(4));
+ values.
+ subscribe(
+ [](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period sample]\n");
+}
+
+SCENARIO("buffer period+count+coordination sample"){
+ printf("//! [buffer period+count+coordination sample]\n");
+ auto start = std::chrono::steady_clock::now();
+ auto int1 = rxcpp::observable<>::range(1L, 3L);
+ auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
+ auto values = int1.
+ concat(int2).
+ buffer_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop());
+ values.
+ as_blocking().
+ subscribe(
+ [start](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+count+coordination sample]\n");
+}
+
+SCENARIO("buffer period+count sample"){
+ printf("//! [buffer period+count sample]\n");
+ auto start = std::chrono::steady_clock::now();
+ auto int1 = rxcpp::observable<>::range(1L, 3L);
+ auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
+ auto values = int1.
+ concat(int2).
+ buffer_with_time_or_count(std::chrono::milliseconds(20), 2);
+ values.
+ subscribe(
+ [start](std::vector<long> v){
+ printf("OnNext:");
+ std::for_each(v.begin(), v.end(), [](long a){
+ printf(" %d", a);
+ });
+ printf("\n");
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [buffer period+count sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/combine_latest.cpp b/Rx/v2/examples/doxygen/combine_latest.cpp
new file mode 100644
index 0000000..b220da4
--- /dev/null
+++ b/Rx/v2/examples/doxygen/combine_latest.cpp
@@ -0,0 +1,85 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("combine_latest sample"){
+ printf("//! [combine_latest sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
+ auto values = o1.combine_latest(o2, o3);
+ values.
+ take(5).
+ subscribe(
+ [](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [combine_latest sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("Coordination combine_latest sample"){
+ printf("//! [Coordination combine_latest sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto thr = rxcpp::synchronize_event_loop();
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) {
+ printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) {
+ printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5)).map([](int v) {
+ printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto values = o1.combine_latest(thr, o2, o3);
+ values.
+ take(5).
+ as_blocking().
+ subscribe(
+ [](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [Coordination combine_latest sample]\n");
+}
+
+SCENARIO("Selector combine_latest sample"){
+ printf("//! [Selector combine_latest sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
+ auto values = o1.combine_latest(
+ [](int v1, int v2, int v3) {
+ return 100 * v1 + 10 * v2 + v3;
+ },
+ o2, o3);
+ values.
+ take(5).
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [Selector combine_latest sample]\n");
+}
+
+SCENARIO("Coordination+Selector combine_latest sample"){
+ printf("//! [Coordination+Selector combine_latest sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(5));
+ auto values = o1.combine_latest(
+ rxcpp::observe_on_new_thread(),
+ [](int v1, int v2, int v3) {
+ return 100 * v1 + 10 * v2 + v3;
+ },
+ o2, o3);
+ values.
+ take(5).
+ as_blocking().
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [Coordination+Selector combine_latest sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/concat.cpp b/Rx/v2/examples/doxygen/concat.cpp
new file mode 100644
index 0000000..bb86033
--- /dev/null
+++ b/Rx/v2/examples/doxygen/concat.cpp
@@ -0,0 +1,60 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("concat sample"){
+ printf("//! [concat sample]\n");
+ auto o1 = rxcpp::observable<>::range(1, 3);
+ auto o2 = rxcpp::observable<>::just(4);
+ auto o3 = rxcpp::observable<>::from(5, 6);
+ auto values = o1.concat(o2.as_dynamic(), o3.as_dynamic());
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [concat sample]\n");
+}
+
+SCENARIO("implicit concat sample"){
+ printf("//! [implicit concat sample]\n");
+ auto o1 = rxcpp::observable<>::range(1, 3);
+ auto o2 = rxcpp::observable<>::just(4);
+ auto o3 = rxcpp::observable<>::from(5, 6);
+ auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2.as_dynamic(), o3.as_dynamic());
+ auto values = base.concat();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [implicit concat sample]\n");
+}
+
+SCENARIO("threaded concat sample"){
+ printf("//! [threaded concat sample]\n");
+ auto o1 = rxcpp::observable<>::range(1, 3);
+ auto o2 = rxcpp::observable<>::just(4);
+ auto o3 = rxcpp::observable<>::from(5, 6);
+ auto values = o1.concat(rxcpp::observe_on_new_thread(), o2.as_dynamic(), o3.as_dynamic());
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [threaded concat sample]\n");
+}
+
+SCENARIO("threaded implicit concat sample"){
+ printf("//! [threaded implicit concat sample]\n");
+ auto o1 = rxcpp::observable<>::range(1, 3);
+ auto o2 = rxcpp::observable<>::just(4);
+ auto o3 = rxcpp::observable<>::from(5, 6);
+ auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2.as_dynamic(), o3.as_dynamic());
+ auto values = base.concat(rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [threaded implicit concat sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/concat_map.cpp b/Rx/v2/examples/doxygen/concat_map.cpp
new file mode 100644
index 0000000..70664e0
--- /dev/null
+++ b/Rx/v2/examples/doxygen/concat_map.cpp
@@ -0,0 +1,50 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("concat_map sample"){
+ printf("//! [concat_map sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat_map(
+ [](int v){
+ return
+ rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
+ take(3);
+ },
+ [](int v_main, long v_sub){
+ return std::make_tuple(v_main, v_sub);
+ });
+ values.
+ subscribe(
+ [](std::tuple<int, long> v){printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [concat_map sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded concat_map sample"){
+ printf("//! [threaded concat_map sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat_map(
+ [](int v){
+ printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
+ return
+ rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
+ take(3);
+ },
+ [](int v_main, long v_sub){
+ printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub);
+ return std::make_tuple(v_main, v_sub);
+ },
+ rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded concat_map sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/distinct_until_changed.cpp b/Rx/v2/examples/doxygen/distinct_until_changed.cpp
new file mode 100644
index 0000000..9422b21
--- /dev/null
+++ b/Rx/v2/examples/doxygen/distinct_until_changed.cpp
@@ -0,0 +1,15 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("distinct_until_changed sample"){
+ printf("//! [distinct_until_changed sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2, 2, 3, 3, 3, 4, 5, 5).distinct_until_changed();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [distinct_until_changed sample]\n");
+}
+
diff --git a/Rx/v2/examples/doxygen/filter.cpp b/Rx/v2/examples/doxygen/filter.cpp
new file mode 100644
index 0000000..36a1f49
--- /dev/null
+++ b/Rx/v2/examples/doxygen/filter.cpp
@@ -0,0 +1,17 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("filter sample"){
+ printf("//! [filter sample]\n");
+ auto values = rxcpp::observable<>::range(1, 6).
+ filter([](int v){
+ return v % 2;
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [filter sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/finally.cpp b/Rx/v2/examples/doxygen/finally.cpp
new file mode 100644
index 0000000..3f25196
--- /dev/null
+++ b/Rx/v2/examples/doxygen/finally.cpp
@@ -0,0 +1,37 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("finally sample"){
+ printf("//! [finally sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ finally([](){
+ printf("The final action\n");
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [finally sample]\n");
+}
+
+SCENARIO("error finally sample"){
+ printf("//! [error finally sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ finally([](){
+ printf("The final action\n");
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [error finally sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/flat_map.cpp b/Rx/v2/examples/doxygen/flat_map.cpp
new file mode 100644
index 0000000..21db657
--- /dev/null
+++ b/Rx/v2/examples/doxygen/flat_map.cpp
@@ -0,0 +1,50 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("flat_map sample"){
+ printf("//! [flat_map sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ flat_map(
+ [](int v){
+ return
+ rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
+ take(3);
+ },
+ [](int v_main, long v_sub){
+ return std::make_tuple(v_main, v_sub);
+ });
+ values.
+ subscribe(
+ [](std::tuple<int, long> v){printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [flat_map sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded flat_map sample"){
+ printf("//! [threaded flat_map sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto values = rxcpp::observable<>::range(1, 3).
+ flat_map(
+ [](int v){
+ printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
+ return
+ rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
+ take(3);
+ },
+ [](int v_main, int v_sub){
+ printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub);
+ return std::make_tuple(v_main, v_sub);
+ },
+ rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded flat_map sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/from.cpp b/Rx/v2/examples/doxygen/from.cpp
index 03288eb..15186ba 100644
--- a/Rx/v2/examples/doxygen/from.cpp
+++ b/Rx/v2/examples/doxygen/from.cpp
@@ -13,14 +13,21 @@ SCENARIO("from sample"){
printf("//! [from sample]\n");
}
+std::string get_pid();
+
SCENARIO("threaded from sample"){
printf("//! [threaded from sample]\n");
- auto values = rxcpp::observable<>::from(rxcpp::observe_on_event_loop(), 1, 2, 3);
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto values = rxcpp::observable<>::from(rxcpp::observe_on_new_thread(), 1, 2, 3).map([](int v){
+ printf("[thread %s] Emit value: %d\n", get_pid().c_str(), v);
+ return v;
+ });
values.
as_blocking().
subscribe(
- [](int v){printf("OnNext: %d\n", v);},
- [](){printf("OnCompleted\n");});
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
printf("//! [threaded from sample]\n");
}
diff --git a/Rx/v2/examples/doxygen/group_by.cpp b/Rx/v2/examples/doxygen/group_by.cpp
new file mode 100644
index 0000000..d30f334
--- /dev/null
+++ b/Rx/v2/examples/doxygen/group_by.cpp
@@ -0,0 +1,54 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("group_by sample"){
+ printf("//! [group_by sample]\n");
+ auto values = rxcpp::observable<>::range(0, 8).
+ group_by(
+ [](int v){return v % 3;},
+ [](int v){return 10 * v;});
+ values.
+ subscribe(
+ [](rxcpp::grouped_observable<int, int> g){
+ auto key = g.get_key();
+ printf("OnNext: key = %d\n", key);
+ g.subscribe(
+ [key](int v){printf("[key %d] OnNext: %d\n", key, v);},
+ [key](){printf("[key %d] OnCompleted\n", key);});
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [group_by sample]\n");
+}
+
+//! [group_by full intro]
+bool less(int v1, int v2){
+ return v1 < v2;
+}
+//! [group_by full intro]
+
+SCENARIO("group_by full sample"){
+ printf("//! [group_by full sample]\n");
+ auto data = rxcpp::observable<>::range(0, 8).
+ map([](int v){
+ std::stringstream s;
+ s << "Value " << v;
+ return std::make_pair(v % 3, s.str());
+ });
+ auto values = data.group_by(
+ [](std::pair<int, std::string> v){return v.first;},
+ [](std::pair<int, std::string> v){return v.second;},
+ less);
+ values.
+ subscribe(
+ [](rxcpp::grouped_observable<int, std::string> g){
+ auto key = g.get_key();
+ printf("OnNext: key = %d\n", key);
+ g.subscribe(
+ [key](const std::string& v){printf("[key %d] OnNext: %s\n", key, v.c_str());},
+ [key](){printf("[key %d] OnCompleted\n", key);});
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [group_by full sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/map.cpp b/Rx/v2/examples/doxygen/map.cpp
new file mode 100644
index 0000000..8db5030
--- /dev/null
+++ b/Rx/v2/examples/doxygen/map.cpp
@@ -0,0 +1,17 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("map sample"){
+ printf("//! [map sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ map([](int v){
+ return 2 * v;
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [map sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/math.cpp b/Rx/v2/examples/doxygen/math.cpp
new file mode 100644
index 0000000..dcf3a90
--- /dev/null
+++ b/Rx/v2/examples/doxygen/math.cpp
@@ -0,0 +1,89 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("first sample"){
+ printf("//! [first sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).first();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [first sample]\n");
+}
+
+//SCENARIO("first empty sample"){
+// printf("//! [first empty sample]\n");
+// auto values = rxcpp::observable<>::empty<int>().first();
+// values.
+// subscribe(
+// [](int v){printf("OnNext: %d\n", v);},
+// [](std::exception_ptr ep){
+// try {std::rethrow_exception(ep);}
+// catch (const std::exception& ex) {
+// printf("OnError: %s\n", ex.what());
+// }
+// catch (...) {
+// printf("OnError:\n");
+// }
+// },
+// [](){printf("OnCompleted\n");});
+// printf("//! [first empty sample]\n");
+//}
+
+SCENARIO("last sample"){
+ printf("//! [last sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).last();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [last sample]\n");
+}
+
+//SCENARIO("last empty sample"){
+// printf("//! [last empty sample]\n");
+// auto values = rxcpp::observable<>::empty<int>().last();
+// values.
+// subscribe(
+// [](int v){printf("OnNext: %d\n", v);},
+// [](std::exception_ptr ep){
+// try {std::rethrow_exception(ep);}
+// catch (const std::exception& ex) {
+// printf("OnError: %s\n", ex.what());
+// }
+// },
+// [](){printf("OnCompleted\n");});
+// printf("//! [last empty sample]\n");
+//}
+
+SCENARIO("count sample"){
+ printf("//! [count sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).count();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [count sample]\n");
+}
+
+SCENARIO("sum sample"){
+ printf("//! [sum sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).sum();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [sum sample]\n");
+}
+
+SCENARIO("average sample"){
+ printf("//! [average sample]\n");
+ auto values = rxcpp::observable<>::range(1, 4).average();
+ values.
+ subscribe(
+ [](double v){printf("OnNext: %lf\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [average sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/merge.cpp b/Rx/v2/examples/doxygen/merge.cpp
new file mode 100644
index 0000000..4b5ea9a
--- /dev/null
+++ b/Rx/v2/examples/doxygen/merge.cpp
@@ -0,0 +1,84 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("merge sample"){
+ printf("//! [merge sample]\n");
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;});
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;});
+ auto values = o1.merge(o2, o3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [merge sample]\n");
+}
+
+SCENARIO("implicit merge sample"){
+ printf("//! [implicit merge sample]\n");
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}).as_dynamic();
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}).as_dynamic();
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}).as_dynamic();
+ auto base = rxcpp::observable<>::from(o1, o2, o3);
+ auto values = base.merge();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [implicit merge sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded merge sample"){
+ printf("//! [threaded merge sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+ printf("[thread %s] Timer1 fired\n", get_pid().c_str());
+ return 1;
+ });
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) {
+ printf("[thread %s] Timer2 fired\n", get_pid().c_str());
+ return 2;
+ });
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
+ printf("[thread %s] Timer3 fired\n", get_pid().c_str());
+ return 3;
+ });
+ auto values = o1.merge(rxcpp::observe_on_new_thread(), o2, o3);
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded merge sample]\n");
+}
+
+SCENARIO("threaded implicit merge sample"){
+ printf("//! [threaded implicit merge sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+ printf("[thread %s] Timer1 fired\n", get_pid().c_str());
+ return 1;
+ }).as_dynamic();
+ auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) {
+ printf("[thread %s] Timer2 fired\n", get_pid().c_str());
+ return 2;
+ }).as_dynamic();
+ auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
+ printf("[thread %s] Timer3 fired\n", get_pid().c_str());
+ return 3;
+ }).as_dynamic();
+ auto base = rxcpp::observable<>::from(o1, o2, o3);
+ auto values = base.merge(rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded implicit merge sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/observe_on.cpp b/Rx/v2/examples/doxygen/observe_on.cpp
new file mode 100644
index 0000000..927c339
--- /dev/null
+++ b/Rx/v2/examples/doxygen/observe_on.cpp
@@ -0,0 +1,24 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+std::string get_pid();
+
+SCENARIO("observe_on sample"){
+ printf("//! [observe_on sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto values = rxcpp::observable<>::range(1, 3).
+ map([](int v){
+ printf("[thread %s] Emit value %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ values.
+ observe_on(rxcpp::synchronize_new_thread()).
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [observe_on sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/pairwise.cpp b/Rx/v2/examples/doxygen/pairwise.cpp
new file mode 100644
index 0000000..3dd8d34
--- /dev/null
+++ b/Rx/v2/examples/doxygen/pairwise.cpp
@@ -0,0 +1,51 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("pairwise sample"){
+ printf("//! [pairwise sample]\n");
+ auto values = rxcpp::observable<>::range(1, 5).pairwise();
+ values.
+ subscribe(
+ [](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [pairwise sample]\n");
+}
+
+SCENARIO("pairwise short sample"){
+ printf("//! [pairwise short sample]\n");
+ auto values = rxcpp::observable<>::just(1).pairwise();
+ values.
+ subscribe(
+ [](std::tuple<int, int> v){printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [pairwise short sample]\n");
+}
+
+//std::string get_pid();
+//
+//SCENARIO("threaded flat_map sample"){
+// printf("//! [threaded flat_map sample]\n");
+// printf("[thread %s] Start task\n", get_pid().c_str());
+// auto values = rxcpp::observable<>::range(1, 3).
+// flat_map(
+// [](int v){
+// printf("[thread %s] Call CollectionSelector(v = %d)\n", get_pid().c_str(), v);
+// return
+// rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(10 * v), std::chrono::milliseconds(50)).
+// take(3);
+// },
+// [](int v_main, int v_sub){
+// printf("[thread %s] Call ResultSelector(v_main = %d, v_sub = %d)\n", get_pid().c_str(), v_main, v_sub);
+// return std::make_tuple(v_main, v_sub);
+// },
+// rxcpp::observe_on_new_thread());
+// values.
+// as_blocking().
+// subscribe(
+// [](std::tuple<int, long> v){printf("[thread %s] OnNext: %d - %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v));},
+// [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+// printf("[thread %s] Finish task\n", get_pid().c_str());
+// printf("//! [threaded flat_map sample]\n");
+//}
diff --git a/Rx/v2/examples/doxygen/publish.cpp b/Rx/v2/examples/doxygen/publish.cpp
new file mode 100644
index 0000000..700fa89
--- /dev/null
+++ b/Rx/v2/examples/doxygen/publish.cpp
@@ -0,0 +1,97 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("publish_synchronized sample"){
+ printf("//! [publish_synchronized sample]\n");
+ auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
+ take(5).
+ publish_synchronized(rxcpp::observe_on_new_thread());
+
+ // Subscribe from the beginning
+ values.subscribe(
+ [](long v){printf("[1] OnNext: %d\n", v);},
+ [](){printf("[1] OnCompleted\n");});
+
+ // Another subscription from the beginning
+ values.subscribe(
+ [](long v){printf("[2] OnNext: %d\n", v);},
+ [](){printf("[2] OnCompleted\n");});
+
+ // Start emitting
+ values.connect();
+
+ // Wait before subscribing
+ rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
+ values.subscribe(
+ [](long v){printf("[3] OnNext: %d\n", v);},
+ [](){printf("[3] OnCompleted\n");});
+ });
+
+ // Add blocking subscription to see results
+ values.as_blocking().subscribe();
+ printf("//! [publish_synchronized sample]\n");
+}
+
+SCENARIO("publish subject sample"){
+ printf("//! [publish subject sample]\n");
+ auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
+ take(5).
+ publish();
+
+ // Subscribe from the beginning
+ values.subscribe(
+ [](long v){printf("[1] OnNext: %d\n", v);},
+ [](){printf("[1] OnCompleted\n");});
+
+ // Another subscription from the beginning
+ values.subscribe(
+ [](long v){printf("[2] OnNext: %d\n", v);},
+ [](){printf("[2] OnCompleted\n");});
+
+ // Start emitting
+ values.connect();
+
+ // Wait before subscribing
+ rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
+ values.subscribe(
+ [](long v){printf("[3] OnNext: %d\n", v);},
+ [](){printf("[3] OnCompleted\n");});
+ });
+
+ // Add blocking subscription to see results
+ values.as_blocking().subscribe();
+ printf("//! [publish subject sample]\n");
+}
+
+SCENARIO("publish behavior sample"){
+ printf("//! [publish behavior sample]\n");
+ auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
+ take(5).
+ publish(0L);
+
+ // Subscribe from the beginning
+ values.subscribe(
+ [](long v){printf("[1] OnNext: %d\n", v);},
+ [](){printf("[1] OnCompleted\n");});
+
+ // Another subscription from the beginning
+ values.subscribe(
+ [](long v){printf("[2] OnNext: %d\n", v);},
+ [](){printf("[2] OnCompleted\n");});
+
+ // Start emitting
+ values.connect();
+
+ // Wait before subscribing
+ rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
+ values.subscribe(
+ [](long v){printf("[3] OnNext: %d\n", v);},
+ [](){printf("[3] OnCompleted\n");});
+ });
+
+ // Add blocking subscription to see results
+ values.as_blocking().subscribe();
+ printf("//! [publish behavior sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/reduce.cpp b/Rx/v2/examples/doxygen/reduce.cpp
new file mode 100644
index 0000000..0005bf6
--- /dev/null
+++ b/Rx/v2/examples/doxygen/reduce.cpp
@@ -0,0 +1,24 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("reduce sample"){
+ printf("//! [reduce sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).
+ reduce(
+ std::make_pair(0, 1.0),
+ [](std::pair<int, double> seed, int v){
+ seed.first += 1;
+ seed.second *= v;
+ return seed;
+ },
+ [](std::pair<int, double> res){
+ return std::pow(res.second, 1.0 / res.first);
+ });
+ values.
+ subscribe(
+ [](double v){printf("OnNext: %lf\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [reduce sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/repeat.cpp b/Rx/v2/examples/doxygen/repeat.cpp
new file mode 100644
index 0000000..9cd8d43
--- /dev/null
+++ b/Rx/v2/examples/doxygen/repeat.cpp
@@ -0,0 +1,44 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("repeat sample"){
+ printf("//! [repeat sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2).
+ repeat().
+ take(5);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [repeat sample]\n");
+}
+
+SCENARIO("repeat count sample"){
+ printf("//! [repeat count sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2).repeat(3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [repeat count sample]\n");
+}
+
+SCENARIO("repeat error sample"){
+ printf("//! [repeat error sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ repeat();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [repeat error sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/retry.cpp b/Rx/v2/examples/doxygen/retry.cpp
new file mode 100644
index 0000000..efcfb23
--- /dev/null
+++ b/Rx/v2/examples/doxygen/retry.cpp
@@ -0,0 +1,84 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("retry sample"){
+ printf("//! [retry sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ retry().
+ take(5);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [retry sample]\n");
+}
+
+SCENARIO("retry count sample"){
+ printf("//! [retry count sample]\n");
+ auto source = rxcpp::observable<>::from(1, 2).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
+ auto values = source.retry(3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [retry count sample]\n");
+}
+
+//SCENARIO("retry hot sample"){
+// printf("//! [retry hot sample]\n");
+// auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
+// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error1 from source"))).
+// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))).
+// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error2 from source"))).
+// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))).
+// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error3 from source"))).
+// concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))).
+// concat(rxcpp::observable<>::error<long>(std::runtime_error("Error4 from source"))).
+// retry(3);
+// values.
+// subscribe(
+// [](long v){printf("OnNext: %d\n", v);},
+// [](std::exception_ptr ep){
+// try {std::rethrow_exception(ep);}
+// catch (const std::exception& ex) {
+// printf("OnError: %s\n", ex.what());
+// }
+// },
+// [](){printf("OnCompleted\n");});
+// printf("//! [retry hot sample]\n");
+//}
+//
+//SCENARIO("retry completed sample"){
+// printf("//! [retry completed sample <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<]\n");
+// auto source = rxcpp::observable<>::from(1, 2).
+// concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+// publish();
+// auto values = source.retry();
+// //auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
+// // concat(rxcpp::observable<>::error<long>(std::runtime_error("Error1 from source"))).
+// // concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))).
+// // concat(rxcpp::observable<>::error<long>(std::runtime_error("Error2 from source"))).
+// // concat(rxcpp::observable<>::timer(std::chrono::milliseconds(10))).
+// // retry(3);
+// values.
+// subscribe(
+// [](long v){printf("OnNext: %d\n", v);},
+// [](std::exception_ptr ep){
+// try {std::rethrow_exception(ep);}
+// catch (const std::exception& ex) {
+// printf("OnError: %s\n", ex.what());
+// }
+// },
+// [](){printf("OnCompleted\n");});
+// printf("//! [retry completed sample]\n");
+//}
diff --git a/Rx/v2/examples/doxygen/scan.cpp b/Rx/v2/examples/doxygen/scan.cpp
new file mode 100644
index 0000000..2a49efd
--- /dev/null
+++ b/Rx/v2/examples/doxygen/scan.cpp
@@ -0,0 +1,19 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("scan sample"){
+ printf("//! [scan sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).
+ scan(
+ 0,
+ [](int seed, int v){
+ return seed + v;
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [scan sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/skip.cpp b/Rx/v2/examples/doxygen/skip.cpp
new file mode 100644
index 0000000..e5b5979
--- /dev/null
+++ b/Rx/v2/examples/doxygen/skip.cpp
@@ -0,0 +1,14 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("skip sample"){
+ printf("//! [skip sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).skip(3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [skip sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/skip_until.cpp b/Rx/v2/examples/doxygen/skip_until.cpp
new file mode 100644
index 0000000..3e0202f
--- /dev/null
+++ b/Rx/v2/examples/doxygen/skip_until.cpp
@@ -0,0 +1,39 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("skip_until sample"){
+ printf("//! [skip_until sample]\n");
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
+ auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25));
+ auto values = source.skip_until(trigger);
+ values.
+ subscribe(
+ [](long v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [skip_until sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded skip_until sample"){
+ printf("//! [threaded skip_until sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
+ printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){
+ printf("[thread %s] Trigger emits, value = %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto values = source.skip_until(trigger, rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded skip_until sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/subscribe.cpp b/Rx/v2/examples/doxygen/subscribe.cpp
new file mode 100644
index 0000000..e7c3435
--- /dev/null
+++ b/Rx/v2/examples/doxygen/subscribe.cpp
@@ -0,0 +1,101 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("subscribe by subscriber"){
+ printf("//! [subscribe by subscriber]\n");
+ auto subscriber = rxcpp::make_subscriber<int>(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ auto values = rxcpp::observable<>::range(1, 3);
+ values.subscribe(subscriber);
+ printf("//! [subscribe by subscriber]\n");
+}
+
+SCENARIO("subscribe by observer"){
+ printf("//! [subscribe by observer]\n");
+ auto subscriber = rxcpp::make_subscriber<int>(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ auto values1 = rxcpp::observable<>::range(1, 3);
+ auto values2 = rxcpp::observable<>::range(4, 6);
+ values1.subscribe(subscriber.get_observer());
+ values2.subscribe(subscriber.get_observer());
+ printf("//! [subscribe by observer]\n");
+}
+
+SCENARIO("subscribe by on_next"){
+ printf("//! [subscribe by on_next]\n");
+ auto values = rxcpp::observable<>::range(1, 3);
+ values.subscribe(
+ [](int v){printf("OnNext: %d\n", v);});
+ printf("//! [subscribe by on_next]\n");
+}
+
+SCENARIO("subscribe by on_next and on_error"){
+ printf("//! [subscribe by on_next and on_error]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
+ values.subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ });
+ printf("//! [subscribe by on_next and on_error]\n");
+}
+
+SCENARIO("subscribe by on_next and on_completed"){
+ printf("//! [subscribe by on_next and on_completed]\n");
+ auto values = rxcpp::observable<>::range(1, 3);
+ values.subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [subscribe by on_next and on_completed]\n");
+}
+
+SCENARIO("subscribe by subscription, on_next, and on_completed"){
+ printf("//! [subscribe by subscription, on_next, and on_completed]\n");
+ auto subscription = rxcpp::composite_subscription();
+ auto values = rxcpp::observable<>::range(1, 5);
+ values.subscribe(
+ subscription,
+ [&subscription](int v){
+ printf("OnNext: %d\n", v);
+ if (v == 3)
+ subscription.unsubscribe();
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [subscribe by subscription, on_next, and on_completed]\n");
+}
+
+SCENARIO("subscribe by on_next, on_error, and on_completed"){
+ printf("//! [subscribe by on_next, on_error, and on_completed]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source")));
+ values.subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [subscribe by on_next, on_error, and on_completed]\n");
+}
+
+SCENARIO("subscribe unsubscribe"){
+ printf("//! [subscribe unsubscribe]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat(rxcpp::observable<>::never<int>()).
+ finally([](){printf("The final action\n");});
+ auto subscription = values.subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ subscription.unsubscribe();
+ printf("//! [subscribe unsubscribe]\n");
+}
diff --git a/Rx/v2/examples/doxygen/subscribe_on.cpp b/Rx/v2/examples/doxygen/subscribe_on.cpp
new file mode 100644
index 0000000..e2614bc
--- /dev/null
+++ b/Rx/v2/examples/doxygen/subscribe_on.cpp
@@ -0,0 +1,24 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+std::string get_pid();
+
+SCENARIO("subscribe_on sample"){
+ printf("//! [subscribe_on sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto values = rxcpp::observable<>::range(1, 3).
+ map([](int v){
+ printf("[thread %s] Emit value %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ values.
+ subscribe_on(rxcpp::synchronize_new_thread()).
+ as_blocking().
+ subscribe(
+ [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [subscribe_on sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/switch_on_next.cpp b/Rx/v2/examples/doxygen/switch_on_next.cpp
new file mode 100644
index 0000000..6e2da70
--- /dev/null
+++ b/Rx/v2/examples/doxygen/switch_on_next.cpp
@@ -0,0 +1,35 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("switch_on_next sample"){
+ printf("//! [switch_on_next sample]\n");
+ auto base = rxcpp::observable<>::interval(std::chrono::milliseconds(30)).
+ take(3).
+ map([](int){
+ return rxcpp::observable<>::interval(std::chrono::milliseconds(10)).as_dynamic();
+ });
+ auto values = base.switch_on_next().take(10);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [switch_on_next sample]\n");
+}
+
+SCENARIO("threaded switch_on_next sample"){
+ printf("//! [threaded switch_on_next sample]\n");
+ auto base = rxcpp::observable<>::interval(std::chrono::milliseconds(30)).
+ take(3).
+ map([](long){
+ return rxcpp::observable<>::interval(std::chrono::milliseconds(10), rxcpp::observe_on_event_loop()).as_dynamic();
+ });
+ auto values = base.switch_on_next(rxcpp::observe_on_new_thread()).take(10);
+ values.
+ as_blocking().
+ subscribe(
+ [](long v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [threaded switch_on_next sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/take.cpp b/Rx/v2/examples/doxygen/take.cpp
new file mode 100644
index 0000000..be3be24
--- /dev/null
+++ b/Rx/v2/examples/doxygen/take.cpp
@@ -0,0 +1,15 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+
+SCENARIO("take sample"){
+ printf("//! [take sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).take(3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [take sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/take_until.cpp b/Rx/v2/examples/doxygen/take_until.cpp
new file mode 100644
index 0000000..098e7c1
--- /dev/null
+++ b/Rx/v2/examples/doxygen/take_until.cpp
@@ -0,0 +1,68 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("take_until sample"){
+ printf("//! [take_until sample]\n");
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
+ auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25));
+ auto values = source.take_until(trigger);
+ values.
+ subscribe(
+ [](long v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [take_until sample]\n");
+}
+
+SCENARIO("take_until time sample"){
+ printf("//! [take_until time sample]\n");
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
+ auto values = source.take_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(25));
+ values.
+ subscribe(
+ [](long v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [take_until time sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("threaded take_until sample"){
+ printf("//! [threaded take_until sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
+ printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){
+ printf("[thread %s] Trigger emits, value = %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto values = source.take_until(trigger, rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded take_until sample]\n");
+}
+
+SCENARIO("threaded take_until time sample"){
+ printf("//! [threaded take_until time sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
+ printf("[thread %s] Source emits, value = %d\n", get_pid().c_str(), v);
+ return v;
+ }).as_dynamic();
+ auto scheduler = rxcpp::observe_on_new_thread();
+ auto values = source.take_until(scheduler.now() + std::chrono::milliseconds(25), scheduler);
+ values.
+ as_blocking().
+ subscribe(
+ [](long v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [threaded take_until time sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/window.cpp b/Rx/v2/examples/doxygen/window.cpp
new file mode 100644
index 0000000..0a0ba63
--- /dev/null
+++ b/Rx/v2/examples/doxygen/window.cpp
@@ -0,0 +1,196 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("window count sample"){
+ printf("//! [window count sample]\n");
+ int counter = 0;
+ auto values = rxcpp::observable<>::range(1, 5).window(2);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<int> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](int v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window count sample]\n");
+}
+
+SCENARIO("window count+skip sample"){
+ printf("//! [window count+skip sample]\n");
+ int counter = 0;
+ auto values = rxcpp::observable<>::range(1, 7).window(2, 3);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<int> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](int v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window count+skip sample]\n");
+}
+
+SCENARIO("window period+skip+coordination sample"){
+ printf("//! [window period+skip+coordination sample]\n");
+ int counter = 0;
+ auto period = std::chrono::milliseconds(4);
+ auto skip = std::chrono::milliseconds(6);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_with_time(period, skip, rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+skip+coordination sample]\n");
+}
+
+SCENARIO("window period+skip sample"){
+ printf("//! [window period+skip sample]\n");
+ int counter = 0;
+ auto period = std::chrono::milliseconds(4);
+ auto skip = std::chrono::milliseconds(6);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_with_time(period, skip);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+skip sample]\n");
+}
+
+SCENARIO("window period+skip overlapping sample"){
+ printf("//! [window period+skip overlapping sample]\n");
+ int counter = 0;
+ auto period = std::chrono::milliseconds(6);
+ auto skip = std::chrono::milliseconds(4);
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_with_time(period, skip);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+skip overlapping sample]\n");
+}
+
+SCENARIO("window period+skip empty sample"){
+ printf("//! [window period+skip empty sample]\n");
+ int counter = 0;
+ auto period = std::chrono::milliseconds(2);
+ auto skip = std::chrono::milliseconds(4);
+ auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
+ window_with_time(period, skip);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+skip empty sample]\n");
+}
+
+SCENARIO("window period+coordination sample"){
+ printf("//! [window period+coordination sample]\n");
+ int counter = 0;
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread());
+ values.
+ as_blocking().
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+coordination sample]\n");
+}
+
+SCENARIO("window period sample"){
+ printf("//! [window period sample]\n");
+ int counter = 0;
+ auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
+ take(7).
+ window_with_time(std::chrono::milliseconds(4));
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period sample]\n");
+}
+
+SCENARIO("window period+count+coordination sample"){
+ printf("//! [window period+count+coordination sample]\n");
+ int counter = 0;
+ auto start = std::chrono::steady_clock::now();
+ auto int1 = rxcpp::observable<>::range(1L, 3L);
+ auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
+ auto values = int1.
+ concat(int2).
+ window_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop());
+ values.
+ as_blocking().
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+count+coordination sample]\n");
+}
+
+SCENARIO("window period+count sample"){
+ printf("//! [window period+count sample]\n");
+ int counter = 0;
+ auto start = std::chrono::steady_clock::now();
+ auto int1 = rxcpp::observable<>::range(1L, 3L);
+ auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
+ auto values = int1.
+ concat(int2).
+ window_with_time_or_count(std::chrono::milliseconds(20), 2);
+ values.
+ subscribe(
+ [&counter](rxcpp::observable<long> v){
+ int id = counter++;
+ printf("[window %d] Create window\n", id);
+ v.subscribe(
+ [id](long v){printf("[window %d] OnNext: %d\n", id, v);},
+ [id](){printf("[window %d] OnCompleted\n", id);});
+ });
+ printf("//! [window period+count sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/zip.cpp b/Rx/v2/examples/doxygen/zip.cpp
new file mode 100644
index 0000000..9085110
--- /dev/null
+++ b/Rx/v2/examples/doxygen/zip.cpp
@@ -0,0 +1,85 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("zip sample"){
+ printf("//! [zip sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto values = o1.zip(o2, o3);
+ values.
+ take(3).
+ subscribe(
+ [](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));},
+ [](){printf("OnCompleted\n");});
+ printf("//! [zip sample]\n");
+}
+
+std::string get_pid();
+
+SCENARIO("Coordination zip sample"){
+ printf("//! [Coordination zip sample]\n");
+ printf("[thread %s] Start task\n", get_pid().c_str());
+ auto thr = rxcpp::synchronize_event_loop();
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)).map([](int v) {
+ printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) {
+ printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) {
+ printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v);
+ return v;
+ });
+ auto values = o1.zip(thr, o2, o3);
+ values.
+ take(3).
+ as_blocking().
+ subscribe(
+ [](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));},
+ [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
+ printf("[thread %s] Finish task\n", get_pid().c_str());
+ printf("//! [Coordination zip sample]\n");
+}
+
+SCENARIO("Selector zip sample"){
+ printf("//! [Selector zip sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto values = o1.zip(
+ [](int v1, int v2, int v3) {
+ return 100 * v1 + 10 * v2 + v3;
+ },
+ o2, o3);
+ values.
+ take(3).
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [Selector zip sample]\n");
+}
+
+SCENARIO("Coordination+Selector zip sample"){
+ printf("//! [Coordination+Selector zip sample]\n");
+ auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
+ auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
+ auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3));
+ auto values = o1.zip(
+ rxcpp::observe_on_new_thread(),
+ [](int v1, int v2, int v3) {
+ return 100 * v1 + 10 * v2 + v3;
+ },
+ o2, o3);
+ values.
+ take(3).
+ as_blocking().
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [Coordination+Selector zip sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index ed2f52f..16ec44b 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -417,20 +417,34 @@ public:
}
#endif
- ///
- /// returns a new observable that performs type-forgetting conversion of this observable
- ///
+ /*! Return a new observable that performs type-forgetting conversion of this observable.
+
+ \return The source observable converted to observable<T>.
+
+ \note This operator could be useful to workaround lambda deduction bug on msvc 2013.
+
+ \sample
+ \snippet concat.cpp concat sample
+ \snippet output.txt concat sample
+ */
observable<T> as_dynamic() const {
return *this;
}
- ///
- /// returns new observable that contains the blocking methods for this observable
- ///
+ /*! Return a new observable that contains the blocking methods for this observable.
+
+ \return An observable that contains the blocking methods for this observable.
+
+ \sample
+ \snippet from.cpp threaded from sample
+ \snippet output.txt threaded from sample
+ */
blocking_observable<T, this_type> as_blocking() const {
return blocking_observable<T, this_type>(*this);
}
+ /// \cond SHOW_SERVICE_MEMBERS
+
///
/// takes any function that will take this observable and produce a result value.
/// this is intended to allow externally defined operators, that use subscribe,
@@ -479,195 +493,486 @@ public:
decltype(rxs::from<ResultType>())>::type {
return rxs::from<ResultType>();
}
+ /// \endcond
- ///
- /// subscribe will cause this observable to emit values to the provided subscriber.
- /// callers must provide enough arguments to make a subscriber.
- /// overrides are supported. thus
- /// subscribe(thesubscriber, composite_subscription())
- /// will take thesubscriber.get_observer() and the provided
- /// subscription and subscribe to the new subscriber.
- /// the on_next, on_error, on_completed methods can be supplied instead of an observer
- /// if a subscription or subscriber is not provided then a new subscription will be created.
- ///
+ /*! Subscribe will cause this observable to emit values to the provided subscriber.
+
+ \tparam ArgN types of the subscriber parameters
+
+ \param an the parameters for making a subscriber
+
+ \return A subscription with which the observer can stop receiving items before the observable has finished sending them.
+
+ The arguments of subscribe are forwarded to rxcpp::make_subscriber function. Some possible alternatives are:
+
+ - Pass an already composed rxcpp::subscriber:
+ \snippet subscribe.cpp subscribe by subscriber
+ \snippet output.txt subscribe by subscriber
+
+ - Pass an rxcpp::observer. This allows subscribing the same subscriber to several observables:
+ \snippet subscribe.cpp subscribe by observer
+ \snippet output.txt subscribe by observer
+
+ - Pass an `on_next` handler:
+ \snippet subscribe.cpp subscribe by on_next
+ \snippet output.txt subscribe by on_next
+
+ - Pass `on_next` and `on_error` handlers:
+ \snippet subscribe.cpp subscribe by on_next and on_error
+ \snippet output.txt subscribe by on_next and on_error
+
+ - Pass `on_next` and `on_completed` handlers:
+ \snippet subscribe.cpp subscribe by on_next and on_completed
+ \snippet output.txt subscribe by on_next and on_completed
+
+ - Pass `on_next`, `on_error`, and `on_completed` handlers:
+ \snippet subscribe.cpp subscribe by on_next, on_error, and on_completed
+ \snippet output.txt subscribe by on_next, on_error, and on_completed
+ .
+
+ All the alternatives above also support passing rxcpp::composite_subscription instance. For example:
+ \snippet subscribe.cpp subscribe by subscription, on_next, and on_completed
+ \snippet output.txt subscribe by subscription, on_next, and on_completed
+
+ If neither subscription nor subscriber are provided, then a new subscription is created and returned as a result:
+ \snippet subscribe.cpp subscribe unsubscribe
+ \snippet output.txt subscribe unsubscribe
+
+ For more details, see rxcpp::make_subscriber function description.
+ */
template<class... ArgN>
auto subscribe(ArgN&&... an) const
-> composite_subscription {
return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
}
- /// filter (AKA Where) ->
- /// for each item from this observable use Predicate to select which items to emit from the new observable that is returned.
- ///
+ /*! For each item from this observable use Predicate to select which items to emit from the new observable that is returned.
+
+ \tparam Predicate the type of the filter function
+
+ \param p the filter function
+
+ \return Observable that emits only those items emitted by the source observable that the filter evaluates as true.
+
+ \sample
+ \snippet filter.cpp filter sample
+ \snippet output.txt filter sample
+ */
template<class Predicate>
auto filter(Predicate p) const
- -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::filter<T, Predicate>(std::move(p)))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::filter<T, Predicate>(std::move(p))))
+ /// \endcond
+ {
return lift<T>(rxo::detail::filter<T, Predicate>(std::move(p)));
}
- /// finally () ->
- ///
+ /*! Add a new action at the end of the new observable that is returned.
+
+ \tparam LastCall the type of the action function
+
+ \param lc the action function
+
+ \return Observable that emits the same items as the source observable, then invokes the given action.
+
+ \sample
+ \snippet finally.cpp finally sample
+ \snippet output.txt finally sample
+
+ If the source observable generates an error, the final action is still being called:
+ \snippet finally.cpp error finally sample
+ \snippet output.txt error finally sample
+ */
template<class LastCall>
auto finally(LastCall lc) const
- -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc))))
+ /// \endcond
+ {
return lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)));
}
- /// map (AKA Select) ->
- /// for each item from this observable use Selector to produce an item to emit from the new observable that is returned.
- ///
+ /*! For each item from this observable use Selector to produce an item to emit from the new observable that is returned.
+
+ \tparam Selector the type of the transforming function
+
+ \param s the selector function
+
+ \return Observable that emits the items from the source observable, transformed by the specified function.
+
+ \sample
+ \snippet map.cpp map sample
+ \snippet output.txt map sample
+ */
template<class Selector>
auto map(Selector s) const
- -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s)))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s))))
+ /// \endcond
+ {
return lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s)));
}
- /// distinct_until_changed ->
- /// for each item from this observable, filter out repeated values and emit only changes from the new observable that is returned.
- ///
+ /*! For each item from this observable, filter out consequentially repeated values and emit only changes from the new observable that is returned.
+
+ \return Observable that emits those items from the source observable that are distinct from their immediate predecessors.
+
+ \sample
+ \snippet distinct_until_changed.cpp distinct_until_changed sample
+ \snippet output.txt distinct_until_changed sample
+ */
auto distinct_until_changed() const
- -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::distinct_until_changed<T>())) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::distinct_until_changed<T>()))
+ /// \endcond
+ {
return lift<T>(rxo::detail::distinct_until_changed<T>());
}
- /// window ->
- /// produce observables containing count items emitted by this observable
- ///
+ /*! Rerurn an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
+
+ \param count the maximum size of each window before it should be completed
+
+ \return Observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
+
+ \sample
+ \snippet window.cpp window count sample
+ \snippet output.txt window count sample
+ */
auto window(int count) const
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, count))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, count)))
+ /// \endcond
+ {
return lift<observable<T>>(rxo::detail::window<T>(count, count));
}
- /// window ->
- /// produce observables containing count items emitted by this observable
- ///
+ /*! Rerurn an observable that emits windows every skip items containing at most count items from the source observable.
+
+ \param count the maximum size of each window before it should be completed
+ \param skip how many items need to be skipped before starting a new window
+
+ \return Observable that emits windows every skip items containing at most count items from the source observable.
+
+ \sample
+ \snippet window.cpp window count+skip sample
+ \snippet output.txt window count+skip sample
+ */
auto window(int count, int skip) const
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, skip))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window<T>(count, skip)))
+ /// \endcond
+ {
return lift<observable<T>>(rxo::detail::window<T>(count, skip));
}
- /// window_with_time ->
- /// produce observables every skip time interval and collect items from this observable for period of time into each produced observable.
- ///
+ /*! Rerurn an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
+
+ \tparam Duration the type of time intervals
+ \tparam Coordination the type of the scheduler
+
+ \param period the period of time each window collects items before it is completed
+ \param skip the period of time after which a new window will be created
+ \param coordination the scheduler for the windows
+
+ \return Observable that emits observables every skip time interval and collect items from this observable for period of time into each produced observable.
+
+ \sample
+ \snippet window.cpp window period+skip+coordination sample
+ \snippet output.txt window period+skip+coordination sample
+ */
template<class Duration, class Coordination>
auto window_with_time(Duration period, Duration skip, Coordination coordination) const
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, skip, coordination))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, skip, coordination)))
+ /// \endcond
+ {
return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, skip, coordination));
}
- /// window_with_time ->
- /// produce observables every skip time interval and collect items from this observable for period of time into each produced observable.
- ///
+ /*! Rerurn an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable.
+
+ \tparam Duration the type of time intervals
+
+ \param period the period of time each window collects items before it is completed
+ \param skip the period of time after which a new window will be created
+
+ \return Observable that emits observables every skip time interval and collect items from this observable for period of time into each produced observable.
+
+ \sample
+ \snippet window.cpp window period+skip sample
+ \snippet output.txt window period+skip sample
+ */
template<class Duration>
auto window_with_time(Duration period, Duration skip) const
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, skip, identity_current_thread()))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, skip, identity_current_thread())))
+ /// \endcond
+ {
return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, skip, identity_current_thread()));
}
- /// window_with_time ->
- /// produce observables every period time interval and collect items from this observable for period of time into each produced observable.
- ///
+ /*! Rerurn an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
+
+ \tparam Duration the type of time intervals
+ \tparam Coordination the type of the scheduler
+
+ \param period the period of time each window collects items before it is completed and replaced with a new window
+ \param coordination the scheduler for the windows
+
+ \return Observable that emits observables every period time interval and collect items from this observable for period of time into each produced observable.
+
+ \sample
+ \snippet window.cpp window period+coordination sample
+ \snippet output.txt window period+coordination sample
+ */
template<class Duration, class Coordination>
auto window_with_time(Duration period, Coordination coordination) const
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, period, coordination))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, period, coordination)))
+ /// \endcond
+ {
return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, Coordination>(period, period, coordination));
}
- /// window_with_time ->
- /// produce observables every period time interval and collect items from this observable for period of time into each produced observable.
- ///
+ /*! Rerurn an observable that emits connected, non-overlapping windows represending items emitted by the source observable during fixed, consecutive durations.
+
+ \tparam Duration the type of time intervals
+
+ \param period the period of time each window collects items before it is completed and replaced with a new window
+
+ \return Observable that emits connected, non-overlapping windows represending items emitted by the source observable during fixed, consecutive durations.
+
+ \sample
+ \snippet window.cpp window period sample
+ \snippet output.txt window period sample
+ */
template<class Duration>
auto window_with_time(Duration period) const
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, period, identity_current_thread()))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, period, identity_current_thread())))
+ /// \endcond
+ {
return lift<observable<T>>(rxo::detail::window_with_time<T, Duration, identity_one_worker>(period, period, identity_current_thread()));
}
- /// window_with_time_or_count ->
- /// produce observables every skip time interval and collect items from this observable for period of time into each produced observable.
- ///
+ /*! Rerurn an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first), on the specified scheduler.
+
+ \tparam Duration the type of time intervals
+ \tparam Coordination the type of the scheduler
+
+ \param period the period of time each window collects items before it is completed and replaced with a new window
+ \param count the maximum size of each window before it is completed and new window is created
+ \param coordination the scheduler for the windows
+
+ \return Observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first).
+
+ \sample
+ \snippet window.cpp window period+count+coordination sample
+ \snippet output.txt window period+count+coordination sample
+ */
template<class Duration, class Coordination>
auto window_with_time_or_count(Duration period, int count, Coordination coordination) const
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, Coordination>(period, count, coordination))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, Coordination>(period, count, coordination)))
+ /// \endcond
+ {
return lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, Coordination>(period, count, coordination));
}
- /// window_with_time_or_count ->
- /// produce observables every skip time interval and collect items from this observable for period of time into each produced observable.
- ///
+ /*! Rerurn an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first).
+
+ \tparam Duration the type of time intervals
+
+ \param period the period of time each window collects items before it is completed and replaced with a new window
+ \param count the maximum size of each window before it is completed and new window is created
+
+ \return Observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first).
+
+ \sample
+ \snippet window.cpp window period+count sample
+ \snippet output.txt window period+count sample
+ */
template<class Duration>
auto window_with_time_or_count(Duration period, int count) const
- -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread()))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread())))
+ /// \endcond
+ {
return lift<observable<T>>(rxo::detail::window_with_time_or_count<T, Duration, identity_one_worker>(period, count, identity_current_thread()));
}
- /// buffer ->
- /// collect count items from this observable and produce a vector of them to emit from the new observable that is returned.
- ///
+ /*! Rerurn an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable.
+
+ \param count the maximum size of each buffer before it should be emitted
+
+ \return Observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable.
+
+ \sample
+ \snippet buffer.cpp buffer count sample
+ \snippet output.txt buffer count sample
+ */
auto buffer(int count) const
- -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, count))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, count)))
+ /// \endcond
+ {
return lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, count));
}
- /// buffer ->
- /// start a new vector every skip items and collect count items from this observable into each vector to emit from the new observable that is returned.
- ///
+ /*! Rerurn an observable that emits buffers every skip items containing at most count items from the source observable.
+
+ \param count the maximum size of each buffers before it should be emitted
+ \param skip how many items need to be skipped before starting a new buffers
+
+ \return Observable that emits buffers every skip items containing at most count items from the source observable.
+
+ \sample
+ \snippet buffer.cpp buffer count+skip sample
+ \snippet output.txt buffer count+skip sample
+ */
auto buffer(int count, int skip) const
- -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip)))
+ /// \endcond
+ {
return lift_if<std::vector<T>>(rxo::detail::buffer_count<T>(count, skip));
}
- /// buffer_with_time ->
- /// start a new vector every skip time interval and collect items into it from this observable for period of time.
- ///
+ /*! Rerurn an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler.
+
+ \tparam Coordination the type of the scheduler
+
+ \param period the period of time each buffer collects items before it is emitted
+ \param skip the period of time after which a new buffer will be created
+ \param coordination the scheduler for the buffers
+
+ \return Observable that emits buffers every skip time interval and collect items from this observable for period of time into each produced buffer.
+
+ \sample
+ \snippet buffer.cpp buffer period+skip+coordination sample
+ \snippet output.txt buffer period+skip+coordination sample
+ */
template<class Coordination>
auto buffer_with_time(rxsc::scheduler::clock_type::duration period, rxsc::scheduler::clock_type::duration skip, Coordination coordination) const
- -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, skip, coordination))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, skip, coordination)))
+ /// \endcond
+ {
return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, skip, coordination));
}
- /// buffer_with_time ->
- /// start a new vector every skip time interval and collect items into it from this observable for period of time.
- ///
+ /*! Rerurn an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer.
+
+ \param period the period of time each buffer collects items before it is emitted
+ \param skip the period of time after which a new buffer will be created
+
+ \return Observable that emits buffers every skip time interval and collect items from this observable for period of time into each produced buffer.
+
+ \sample
+ \snippet buffer.cpp buffer period+skip sample
+ \snippet output.txt buffer period+skip sample
+
+ Overlapping buffers are allowed:
+ \snippet buffer.cpp buffer period+skip overlapping sample
+ \snippet output.txt buffer period+skip overlapping sample
+
+ If no items are emitted, an empty buffer is returned:
+ \snippet buffer.cpp buffer period+skip empty sample
+ \snippet output.txt buffer period+skip empty sample
+ */
auto buffer_with_time(rxsc::scheduler::clock_type::duration period, rxsc::scheduler::clock_type::duration skip) const
- -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, skip, identity_current_thread()))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, skip, identity_current_thread())))
+ /// \endcond
+ {
return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, skip, identity_current_thread()));
}
- /// buffer_with_time ->
- /// start a new vector every period time interval and collect items into it from this observable for period of time.
- ///
+ /*! Rerurn an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler.
+
+ \tparam Coordination the type of the scheduler
+
+ \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer
+ \param coordination the scheduler for the buffers
+
+ \return Observable that emits buffers every period time interval and collect items from this observable for period of time into each produced buffer.
+
+ \sample
+ \snippet buffer.cpp buffer period+coordination sample
+ \snippet output.txt buffer period+coordination sample
+ */
template<class Coordination,
class Requires = typename std::enable_if<is_coordination<Coordination>::value, rxu::types_checked>::type>
auto buffer_with_time(rxsc::scheduler::clock_type::duration period, Coordination coordination) const
- -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, period, coordination))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, period, coordination)))
+ /// \endcond
+ {
return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, Coordination>(period, period, coordination));
}
- /// buffer_with_time ->
- /// start a new vector every period time interval and collect items into it from this observable for period of time.
- ///
+ /*! Rerurn an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer.
+
+ \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer
+
+ \return Observable that emits buffers every period time interval and collect items from this observable for period of time into each produced buffer.
+
+ \sample
+ \snippet buffer.cpp buffer period sample
+ \snippet output.txt buffer period sample
+ */
auto buffer_with_time(rxsc::scheduler::clock_type::duration period) const
- -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, period, identity_current_thread()))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, period, identity_current_thread())))
+ /// \endcond
+ {
return lift_if<std::vector<T>>(rxo::detail::buffer_with_time<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, period, identity_current_thread()));
}
- /// buffer_with_time_or_count ->
- /// start a new vector every skip time interval and collect items into it from this observable for period of time.
- ///
+ /*! Rerurn an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler.
+
+ \tparam Coordination the type of the scheduler
+
+ \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer
+ \param count the maximum size of each buffer before it is emitted and new buffer is created
+ \param coordination the scheduler for the buffers
+
+ \return Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
+
+ \sample
+ \snippet buffer.cpp buffer period+count+coordination sample
+ \snippet output.txt buffer period+count+coordination sample
+ */
template<class Coordination>
auto buffer_with_time_or_count(rxsc::scheduler::clock_type::duration period, int count, Coordination coordination) const
- -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, Coordination>(period, count, coordination))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, Coordination>(period, count, coordination)))
+ /// \endcond
+ {
return lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, Coordination>(period, count, coordination));
}
- /// buffer_with_time_or_count ->
- /// start a new vector every skip time interval and collect items into it from this observable for period of time.
- ///
+ /*! Rerurn an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
+
+ \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer
+ \param count the maximum size of each buffer before it is emitted and new buffer is created
+
+ \return Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
+
+ \sample
+ \snippet buffer.cpp buffer period+count sample
+ \snippet output.txt buffer period+count sample
+ */
auto buffer_with_time_or_count(rxsc::scheduler::clock_type::duration period, int count) const
- -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, count, identity_current_thread()))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, count, identity_current_thread())))
+ /// \endcond
+ {
return lift_if<std::vector<T>>(rxo::detail::buffer_with_time_or_count<T, rxsc::scheduler::clock_type::duration, identity_one_worker>(period, count, identity_current_thread()));
}
+ /// \cond SHOW_SERVICE_MEMBERS
template<class Coordination>
struct defer_switch_on_next : public defer_observable<
is_observable<value_type>,
@@ -675,28 +980,50 @@ public:
rxo::detail::switch_on_next, value_type, observable<value_type>, Coordination>
{
};
+ /// \endcond
- /// switch_on_next (AKA Switch) ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// for each item from this observable subscribe to that observable after unsubscribing from any previous subscription.
- ///
+ /*! Return observable that emits the items emitted by the observable most recently emitted by the source observable.
+
+ \return Observable that emits the items emitted by the observable most recently emitted by the source observable.
+
+ \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
+
+ \sample
+ \snippet switch_on_next.cpp switch_on_next sample
+ \snippet output.txt switch_on_next sample
+ */
auto switch_on_next() const
- -> typename defer_switch_on_next<identity_one_worker>::observable_type {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> typename defer_switch_on_next<identity_one_worker>::observable_type
+ /// \endcond
+ {
return defer_switch_on_next<identity_one_worker>::make(*this, *this, identity_current_thread());
}
- /// switch_on_next (AKA Switch) ->
- /// The coodination is used to synchronize sources from different contexts.
- /// for each item from this observable subscribe to that observable after unsubscribing from any previous subscription.
- ///
+ /*! Return observable that emits the items emitted by the observable most recently emitted by the source observable, on the specified scheduler.
+
+ \tparam Coordination the type of the scheduler
+
+ \param cn the scheduler to synchronize sources from different contexts
+
+ \return Observable that emits the items emitted by the observable most recently emitted by the source observable.
+
+ \sample
+ \snippet switch_on_next.cpp threaded switch_on_next sample
+ \snippet output.txt threaded switch_on_next sample
+ */
template<class Coordination>
auto switch_on_next(Coordination cn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<
defer_switch_on_next<Coordination>::value,
- typename defer_switch_on_next<Coordination>::observable_type>::type {
+ typename defer_switch_on_next<Coordination>::observable_type>::type
+ /// \endcond
+ {
return defer_switch_on_next<Coordination>::make(*this, *this, std::move(cn));
}
+ /// \cond SHOW_SERVICE_MEMBERS
template<class Coordination>
struct defer_merge : public defer_observable<
is_observable<value_type>,
@@ -704,30 +1031,52 @@ public:
rxo::detail::merge, value_type, observable<value_type>, Coordination>
{
};
+ /// \endcond
- /// merge ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// for each item from this observable subscribe.
- /// for each item from all of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each item from this observable subscribe.
+ For each item from all of the nested observables deliver from the new observable that is returned.
+
+ \return Observable that emits items that are the result of flattening the observables emitted by the source observable.
+
+ \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
+
+ \sample
+ \snippet merge.cpp implicit merge sample
+ \snippet output.txt implicit merge sample
+ */
auto merge() const
- -> typename defer_merge<identity_one_worker>::observable_type {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> typename defer_merge<identity_one_worker>::observable_type
+ /// \endcond
+ {
return defer_merge<identity_one_worker>::make(*this, *this, identity_current_thread());
}
- /// merge ->
- /// The coordination is used to synchronize sources from different contexts.
- /// for each item from this observable subscribe.
- /// for each item from all of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each item from this observable subscribe.
+ For each item from all of the nested observables deliver from the new observable that is returned.
+
+ \tparam Coordination the type of the scheduler
+
+ \param cn the scheduler to synchronize sources from different contexts.
+
+ \return Observable that emits items that are the result of flattening the observables emitted by the source observable.
+
+ \sample
+ \snippet merge.cpp threaded implicit merge sample
+ \snippet output.txt threaded implicit merge sample
+ */
template<class Coordination>
auto merge(Coordination cn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<
defer_merge<Coordination>::value,
- typename defer_merge<Coordination>::observable_type>::type {
+ typename defer_merge<Coordination>::observable_type>::type
+ /// \endcond
+ {
return defer_merge<Coordination>::make(*this, *this, std::move(cn));
}
+ /// \cond SHOW_SERVICE_MEMBERS
template<class Coordination, class Value0>
struct defer_merge_from : public defer_observable<
rxu::all_true<
@@ -737,33 +1086,65 @@ public:
rxo::detail::merge, observable<value_type>, observable<observable<value_type>>, Coordination>
{
};
+ /// \endcond
- /// merge ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// for each item from this observable subscribe.
- /// for each item from all of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each given observable subscribe.
+ For each emitted item deliver from the new observable that is returned.
+
+ \tparam Value0 ...
+ \tparam ValueN types of source observables
+
+ \param v0 ...
+ \param vn source observables
+
+ \return Observable that emits items that are the result of flattening the observables emitted by the source observable.
+
+ \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
+
+ \sample
+ \snippet merge.cpp merge sample
+ \snippet output.txt merge sample
+ */
template<class Value0, class... ValueN>
auto merge(Value0 v0, ValueN... vn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<
defer_merge_from<identity_one_worker, Value0>::value,
- typename defer_merge_from<identity_one_worker, Value0>::observable_type>::type {
+ typename defer_merge_from<identity_one_worker, Value0>::observable_type>::type
+ /// \endcond
+ {
return defer_merge_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread());
}
- /// merge ->
- /// The coordination is used to synchronize sources from different contexts.
- /// for each item from this observable subscribe.
- /// for each item from all of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each given observable subscribe.
+ For each emitted item deliver from the new observable that is returned.
+
+ \tparam Coordination the type of the scheduler
+ \tparam Value0 ...
+ \tparam ValueN types of source observables
+
+ \param cn the scheduler to synchronize sources from different contexts.
+ \param v0 ...
+ \param vn source observables
+
+ \return Observable that emits items that are the result of flattening the observables emitted by the source observable.
+
+ \sample
+ \snippet merge.cpp threaded merge sample
+ \snippet output.txt threaded merge sample
+ */
template<class Coordination, class Value0, class... ValueN>
auto merge(Coordination cn, Value0 v0, ValueN... vn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<
defer_merge_from<Coordination, Value0>::value,
- typename defer_merge_from<Coordination, Value0>::observable_type>::type {
+ typename defer_merge_from<Coordination, Value0>::observable_type>::type
+ /// \endcond
+ {
return defer_merge_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn));
}
+ /// \cond SHOW_SERVICE_MEMBERS
template<class Coordination>
struct defer_amb : public defer_observable<
is_observable<value_type>,
@@ -771,30 +1152,50 @@ public:
rxo::detail::amb, value_type, observable<value_type>, Coordination>
{
};
+ /// \endcond
- /// amb ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// for each item from this observable subscribe.
- /// for each item from only the first of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each item from only the first of the nested observables deliver from the new observable that is returned.
+
+ \return Observable that emits the same sequence as whichever of the observables emitted from this observable that first emitted an item or sent a termination notification.
+
+ \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
+
+ \sample
+ \snippet amb.cpp implicit amb sample
+ \snippet output.txt implicit amb sample
+ */
auto amb() const
- -> typename defer_amb<identity_one_worker>::observable_type {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> typename defer_amb<identity_one_worker>::observable_type
+ /// \endcond
+ {
return defer_amb<identity_one_worker>::make(*this, *this, identity_current_thread());
}
- /// amb ->
- /// The coordination is used to synchronize sources from different contexts.
- /// for each item from this observable subscribe.
- /// for each item from only the first of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each item from only the first of the nested observables deliver from the new observable that is returned, on the specified scheduler.
+
+ \tparam Coordination the type of the scheduler
+
+ \param cn the scheduler to synchronize sources from different contexts.
+
+ \return Observable that emits the same sequence as whichever of the observables emitted from this observable that first emitted an item or sent a termination notification.
+
+ \sample
+ \snippet amb.cpp threaded implicit amb sample
+ \snippet output.txt threaded implicit amb sample
+ */
template<class Coordination>
auto amb(Coordination cn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<
defer_amb<Coordination>::value,
- typename defer_amb<Coordination>::observable_type>::type {
+ typename defer_amb<Coordination>::observable_type>::type
+ /// \endcond
+ {
return defer_amb<Coordination>::make(*this, *this, std::move(cn));
}
+ /// \cond SHOW_SERVICE_MEMBERS
template<class Coordination, class Value0>
struct defer_amb_from : public defer_observable<
rxu::all_true<
@@ -804,57 +1205,119 @@ public:
rxo::detail::amb, observable<value_type>, observable<observable<value_type>>, Coordination>
{
};
+ /// \endcond
- /// amb ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// for each item from this observable subscribe.
- /// for each item from only the first of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each item from only the first of the given observables deliver from the new observable that is returned.
+
+ \tparam Value0 ...
+ \tparam ValueN types of source observables
+
+ \param v0 ...
+ \param vn source observables
+
+ \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification.
+
+ \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
+
+ \sample
+ \snippet amb.cpp amb sample
+ \snippet output.txt amb sample
+ */
template<class Value0, class... ValueN>
auto amb(Value0 v0, ValueN... vn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<
defer_amb_from<identity_one_worker, Value0>::value,
- typename defer_amb_from<identity_one_worker, Value0>::observable_type>::type {
+ typename defer_amb_from<identity_one_worker, Value0>::observable_type>::type
+ /// \endcond
+ {
return defer_amb_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread());
}
- /// amb ->
- /// The coordination is used to synchronize sources from different contexts.
- /// for each item from this observable subscribe.
- /// for each item from only the first of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler.
+
+ \tparam Coordination the type of the scheduler
+ \tparam Value0 ...
+ \tparam ValueN types of source observables
+
+ \param cn the scheduler to synchronize sources from different contexts.
+ \param v0 ...
+ \param vn source observables
+
+ \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification.
+
+ \sample
+ \snippet amb.cpp threaded amb sample
+ \snippet output.txt threaded amb sample
+ */
template<class Coordination, class Value0, class... ValueN>
auto amb(Coordination cn, Value0 v0, ValueN... vn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<
defer_amb_from<Coordination, Value0>::value,
- typename defer_amb_from<Coordination, Value0>::observable_type>::type {
+ typename defer_amb_from<Coordination, Value0>::observable_type>::type
+ /// \endcond
+ {
return defer_amb_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn));
}
- /// flat_map (AKA SelectMany) ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable.
- /// for each item from all of the selected observables use the ResultSelector to select a value to emit from the new observable that is returned.
- ///
+ /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
+ For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
+
+ \tparam CollectionSelector the type of the observable producing function
+ \tparam ResultSelector the type of the aggregation function
+
+ \param s a function that returns an observable for each item emitted by the source observable
+ \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable
+
+ \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
+
+ Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::concat_map that works similar but concatenates the observables.
+
+ \sample
+ \snippet flat_map.cpp flat_map sample
+ \snippet output.txt flat_map sample
+ */
template<class CollectionSelector, class ResultSelector>
auto flat_map(CollectionSelector&& s, ResultSelector&& rs) const
- -> observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>> {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>
+ /// \endcond
+ {
return observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>(
rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
}
- /// flat_map (AKA SelectMany) ->
- /// The coodination is used to synchronize sources from different contexts.
- /// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable.
- /// for each item from all of the selected observables use the ResultSelector to select a value to emit from the new observable that is returned.
- ///
+ /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
+ For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
+
+ \tparam CollectionSelector the type of the observable producing function
+ \tparam ResultSelector the type of the aggregation function
+ \tparam Coordination the type of the scheduler
+
+ \param s a function that returns an observable for each item emitted by the source observable
+ \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable
+ \param cn the scheduler to synchronize sources from different contexts.
+
+ \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
+
+ Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::concat_map that works similar but concatenates the observables.
+
+ \sample
+ \snippet flat_map.cpp threaded flat_map sample
+ \snippet output.txt threaded flat_map sample
+ */
template<class CollectionSelector, class ResultSelector, class Coordination>
- auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf) const
- -> observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>> {
+ auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>
+ /// \endcond
+ {
return observable<rxu::value_type_t<rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>>(
- rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf)));
+ rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
}
+ /// \cond SHOW_SERVICE_MEMBERS
template<class Coordination>
struct defer_concat : public defer_observable<
is_observable<value_type>,
@@ -862,30 +1325,54 @@ public:
rxo::detail::concat, value_type, observable<value_type>, Coordination>
{
};
+ /// \endcond
- /// concat ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// for each item from this observable subscribe to one at a time. in the order received.
- /// for each item from all of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each item from this observable subscribe to one at a time, in the order received.
+ For each item from all of the nested observables deliver from the new observable that is returned.
+
+ \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them.
+
+ \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
+
+ \sample
+ \snippet concat.cpp implicit concat sample
+ \snippet output.txt implicit concat sample
+ */
auto concat() const
- -> typename defer_concat<identity_one_worker>::observable_type {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> typename defer_concat<identity_one_worker>::observable_type
+ /// \endcond
+ {
return defer_concat<identity_one_worker>::make(*this, *this, identity_current_thread());
}
- /// concat ->
- /// The coordination is used to synchronize sources from different contexts.
- /// for each item from this observable subscribe to one at a time. in the order received.
- /// for each item from all of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each item from this observable subscribe to one at a time, in the order received.
+ For each item from all of the nested observables deliver from the new observable that is returned.
+
+ \tparam Coordination the type of the scheduler
+
+ \param cn the scheduler to synchronize sources from different contexts.
+
+ \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them.
+
+ \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
+
+ \sample
+ \snippet concat.cpp threaded implicit concat sample
+ \snippet output.txt threaded implicit concat sample
+ */
template<class Coordination>
auto concat(Coordination cn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<
defer_concat<Coordination>::value,
- typename defer_concat<Coordination>::observable_type>::type {
+ typename defer_concat<Coordination>::observable_type>::type
+ /// \endcond
+ {
return defer_concat<Coordination>::make(*this, *this, std::move(cn));
}
+ /// \cond SHOW_SERVICE_MEMBERS
template<class Coordination, class Value0>
struct defer_concat_from : public defer_observable<
rxu::all_true<
@@ -895,57 +1382,119 @@ public:
rxo::detail::concat, observable<value_type>, observable<observable<value_type>>, Coordination>
{
};
+ /// \endcond
- /// concat ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// for each item from this observable subscribe to one at a time. in the order received.
- /// for each item from all of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each given observable subscribe to one at a time, in the order received.
+ For each emitted item deliver from the new observable that is returned.
+
+ \tparam Value0 ...
+ \tparam ValueN types of source observables
+
+ \param v0 ...
+ \param vn source observables
+
+ \return Observable that emits items emitted by the source observables, one after the other, without interleaving them.
+
+ \sample
+ \snippet concat.cpp concat sample
+ \snippet output.txt concat sample
+ */
template<class Value0, class... ValueN>
auto concat(Value0 v0, ValueN... vn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<
defer_concat_from<identity_one_worker, Value0>::value,
- typename defer_concat_from<identity_one_worker, Value0>::observable_type>::type {
+ typename defer_concat_from<identity_one_worker, Value0>::observable_type>::type
+ /// \endcond
+ {
return defer_concat_from<identity_one_worker, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread());
}
- /// concat ->
- /// The coordination is used to synchronize sources from different contexts.
- /// for each item from this observable subscribe to one at a time. in the order received.
- /// for each item from all of the nested observables deliver from the new observable that is returned.
- ///
+ /*! For each given observable subscribe to one at a time, in the order received.
+ For each emitted item deliver from the new observable that is returned.
+
+ \tparam Coordination the type of the scheduler
+ \tparam Value0 ...
+ \tparam ValueN types of source observables
+
+ \param cn the scheduler to synchronize sources from different contexts.
+ \param v0 ...
+ \param vn source observables
+
+ \return Observable that emits items emitted by the source observables, one after the other, without interleaving them.
+
+ \sample
+ \snippet concat.cpp threaded concat sample
+ \snippet output.txt threaded concat sample
+ */
template<class Coordination, class Value0, class... ValueN>
auto concat(Coordination cn, Value0 v0, ValueN... vn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<
defer_concat_from<Coordination, Value0>::value,
- typename defer_concat_from<Coordination, Value0>::observable_type>::type {
+ typename defer_concat_from<Coordination, Value0>::observable_type>::type
+ /// \endcond
+ {
return defer_concat_from<Coordination, Value0>::make(*this, rxs::from(this->as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::move(cn));
}
- /// concat_map ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable.
- /// for each item from all of the selected observables use the ResultSelector to select a value to emit from the new observable that is returned.
- ///
+ /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
+ For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
+
+ \tparam CollectionSelector the type of the observable producing function
+ \tparam ResultSelector the type of the aggregation function
+
+ \param s a function that returns an observable for each item emitted by the source observable
+ \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable
+
+ \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
+
+ Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables.
+
+ \sample
+ \snippet concat_map.cpp concat_map sample
+ \snippet output.txt concat_map sample
+ */
template<class CollectionSelector, class ResultSelector>
auto concat_map(CollectionSelector&& s, ResultSelector&& rs) const
- -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>> {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>
+ /// \endcond
+ {
return observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>(
rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
}
- /// concat_map ->
- /// The coordination is used to synchronize sources from different contexts.
- /// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable.
- /// for each item from all of the selected observables use the ResultSelector to select a value to emit from the new observable that is returned.
- ///
+ /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
+ For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
+
+ \tparam CollectionSelector the type of the observable producing function
+ \tparam ResultSelector the type of the aggregation function
+ \tparam Coordination the type of the scheduler
+
+ \param s a function that returns an observable for each item emitted by the source observable
+ \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable
+ \param cn the scheduler to synchronize sources from different contexts.
+
+ \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
+
+ Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables.
+
+ \sample
+ \snippet concat_map.cpp threaded concat_map sample
+ \snippet output.txt threaded concat_map sample
+ */
template<class CollectionSelector, class ResultSelector, class Coordination>
- auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf) const
- -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>> {
+ auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>
+ /// \endcond
+ {
return observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>(
- rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf)));
+ rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
}
+ /// \cond SHOW_SERVICE_MEMBERS
template<class Source, class Coordination, class TS, class C = rxu::types_checked>
struct select_combine_latest_cn : public std::false_type {};
@@ -1020,16 +1569,48 @@ public:
return observable_type(operator_type(identity_current_thread(), rxu::pack(), std::make_tuple(src, std::move(on)...)));
}
};
+ /// \endcond
- /// combine_latest ->
- /// for each item from all of the observables use the Selector to select a value to emit from the new observable that is returned.
- ///
+ /*! For each item from all of the observables select a value to emit from the new observable that is returned.
+
+ \tparam AN types of scheduler (optional), aggregate function (optional), and source observables
+
+ \param an scheduler (optional), aggregation function (optional), and source observables
+
+ \return Observable that emits items that are the result of combining the items emitted by the source observables.
+
+ If scheduler is omitted, identity_current_thread is used.
+
+ If aggregation function is omitted, the resulting observable returns tuples of emitted items.
+
+ \sample
+
+ Neither scheduler nor aggregation function are present:
+ \snippet combine_latest.cpp combine_latest sample
+ \snippet output.txt combine_latest sample
+
+ Only scheduler is present:
+ \snippet combine_latest.cpp Coordination combine_latest sample
+ \snippet output.txt Coordination combine_latest sample
+
+ Only aggregation function is present:
+ \snippet combine_latest.cpp Selector combine_latest sample
+ \snippet output.txt Selector combine_latest sample
+
+ Both scheduler and aggregation function are present:
+ \snippet combine_latest.cpp Coordination+Selector combine_latest sample
+ \snippet output.txt Coordination+Selector combine_latest sample
+ */
template<class... AN>
auto combine_latest(AN... an) const
- -> decltype(select_combine_latest<this_type, rxu::types<decltype(an)...>>{}(*(this_type*)nullptr, std::move(an)...)) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(select_combine_latest<this_type, rxu::types<decltype(an)...>>{}(*(this_type*)nullptr, std::move(an)...))
+ /// \endcond
+ {
return select_combine_latest<this_type, rxu::types<decltype(an)...>>{}(*this, std::move(an)...);
}
+ /// \cond SHOW_SERVICE_MEMBERS
template<class Source, class Coordination, class TS, class C = rxu::types_checked>
struct select_zip_cn : public std::false_type {};
@@ -1104,32 +1685,97 @@ public:
return observable_type(operator_type(identity_current_thread(), rxu::pack(), std::make_tuple(src, std::move(on)...)));
}
};
+ /// \endcond
- /// zip ->
- /// bring by one item from all given observables and use the Selector to select a value to emit from the new observable that is returned.
- ///
+ /*! Bring by one item from all given observables and select a value to emit from the new observable that is returned.
+
+ \tparam AN types of scheduler (optional), aggregate function (optional), and source observables
+
+ \param an scheduler (optional), aggregation function (optional), and source observables
+
+ \return Observable that emits the result of combining the items emitted and brought by one from each of the source observables.
+
+ If scheduler is omitted, identity_current_thread is used.
+
+ If aggregation function is omitted, the resulting observable returns tuples of emitted items.
+
+ \sample
+
+ Neither scheduler nor aggregation function are present:
+ \snippet zip.cpp zip sample
+ \snippet output.txt zip sample
+
+ Only scheduler is present:
+ \snippet zip.cpp Coordination zip sample
+ \snippet output.txt Coordination zip sample
+
+ Only aggregation function is present:
+ \snippet zip.cpp Selector zip sample
+ \snippet output.txt Selector zip sample
+
+ Both scheduler and aggregation function are present:
+ \snippet zip.cpp Coordination+Selector zip sample
+ \snippet output.txt Coordination+Selector zip sample
+ */
template<class... AN>
auto zip(AN... an) const
- -> decltype(select_zip<this_type, rxu::types<decltype(an)...>>{}(*(this_type*)nullptr, std::move(an)...)) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(select_zip<this_type, rxu::types<decltype(an)...>>{}(*(this_type*)nullptr, std::move(an)...))
+ /// \endcond
+ {
return select_zip<this_type, rxu::types<decltype(an)...>>{}(*this, std::move(an)...);
}
- /// group_by ->
- ///
+ /*! Return an observable that emits grouped_observables, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value.
+
+ \tparam KeySelector the type of the key extracting function
+ \tparam MarbleSelector the type of the element extracting function
+ \tparam BinaryPredicate the type of the key comparing function
+
+ \param ks a function that extracts the key for each item
+ \param ms a function that extracts the return element for each item
+ \param p a function that implements comparison of two keys
+
+ \return Observable that emits values of grouped_observable type, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value.
+
+ \sample
+ \snippet group_by.cpp group_by full intro
+ \snippet group_by.cpp group_by full sample
+ \snippet output.txt group_by full sample
+ */
template<class KeySelector, class MarbleSelector, class BinaryPredicate>
inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p) const
- -> decltype(EXPLICIT_THIS lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p)))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p))))
+ /// \endcond
+ {
return lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p)));
}
- /// group_by ->
- ///
+ /*! Return an observable that emits grouped_observables, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value.
+
+ \tparam KeySelector the type of the key extracting function
+ \tparam MarbleSelector the type of the element extracting function
+
+ \param ks a function that extracts the key for each item
+ \param ms a function that extracts the return element for each item
+
+ \return Observable that emits values of grouped_observable type, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value.
+
+ \sample
+ \snippet group_by.cpp group_by sample
+ \snippet output.txt group_by sample
+ */
template<class KeySelector, class MarbleSelector>
inline auto group_by(KeySelector ks, MarbleSelector ms) const
- -> decltype(EXPLICIT_THIS lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, rxu::less>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less()))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, rxu::less>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less())))
+ /// \endcond
+ {
return lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, rxu::less>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less()));
}
+ /// \cond SHOW_SERVICE_MEMBERS
/// multicast ->
/// allows connections to the source to be independent of subscriptions
///
@@ -1139,64 +1785,151 @@ public:
return connectable_observable<T, rxo::detail::multicast<T, this_type, Subject>>(
rxo::detail::multicast<T, this_type, Subject>(*this, std::move(sub)));
}
+ /// \endcond
- /// publish_synchronized ->
- /// turns a cold observable hot and allows connections to the source to be independent of subscriptions.
- /// all values are queued and delivered using the scheduler from the supplied coordination
- ///
+ /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
+
+ \tparam Coordination the type of the scheduler
+
+ \param cn a scheduler all values are queued and delivered on
+ \param cs the subscription to control lifetime
+
+ \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler.
+
+ \sample
+ \snippet publish.cpp publish_synchronized sample
+ \snippet output.txt publish_synchronized sample
+ */
template<class Coordination>
auto publish_synchronized(Coordination cn, composite_subscription cs = composite_subscription()) const
- -> decltype(EXPLICIT_THIS multicast(rxsub::synchronize<T, Coordination>(std::move(cn), cs))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS multicast(rxsub::synchronize<T, Coordination>(std::move(cn), cs)))
+ /// \endcond
+ {
return multicast(rxsub::synchronize<T, Coordination>(std::move(cn), cs));
}
- /// publish ->
- /// turns a cold observable hot and allows connections to the source to be independent of subscriptions
- /// NOTE: multicast of a subject
- ///
+ /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
+
+ \param cs the subscription to control lifetime
+
+ \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.
+
+ \sample
+ \snippet publish.cpp publish subject sample
+ \snippet output.txt publish subject sample
+ */
auto publish(composite_subscription cs = composite_subscription()) const
- -> decltype(EXPLICIT_THIS multicast(rxsub::subject<T>(cs))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS multicast(rxsub::subject<T>(cs)))
+ /// \endcond
+ {
return multicast(rxsub::subject<T>(cs));
}
- /// publish ->
- /// turns a cold observable hot, sends the most recent value to any new subscriber and allows connections to the source to be independent of subscriptions
- /// NOTE: multicast of a behavior
- ///
+ /*! Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions.
+
+ \tparam T the type of the emitted item
+
+ \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection
+ \param cs the subscription to control lifetime
+
+ \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.
+
+ \sample
+ \snippet publish.cpp publish behavior sample
+ \snippet output.txt publish behavior sample
+ */
auto publish(T first, composite_subscription cs = composite_subscription()) const
- -> decltype(EXPLICIT_THIS multicast(rxsub::behavior<T>(first, cs))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS multicast(rxsub::behavior<T>(first, cs)))
+ /// \endcond
+ {
return multicast(rxsub::behavior<T>(first, cs));
}
- /// subscribe_on ->
- /// subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination
- ///
+ /*! Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination.
+
+ \tparam Coordination the type of the scheduler
+
+ \param cn the scheduler to perform subscription actions on
+
+ \return The source observable modified so that its subscriptions happen on the specified scheduler.
+
+ \sample
+ \snippet subscribe_on.cpp subscribe_on sample
+ \snippet output.txt subscribe_on sample
+
+ Invoking rxcpp::observable::observe_on operator, instead of subscribe_on, gives following results:
+ \snippet output.txt observe_on sample
+ */
template<class Coordination>
auto subscribe_on(Coordination cn) const
- -> observable<rxu::value_type_t<rxo::detail::subscribe_on<T, this_type, Coordination>>, rxo::detail::subscribe_on<T, this_type, Coordination>> {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<rxu::value_type_t<rxo::detail::subscribe_on<T, this_type, Coordination>>, rxo::detail::subscribe_on<T, this_type, Coordination>>
+ /// \endcond
+ {
return observable<rxu::value_type_t<rxo::detail::subscribe_on<T, this_type, Coordination>>, rxo::detail::subscribe_on<T, this_type, Coordination>>(
rxo::detail::subscribe_on<T, this_type, Coordination>(*this, std::move(cn)));
}
- /// observe_on ->
- /// all values are queued and delivered using the scheduler from the supplied coordination
- ///
+ /*! All values are queued and delivered using the scheduler from the supplied coordination.
+
+ \tparam Coordination the type of the scheduler
+
+ \param cn the scheduler to notify observers on
+
+ \return The source observable modified so that its observers are notified on the specified scheduler.
+
+ \sample
+ \snippet observe_on.cpp observe_on sample
+ \snippet output.txt subscribe_on sample
+
+ Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results:
+ \snippet output.txt observe_on sample
+ */
template<class Coordination>
auto observe_on(Coordination cn) const
- -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)))) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn))))
+ /// \endcond
+ {
return lift<T>(rxo::detail::observe_on<T, Coordination>(std::move(cn)));
}
- /// reduce ->
- /// for each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.
- ///
+ /*! For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.
+
+ \tparam Seed the type of the initial value for the accumulator
+ \tparam Accumulator the type of the data accumulating function
+ \tparam ResultSelector the type of the result producing function
+
+ \param seed the initial value for the accumulator
+ \param a an accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call
+ \param rs a result producing function that makes the final value from the last accumulator call result
+
+ \return An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable.
+
+ Some basic reduce-type operators have already been implemented:
+ - rxcpp::observable::count
+ - rxcpp::observable::sum
+ - rxcpp::observable::average
+
+ \sample
+ Geometric mean of source values:
+ \snippet reduce.cpp reduce sample
+ \snippet output.txt reduce sample
+ */
template<class Seed, class Accumulator, class ResultSelector>
auto reduce(Seed seed, Accumulator&& a, ResultSelector&& rs) const
- -> observable<rxu::value_type_t<rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>>, rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>> {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<rxu::value_type_t<rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>>, rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>>
+ /// \endcond
+ {
return observable<rxu::value_type_t<rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>>, rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>>(
rxo::detail::reduce<T, source_operator_type, Accumulator, ResultSelector, Seed>(source_operator, std::forward<Accumulator>(a), std::forward<ResultSelector>(rs), seed));
}
+ /// \cond SHOW_SERVICE_MEMBERS
template<class Seed, class Accumulator, class ResultSelector>
struct defer_reduce : public defer_observable<
rxu::all_true<
@@ -1206,207 +1939,420 @@ public:
rxo::detail::reduce, T, source_operator_type, Accumulator, ResultSelector, Seed>
{
};
+ /// \endcond
- /// first ->
- /// for each item from this observable reduce it by sending only the first item.
- ///
+ /*! For each item from this observable reduce it by sending only the first item.
+
+ \return An observable that emits only the very first item emitted by the source observable.
+
+ \sample
+ \snippet math.cpp first sample
+ \snippet output.txt first sample
+ */
auto first() const
-> observable<T>;
- /// last ->
- /// for each item from this observable reduce it by sending only the last item.
- ///
+ /*! For each item from this observable reduce it by sending only the last item.
+
+ \return An observable that emits only the very last item emitted by the source observable.
+
+ \sample
+ \snippet math.cpp last sample
+ \snippet output.txt last sample
+ */
auto last() const
-> observable<T>;
- /// count ->
- /// for each item from this observable reduce it by incrementing a count.
- ///
+ /*! For each item from this observable reduce it by incrementing a count.
+
+ \return An observable that emits a single item: the number of elements emitted by the source observable.
+
+ \sample
+ \snippet math.cpp count sample
+ \snippet output.txt count sample
+ */
auto count() const
-> observable<int>;
- /// sum ->
- /// for each item from this observable reduce it by adding to the previous items.
- ///
+ /*! For each item from this observable reduce it by adding to the previous items.
+
+ \return An observable that emits a single item: the sum of elements emitted by the source observable.
+
+ \sample
+ \snippet math.cpp sum sample
+ \snippet output.txt sum sample
+ */
auto sum() const
- -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::initialize_seeder, T>, rxu::plus, rxu::defer_type<identity_for, T>>::observable_type {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::initialize_seeder, T>, rxu::plus, rxu::defer_type<identity_for, T>>::observable_type
+ /// \endcond
+ {
return defer_reduce<rxu::defer_seed_type<rxo::detail::initialize_seeder, T>, rxu::plus, rxu::defer_type<identity_for, T>>::make(source_operator, rxu::plus(), identity_for<T>(), rxo::detail::initialize_seeder<T>().seed());
}
- /// average ->
- /// for each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.
- ///
+ /*! For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.
+
+ \return An observable that emits a single item: the average of elements emitted by the source observable.
+
+ \sample
+ \snippet math.cpp average sample
+ \snippet output.txt average sample
+ */
auto average() const
- -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::observable_type {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::observable_type
+ /// \endcond
+ {
return defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::make(source_operator, rxo::detail::average<T>(), rxo::detail::average<T>(), rxo::detail::average<T>().seed());
}
- /// scan ->
- /// for each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned.
- ///
+ /*! For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned.
+
+ \tparam Seed the type of the initial value for the accumulator
+ \tparam Accumulator the type of the data accumulating function
+
+ \param seed the initial value for the accumulator
+ \param a an accumulator function to be invoked on each item emitted by the source observable, whose result will be emitted and used in the next accumulator call
+
+ \return An observable that emits the results of each call to the accumulator function.
+
+ \sample
+ \snippet scan.cpp scan sample
+ \snippet output.txt scan sample
+ */
template<class Seed, class Accumulator>
auto scan(Seed seed, Accumulator&& a) const
- -> observable<Seed, rxo::detail::scan<T, this_type, Accumulator, Seed>> {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<Seed, rxo::detail::scan<T, this_type, Accumulator, Seed>>
+ /// \endcond
+ {
return observable<Seed, rxo::detail::scan<T, this_type, Accumulator, Seed>>(
rxo::detail::scan<T, this_type, Accumulator, Seed>(*this, std::forward<Accumulator>(a), seed));
}
- /// skip ->
- /// make new observable with skipped first count items from this observable
- ///
- ///
+ /*! Make new observable with skipped first count items from this observable.
+
+ \tparam Count the type of the items counter
+
+ \param t the number of items to skip
+
+ \return An observable that is identical to the source observable except that it does not emit the first t items that the source observable emits.
+
+ \sample
+ \snippet skip.cpp skip sample
+ \snippet output.txt skip sample
+ */
template<class Count>
auto skip(Count t) const
- -> observable<T, rxo::detail::skip<T, this_type, Count>> {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<T, rxo::detail::skip<T, this_type, Count>>
+ /// \endcond
+ {
return observable<T, rxo::detail::skip<T, this_type, Count>>(
rxo::detail::skip<T, this_type, Count>(*this, t));
}
- /// skip_until ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// make new observable with items skipped until on_next occurs on the TriggerSource
- ///
- ///
+ /*! Make new observable with items skipped until on_next occurs on the trigger observable
+
+ \tparam TriggerSource the type of the trigger observable
+
+ \param t an observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable
+
+ \return An observable that skips items from the source observable until the second observable emits an item, then emits the remaining items.
+
+ \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
+
+ \sample
+ \snippet skip_until.cpp skip_until sample
+ \snippet output.txt skip_until sample
+ */
template<class TriggerSource>
auto skip_until(TriggerSource&& t) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<is_observable<TriggerSource>::value,
- observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>>>::type {
+ observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>>>::type
+ /// \endcond
+ {
return observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>>(
rxo::detail::skip_until<T, this_type, TriggerSource, identity_one_worker>(*this, std::forward<TriggerSource>(t), identity_one_worker(rxsc::make_current_thread())));
}
- /// skip_until ->
- /// The coordination is used to synchronize sources from different contexts.
- /// make new observable with items skipped until on_next occurs on the TriggerSource
- ///
- ///
+ /*! Make new observable with items skipped until on_next occurs on the trigger observable
+
+ \tparam TriggerSource the type of the trigger observable
+ \tparam Coordination the type of the scheduler
+
+ \param t an observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable
+ \param cn the scheduler to use for scheduling the items
+
+ \return An observable that skips items from the source observable until the second observable emits an item, then emits the remaining items.
+
+ \sample
+ \snippet skip_until.cpp threaded skip_until sample
+ \snippet output.txt threaded skip_until sample
+ */
template<class TriggerSource, class Coordination>
- auto skip_until(TriggerSource&& t, Coordination&& sf) const
+ auto skip_until(TriggerSource&& t, Coordination&& cn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<is_observable<TriggerSource>::value && is_coordination<Coordination>::value,
- observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>>>::type {
+ observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>>>::type
+ /// \endcond
+ {
return observable<T, rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>>(
- rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>(*this, std::forward<TriggerSource>(t), std::forward<Coordination>(sf)));
+ rxo::detail::skip_until<T, this_type, TriggerSource, Coordination>(*this, std::forward<TriggerSource>(t), std::forward<Coordination>(cn)));
}
- /// take ->
- /// for the first count items from this observable emit them from the new observable that is returned.
- ///
- ///
+ /*! For the first count items from this observable emit them from the new observable that is returned.
+
+ \tparam Count the type of the items counter
+
+ \param t the number of items to take
+
+ \return An observable that emits only the first t items emitted by the source Observable, or all of the items from the source observable if that observable emits fewer than t items.
+
+ \sample
+ \snippet take.cpp take sample
+ \snippet output.txt take sample
+ */
template<class Count>
auto take(Count t) const
- -> observable<T, rxo::detail::take<T, this_type, Count>> {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<T, rxo::detail::take<T, this_type, Count>>
+ /// \endcond
+ {
return observable<T, rxo::detail::take<T, this_type, Count>>(
rxo::detail::take<T, this_type, Count>(*this, t));
}
- /// take_until ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// for each item from this observable until on_next occurs on the TriggerSource, emit them from the new observable that is returned.
- ///
- ///
+ /*! For each item from this observable until on_next occurs on the trigger observable, emit them from the new observable that is returned.
+
+ \tparam TriggerSource the type of the trigger observable
+
+ \param t an observable whose first emitted item will stop emitting items from the source observable
+
+ \return An observable that emits the items emitted by the source observable until such time as other emits its first item.
+
+ \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
+
+ \sample
+ \snippet take_until.cpp take_until sample
+ \snippet output.txt take_until sample
+ */
template<class TriggerSource>
auto take_until(TriggerSource t) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<is_observable<TriggerSource>::value,
- observable<T, rxo::detail::take_until<T, this_type, TriggerSource, identity_one_worker>>>::type {
+ observable<T, rxo::detail::take_until<T, this_type, TriggerSource, identity_one_worker>>>::type
+ /// \endcond
+ {
return observable<T, rxo::detail::take_until<T, this_type, TriggerSource, identity_one_worker>>(
rxo::detail::take_until<T, this_type, TriggerSource, identity_one_worker>(*this, std::move(t), identity_current_thread()));
}
- /// take_until ->
- /// The coordination is used to synchronize sources from different contexts.
- /// for each item from this observable until on_next occurs on the TriggerSource, emit them from the new observable that is returned.
- ///
- ///
+ /*! For each item from this observable until on_next occurs on the trigger observable, emit them from the new observable that is returned.
+
+ \tparam TriggerSource the type of the trigger observable
+ \tparam Coordination the type of the scheduler
+
+ \param t an observable whose first emitted item will stop emitting items from the source observable
+ \param cn the scheduler to use for scheduling the items
+
+ \return An observable that emits the items emitted by the source observable until such time as other emits its first item.
+
+ \sample
+ \snippet take_until.cpp threaded take_until sample
+ \snippet output.txt threaded take_until sample
+ */
template<class TriggerSource, class Coordination>
- auto take_until(TriggerSource t, Coordination sf) const
+ auto take_until(TriggerSource t, Coordination cn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<is_observable<TriggerSource>::value && is_coordination<Coordination>::value,
- observable<T, rxo::detail::take_until<T, this_type, TriggerSource, Coordination>>>::type {
+ observable<T, rxo::detail::take_until<T, this_type, TriggerSource, Coordination>>>::type
+ /// \endcond
+ {
return observable<T, rxo::detail::take_until<T, this_type, TriggerSource, Coordination>>(
- rxo::detail::take_until<T, this_type, TriggerSource, Coordination>(*this, std::move(t), std::move(sf)));
+ rxo::detail::take_until<T, this_type, TriggerSource, Coordination>(*this, std::move(t), std::move(cn)));
}
- /// take_until ->
- /// All sources must be synchronized! This means that calls across all the subscribers must be serial.
- /// for each item from this observable until the specified time, emit them from the new observable that is returned.
- ///
- ///
+ /*! For each item from this observable until the specified time, emit them from the new observable that is returned.
+
+ \tparam TimePoint the type of the time interval
+
+ \param when an observable whose first emitted item will stop emitting items from the source observable
+
+ \return An observable that emits those items emitted by the source observable before the time runs out.
+
+ \note All sources must be synchronized! This means that calls across all the subscribers must be serial.
+
+ \sample
+ \snippet take_until.cpp take_until time sample
+ \snippet output.txt take_until time sample
+ */
template<class TimePoint>
auto take_until(TimePoint when) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>::value,
- observable<T, rxo::detail::take_until<T, this_type, decltype(rxs::timer(when, identity_current_thread())), identity_one_worker>>>::type {
+ observable<T, rxo::detail::take_until<T, this_type, decltype(rxs::timer(when, identity_current_thread())), identity_one_worker>>>::type
+ /// \endcond
+ {
auto cn = identity_current_thread();
return take_until(rxs::timer(when, cn), cn);
}
- /// take_until ->
- /// The coordination is used to synchronize sources from different contexts.
- /// for each item from this observable until the specified time, emit them from the new observable that is returned.
- ///
- ///
+ /*! For each item from this observable until the specified time, emit them from the new observable that is returned.
+
+ \tparam TimePoint the type of the time interval
+ \tparam Coordination the type of the scheduler
+
+ \param when an observable whose first emitted item will stop emitting items from the source observable
+ \param cn the scheduler to use for scheduling the items
+
+ \return An observable that emits those items emitted by the source observable before the time runs out.
+
+ \sample
+ \snippet take_until.cpp threaded take_until time sample
+ \snippet output.txt threaded take_until time sample
+ */
template<class Coordination>
auto take_until(rxsc::scheduler::clock_type::time_point when, Coordination cn) const
+ /// \cond SHOW_SERVICE_MEMBERS
-> typename std::enable_if<is_coordination<Coordination>::value,
- observable<T, rxo::detail::take_until<T, this_type, decltype(rxs::interval(when, cn)), Coordination>>>::type {
- return take_until(rxs::interval(when, cn), cn);
+ observable<T, rxo::detail::take_until<T, this_type, decltype(rxs::timer(when, cn)), Coordination>>>::type
+ /// \endcond
+ {
+ return take_until(rxs::timer(when, cn), cn);
}
- /// repeat ->
- /// infinitely repeats this observable
- ///
- ///
+ /*! Infinitely repeat this observable.
+
+ \return An observable that emits the items emitted by the source observable repeatedly and in sequence.
+
+ \sample
+ \snippet repeat.cpp repeat sample
+ \snippet output.txt repeat sample
+
+ If the source observable calls on_error, repeat stops:
+ \snippet repeat.cpp repeat error sample
+ \snippet output.txt repeat error sample
+ */
auto repeat() const
- -> observable<T, rxo::detail::repeat<T, this_type, int>> {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<T, rxo::detail::repeat<T, this_type, int>>
+ /// \endcond
+ {
return observable<T, rxo::detail::repeat<T, this_type, int>>(
rxo::detail::repeat<T, this_type, int>(*this, 0));
}
- /// repeat ->
- /// repeats this observable for given number of times
- ///
- ///
+ /*! Repeat this observable for the given number of times.
+
+ \tparam Count the type of the counter
+
+ \param t the number of times the source observable items are repeated
+
+ \return An observable that repeats the sequence of items emitted by the source observable for t times.
+
+ Call to repeat(0) infinitely repeats the source observable.
+
+ \sample
+ \snippet repeat.cpp repeat count sample
+ \snippet output.txt repeat count sample
+ */
template<class Count>
auto repeat(Count t) const
- -> observable<T, rxo::detail::repeat<T, this_type, Count>> {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<T, rxo::detail::repeat<T, this_type, Count>>
+ /// \endcond
+ {
return observable<T, rxo::detail::repeat<T, this_type, Count>>(
rxo::detail::repeat<T, this_type, Count>(*this, t));
}
- /// retry ->
- /// infinitely retrys this observable
- ///
- ///
+ /*! Infinitely retry this observable.
+
+ \return An observable that mirrors the source observable, resubscribing to it if it calls on_error.
+
+ \sample
+ \snippet retry.cpp retry sample
+ \snippet output.txt retry sample
+ */
auto retry() const
- -> observable<T, rxo::detail::retry<T, this_type, int>> {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<T, rxo::detail::retry<T, this_type, int>>
+ /// \endcond
+ {
return observable<T, rxo::detail::retry<T, this_type, int>>(
rxo::detail::retry<T, this_type, int>(*this, 0));
}
- /// retry ->
- /// retrys this observable for given number of times
- ///
- ///
+ /*! Retry this observable for the given number of times.
+
+ \tparam Count the type of the counter
+
+ \param t the number of retries
+
+ \return An observable that mirrors the source observable, resubscribing to it if it calls on_error up to a specified number of retries.
+
+ Call to retry(0) infinitely retries the source observable.
+
+ \sample
+ \snippet retry.cpp retry count sample
+ \snippet output.txt retry count sample
+ */
template<class Count>
auto retry(Count t) const
- -> observable<T, rxo::detail::retry<T, this_type, Count>> {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<T, rxo::detail::retry<T, this_type, Count>>
+ /// \endcond
+ {
return observable<T, rxo::detail::retry<T, this_type, Count>>(
rxo::detail::retry<T, this_type, Count>(*this, t));
}
- /// start_with ->
- /// start with the supplied values, then concatenate this observable
- ///
- ///
+ /*! Start with the supplied values, then concatenate this observable.
+
+ \tparam Value0 ...
+ \tparam ValueN the type of sending values
+
+ \param v0 ...
+ \param vn values to send
+
+ \return Observable that emits the specified items and then emits the items emitted by the source observable.
+
+ \sample
+ \snippet start_with.cpp short start_with sample
+ \snippet output.txt short start_with sample
+
+ Another form of this operator, rxcpp::observable<void, void>::start_with, gets the source observable as a parameter:
+ \snippet start_with.cpp full start_with sample
+ \snippet output.txt full start_with sample
+ */
template<class Value0, class... ValueN>
auto start_with(Value0 v0, ValueN... vn) const
- -> decltype(rxo::start_with(*(this_type*)nullptr, std::move(v0), std::move(vn)...)) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(rxo::start_with(*(this_type*)nullptr, std::move(v0), std::move(vn)...))
+ /// \endcond
+ {
return rxo::start_with(*this, std::move(v0), std::move(vn)...);
}
+
+ /*! Take values pairwise from this observable.
- /// pairwise ->
- /// take values pairwise from the observable
- ///
+ \return Observable that emits tuples of two the most recent items emitted by the source observable.
+
+ \sample
+ \snippet pairwise.cpp pairwise sample
+ \snippet output.txt pairwise sample
+
+ If the source observable emits less than two items, no pairs are emitted by the source observable:
+ \snippet pairwise.cpp pairwise short sample
+ \snippet output.txt pairwise short sample
+ */
auto pairwise() const
- -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::pairwise<T>>>(rxo::detail::pairwise<T>())) {
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::pairwise<T>>>(rxo::detail::pairwise<T>()))
+ /// \endcond
+ {
return lift<rxu::value_type_t<rxo::detail::pairwise<T>>>(rxo::detail::pairwise<T>());
}
};
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index 0515b55..76ef1ab 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -156,19 +156,49 @@ if(DOXYGEN_FOUND)
# Target to build examples
set(DOXY_EXAMPLE_SRC_LIST
${DOXY_EXAMPLES_SRC_DIR}/main.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/amb.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/as_dynamic.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/buffer.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/combine_latest.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/concat.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/concat_map.cpp
${DOXY_EXAMPLES_SRC_DIR}/create.cpp
${DOXY_EXAMPLES_SRC_DIR}/defer.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/distinct_until_changed.cpp
${DOXY_EXAMPLES_SRC_DIR}/empty.cpp
${DOXY_EXAMPLES_SRC_DIR}/error.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/filter.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/finally.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/flat_map.cpp
${DOXY_EXAMPLES_SRC_DIR}/from.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/group_by.cpp
${DOXY_EXAMPLES_SRC_DIR}/interval.cpp
${DOXY_EXAMPLES_SRC_DIR}/iterate.cpp
${DOXY_EXAMPLES_SRC_DIR}/just.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/map.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/math.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/merge.cpp
${DOXY_EXAMPLES_SRC_DIR}/never.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/observe_on.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/pairwise.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/publish.cpp
${DOXY_EXAMPLES_SRC_DIR}/range.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/reduce.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/repeat.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/retry.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/scan.cpp
${DOXY_EXAMPLES_SRC_DIR}/scope.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/skip.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/skip_until.cpp
${DOXY_EXAMPLES_SRC_DIR}/start_with.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/subscribe.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/subscribe_on.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/switch_on_next.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/take.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/take_until.cpp
${DOXY_EXAMPLES_SRC_DIR}/timer.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/window.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/zip.cpp
)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${DOXY_EXAMPLES_BIN_DIR})
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_DEBUG ${DOXY_EXAMPLES_BIN_DIR})