From 76149330a666200b62422454a4fd8d3cd0a4ae37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Esser?= Date: Thu, 3 May 2018 14:59:52 +0200 Subject: Fix '-Wignored-qualifiers' --- Rx/v2/src/rxcpp/rx-subscription.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp index 9c00469..ee4e53e 100644 --- a/Rx/v2/src/rxcpp/rx-subscription.hpp +++ b/Rx/v2/src/rxcpp/rx-subscription.hpp @@ -379,7 +379,7 @@ public: composite_subscription() : inner_type() - , subscription(*static_cast(this)) + , subscription(*static_cast(this)) { } -- cgit v1.2.3 From 7cebbd4d2b3ac16249d538024e7f31eaa300c962 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Fri, 4 May 2018 11:03:54 -0700 Subject: Update README.md --- README.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 8106c4a..dbeaaab 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,12 @@ Gitter.im | [![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive Packages | [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/) [![vcpkg port](https://img.shields.io/badge/vcpkg-port-blue.svg?style=flat-square)](https://github.com/Microsoft/vcpkg/tree/master/ports/rxcpp) Documentation | [![rxcpp doxygen documentation](https://img.shields.io/badge/rxcpp-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp)
[![reactivex intro](https://img.shields.io/badge/reactivex.io-intro-brightgreen.svg?style=flat-square)](http://reactivex.io/intro.html) [![rx marble diagrams](https://img.shields.io/badge/rxmarbles-diagrams-brightgreen.svg?style=flat-square)](http://rxmarbles.com/) +# Usage + +__RxCpp__ is a header-only C++ library that only depends on the standard library. The CMake build generates documentation and unit tests. + # Example -Add ```Rx/v2/src``` to the include paths +Add `Rx/v2/src` to the include paths [![lines from bytes](https://img.shields.io/badge/blog%20post-lines%20from%20bytes-blue.svg?style=flat-square)](http://kirkshoop.github.io/async/rxcpp/c++/2015/07/07/rxcpp_-_parsing_bytes_to_lines_of_text.html) @@ -230,7 +234,5 @@ When Doxygen+Graphviz is installed, CMake creates a special build task named `do Before submitting a feature or substantial code contribution please discuss it with the team and ensure it follows the product roadmap. Note that all code submissions will be rigorously reviewed and tested by the Rx Team, and only those that meet an extremely high bar for both quality and design/roadmap appropriateness will be merged into the source. -You will be prompted to submit a Contributor License Agreement form after submitting your pull request. This needs to only be done once for any Microsoft OSS project. Fill in the [Contributor License Agreement](https://cla2.msopentech.com/) (CLA). - # Microsoft Open Source Code of Conduct This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. -- cgit v1.2.3 From b3e88fd0eb86d707b343496a62f9bb4e606bb04a Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Fri, 4 May 2018 13:30:03 -0700 Subject: Update README.md --- README.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index dbeaaab..d0791d5 100644 --- a/README.md +++ b/README.md @@ -3,18 +3,18 @@ The Reactive Extensions for C++ (__RxCpp__) is a library of algorithms for value Platform | Status | ----------- | :------------ | Windows | [![Windows Status](http://img.shields.io/appveyor/ci/kirkshoop/RxCpp-446.svg?style=flat-square)](https://ci.appveyor.com/project/kirkshoop/rxcpp-446) -Linux & OSX | [![Linux & Osx Status](http://img.shields.io/travis/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://travis-ci.org/Reactive-Extensions/RxCpp) +Linux & OSX | [![Linux & Osx Status](http://img.shields.io/travis/ReactiveX/RxCpp.svg?style=flat-square)](https://travis-ci.org/ReactiveX/RxCpp) Source | Badges | ------------- | :--------------- | -Github | [![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)
[![GitHub release](https://img.shields.io/github/release/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp/releases)
[![GitHub commits](https://img.shields.io/github/commits-since/Reactive-Extensions/RxCpp/v4.0.0.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp) -Gitter.im | [![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +Github | [![GitHub license](https://img.shields.io/github/license/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp)
[![GitHub release](https://img.shields.io/github/release/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp/releases)
[![GitHub commits](https://img.shields.io/github/commits-since/ReactiveX/RxCpp/v4.0.0.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp) +Gitter.im | [![Join in on gitter.im](https://img.shields.io/gitter/room/ReactiveX/RxCpp.svg?style=flat-square)](https://gitter.im/ReactiveX/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) Packages | [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/) [![vcpkg port](https://img.shields.io/badge/vcpkg-port-blue.svg?style=flat-square)](https://github.com/Microsoft/vcpkg/tree/master/ports/rxcpp) -Documentation | [![rxcpp doxygen documentation](https://img.shields.io/badge/rxcpp-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp)
[![reactivex intro](https://img.shields.io/badge/reactivex.io-intro-brightgreen.svg?style=flat-square)](http://reactivex.io/intro.html) [![rx marble diagrams](https://img.shields.io/badge/rxmarbles-diagrams-brightgreen.svg?style=flat-square)](http://rxmarbles.com/) +Documentation | [![rxcpp doxygen documentation](https://img.shields.io/badge/rxcpp-latest-brightgreen.svg?style=flat-square)](http://reactivex.github.io/RxCpp)
[![reactivex intro](https://img.shields.io/badge/reactivex.io-intro-brightgreen.svg?style=flat-square)](http://reactivex.io/intro.html) [![rx marble diagrams](https://img.shields.io/badge/rxmarbles-diagrams-brightgreen.svg?style=flat-square)](http://rxmarbles.com/) # Usage -__RxCpp__ is a header-only C++ library that only depends on the standard library. The CMake build generates documentation and unit tests. +__RxCpp__ is a header-only C++ library that only depends on the standard library. The CMake build generates documentation and unit tests. The unit tests depend on a git submodule for the [Catch](https://github.com/philsquared/Catch) library. # Example Add `Rx/v2/src` to the include paths @@ -128,7 +128,7 @@ Credit [ReactiveX.io](http://reactivex.io/intro.html) ### Other language implementations * Java: [RxJava](https://github.com/ReactiveX/RxJava) -* JavaScript: [RxJS](https://github.com/Reactive-Extensions/RxJS) +* JavaScript: [rxjs](https://github.com/ReactiveX/rxjs) * C#: [Rx.NET](https://github.com/Reactive-Extensions/Rx.NET) * [More..](http://reactivex.io/languages.html) @@ -148,7 +148,7 @@ Credit [ReactiveX.io](http://reactivex.io/intro.html) RxCpp uses a git submodule (in `ext/catch`) for the excellent [Catch](https://github.com/philsquared/Catch) library. The easiest way to ensure that the submodules are included in the clone is to add `--recursive` in the clone command. ```shell -git clone --recursive https://github.com/Reactive-Extensions/RxCpp.git +git clone --recursive https://github.com/ReactiveX/RxCpp.git cd RxCpp ``` @@ -224,7 +224,7 @@ Example of by-tag # Documentation -RxCpp uses Doxygen to generate project [documentation](http://reactive-extensions.github.io/RxCpp). +RxCpp uses Doxygen to generate project [documentation](http://reactivex.github.io/RxCpp). When Doxygen+Graphviz is installed, CMake creates a special build task named `doc`. It creates actual documentation and puts it to `projects/doxygen/html/` folder, which can be published to the `gh-pages` branch. Each merged pull request will build the docs and publish them. -- cgit v1.2.3 From 533e3a349ac0699c946c5497675ac5d159479030 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Fri, 4 May 2018 13:35:33 -0700 Subject: Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d0791d5..3cd4d7a 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Linux & OSX | [![Linux & Osx Status](http://img.shields.io/travis/ReactiveX/RxCp Source | Badges | ------------- | :--------------- | Github | [![GitHub license](https://img.shields.io/github/license/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp)
[![GitHub release](https://img.shields.io/github/release/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp/releases)
[![GitHub commits](https://img.shields.io/github/commits-since/ReactiveX/RxCpp/v4.0.0.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp) -Gitter.im | [![Join in on gitter.im](https://img.shields.io/gitter/room/ReactiveX/RxCpp.svg?style=flat-square)](https://gitter.im/ReactiveX/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +Gitter.im | [![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/ReactiveX/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) Packages | [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/) [![vcpkg port](https://img.shields.io/badge/vcpkg-port-blue.svg?style=flat-square)](https://github.com/Microsoft/vcpkg/tree/master/ports/rxcpp) Documentation | [![rxcpp doxygen documentation](https://img.shields.io/badge/rxcpp-latest-brightgreen.svg?style=flat-square)](http://reactivex.github.io/RxCpp)
[![reactivex intro](https://img.shields.io/badge/reactivex.io-intro-brightgreen.svg?style=flat-square)](http://reactivex.io/intro.html) [![rx marble diagrams](https://img.shields.io/badge/rxmarbles-diagrams-brightgreen.svg?style=flat-square)](http://rxmarbles.com/) -- cgit v1.2.3 From 28db2ee9fdac1aed6fa8e360ea5ddf12856d0f6b Mon Sep 17 00:00:00 2001 From: Andrei Lebedev Date: Fri, 11 May 2018 00:32:43 +0300 Subject: Remove unused variable and captures in doxygen/buffer.cpp --- Rx/v2/examples/doxygen/buffer.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Rx/v2/examples/doxygen/buffer.cpp b/Rx/v2/examples/doxygen/buffer.cpp index e88c2ed..58f023a 100644 --- a/Rx/v2/examples/doxygen/buffer.cpp +++ b/Rx/v2/examples/doxygen/buffer.cpp @@ -162,7 +162,6 @@ SCENARIO("buffer period sample"){ SCENARIO("buffer period+count+coordination sample"){ printf("//! [buffer period+count+coordination sample]\n"); - auto start = std::chrono::steady_clock::now(); auto int1 = rxcpp::observable<>::range(1L, 3L); auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); auto values = int1. @@ -171,7 +170,7 @@ SCENARIO("buffer period+count+coordination sample"){ values. as_blocking(). subscribe( - [start](std::vector v){ + [](std::vector v){ printf("OnNext:"); std::for_each(v.begin(), v.end(), [](long a){ printf(" %ld", a); @@ -184,7 +183,6 @@ SCENARIO("buffer period+count+coordination sample"){ SCENARIO("buffer period+count sample"){ printf("//! [buffer period+count sample]\n"); - auto start = std::chrono::steady_clock::now(); auto int1 = rxcpp::observable<>::range(1L, 3L); auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50)); auto values = int1. @@ -192,7 +190,7 @@ SCENARIO("buffer period+count sample"){ buffer_with_time_or_count(std::chrono::milliseconds(20), 2); values. subscribe( - [start](std::vector v){ + [](std::vector v){ printf("OnNext:"); std::for_each(v.begin(), v.end(), [](long a){ printf(" %ld", a); -- cgit v1.2.3 From abaf9881831d3adf52625443611aa1c3ea5e5dbf Mon Sep 17 00:00:00 2001 From: Andrei Lebedev Date: Fri, 11 May 2018 02:08:37 +0300 Subject: Fix exit_recursed_scope_type dtor cleaning requestor early --- Rx/v2/src/rxcpp/rx-scheduler.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp index 0f239be..8bdb4dd 100644 --- a/Rx/v2/src/rxcpp/rx-scheduler.hpp +++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp @@ -480,9 +480,9 @@ class schedulable : public schedulable_base // no change in recursion scope return *this; } - exit_recursed_scope_type reset(const recurse& r) const { + std::shared_ptr reset(const recurse& r) const { requestor = std::addressof(r.get_recursed()); - return exit_recursed_scope_type(this); + return std::make_shared(this); } bool is_recursed() const { return !!requestor; -- cgit v1.2.3 From c7de35be5dc31785984f9b3c6b21f7bf46a05034 Mon Sep 17 00:00:00 2001 From: Andrei Lebedev Date: Fri, 11 May 2018 08:22:09 +0300 Subject: Replace shared_ptr with move ctor --- Rx/v2/src/rxcpp/rx-scheduler.hpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp index 8bdb4dd..dbf6cb2 100644 --- a/Rx/v2/src/rxcpp/rx-scheduler.hpp +++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp @@ -458,12 +458,19 @@ class schedulable : public schedulable_base public: ~exit_recursed_scope_type() { + if (that != nullptr) { that->requestor = nullptr; + } } exit_recursed_scope_type(const recursed_scope_type* that) : that(that) { } + exit_recursed_scope_type(exit_recursed_scope_type && other) /*noexcept*/ + : that(other.that) + { + other.that = nullptr; + } }; public: recursed_scope_type() @@ -480,9 +487,9 @@ class schedulable : public schedulable_base // no change in recursion scope return *this; } - std::shared_ptr reset(const recurse& r) const { + exit_recursed_scope_type reset(const recurse& r) const { requestor = std::addressof(r.get_recursed()); - return std::make_shared(this); + return exit_recursed_scope_type(this); } bool is_recursed() const { return !!requestor; -- cgit v1.2.3 From eaf0545efef9fdccc0a6bc88f23ef0fc876c36f3 Mon Sep 17 00:00:00 2001 From: Andrei Lebedev Date: Fri, 11 May 2018 08:23:22 +0300 Subject: Replace commented out noexcept with macro --- Rx/v2/src/rxcpp/rx-scheduler.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp index dbf6cb2..fc68979 100644 --- a/Rx/v2/src/rxcpp/rx-scheduler.hpp +++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp @@ -466,7 +466,7 @@ class schedulable : public schedulable_base : that(that) { } - exit_recursed_scope_type(exit_recursed_scope_type && other) /*noexcept*/ + exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT : that(other.that) { other.that = nullptr; -- cgit v1.2.3 From ce5dfc2fdd49693cd109a5f5470b73e982c33f35 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Mon, 28 May 2018 10:15:22 -0700 Subject: Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 3cd4d7a..330ca9c 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Linux & OSX | [![Linux & Osx Status](http://img.shields.io/travis/ReactiveX/RxCp Source | Badges | ------------- | :--------------- | -Github | [![GitHub license](https://img.shields.io/github/license/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp)
[![GitHub release](https://img.shields.io/github/release/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp/releases)
[![GitHub commits](https://img.shields.io/github/commits-since/ReactiveX/RxCpp/v4.0.0.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp) +Github | [![GitHub license](https://img.shields.io/github/license/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp)
[![GitHub release](https://img.shields.io/github/release/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp/releases)
[![GitHub commits](https://img.shields.io/github/commits-since/ReactiveX/RxCpp/4.1.0.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp) Gitter.im | [![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/ReactiveX/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) Packages | [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/) [![vcpkg port](https://img.shields.io/badge/vcpkg-port-blue.svg?style=flat-square)](https://github.com/Microsoft/vcpkg/tree/master/ports/rxcpp) Documentation | [![rxcpp doxygen documentation](https://img.shields.io/badge/rxcpp-latest-brightgreen.svg?style=flat-square)](http://reactivex.github.io/RxCpp)
[![reactivex intro](https://img.shields.io/badge/reactivex.io-intro-brightgreen.svg?style=flat-square)](http://reactivex.io/intro.html) [![rx marble diagrams](https://img.shields.io/badge/rxmarbles-diagrams-brightgreen.svg?style=flat-square)](http://rxmarbles.com/) @@ -152,7 +152,7 @@ git clone --recursive https://github.com/ReactiveX/RxCpp.git cd RxCpp ``` -# Building RxCpp +# Building RxCpp Unit Tests * RxCpp is regularly tested on OSX and Windows. * RxCpp is regularly built with Clang, Gcc and VC -- cgit v1.2.3 From 0b93c186708460962d5f47414fe80b289a24bbe1 Mon Sep 17 00:00:00 2001 From: Andrei Lebedev Date: Sun, 8 Jul 2018 18:36:26 +0300 Subject: Remove static from observe_on_run_loop - Different run loops can be used with factory calls --- Rx/v2/src/rxcpp/operators/rx-observe_on.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp index 99de4c3..274f3f1 100644 --- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp @@ -316,8 +316,7 @@ public: }; inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) { - static observe_on_one_worker r(rxsc::make_run_loop(rl)); - return r; + return observe_on_one_worker(rxsc::make_run_loop(rl)); } inline observe_on_one_worker observe_on_event_loop() { -- cgit v1.2.3 From 546e91b8b07edea7816afcf4dc638622534e8234 Mon Sep 17 00:00:00 2001 From: Diorcet Yann Date: Tue, 10 Jul 2018 16:37:21 +0200 Subject: group_by support DurationSelector (#447) * group_by support DurationSelector * remove unused names --- Rx/v2/src/rxcpp/operators/rx-group_by.hpp | 116 +++++++++++++++++++++--------- Rx/v2/src/rxcpp/rx-util.hpp | 9 +++ 2 files changed, 92 insertions(+), 33 deletions(-) diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp index 4eadbab..8b451e5 100644 --- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp @@ -9,6 +9,7 @@ \tparam KeySelector the type of the key extracting function \tparam MarbleSelector the type of the element extracting function \tparam BinaryPredicate the type of the key comparing function + \tparam DurationSelector the type of the duration observable function \param ks a function that extracts the key for each item (optional) \param ms a function that extracts the return element for each item (optional) @@ -63,7 +64,7 @@ struct is_group_by_selector_for { static const bool value = !std::is_same::value; }; -template +template struct group_by_traits { typedef T source_value_type; @@ -71,6 +72,7 @@ struct group_by_traits typedef rxu::decay_t key_selector_type; typedef rxu::decay_t marble_selector_type; typedef rxu::decay_t predicate_type; + typedef rxu::decay_t duration_selector_type; static_assert(is_group_by_selector_for::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)"); @@ -87,14 +89,15 @@ struct group_by_traits typedef grouped_observable grouped_observable_type; }; -template +template struct group_by { - typedef group_by_traits traits_type; + typedef group_by_traits traits_type; typedef typename traits_type::key_selector_type key_selector_type; typedef typename traits_type::marble_selector_type marble_selector_type; typedef typename traits_type::marble_type marble_type; typedef typename traits_type::predicate_type predicate_type; + typedef typename traits_type::duration_selector_type duration_selector_type; typedef typename traits_type::subject_type subject_type; typedef typename traits_type::key_type key_type; @@ -130,21 +133,23 @@ struct group_by struct group_by_values { - group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p) + group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) : keySelector(std::move(ks)) , marbleSelector(std::move(ms)) , predicate(std::move(p)) + , durationSelector(std::move(ds)) { } mutable key_selector_type keySelector; mutable marble_selector_type marbleSelector; mutable predicate_type predicate; + mutable duration_selector_type durationSelector; }; group_by_values initial; - group_by(key_selector_type ks, marble_selector_type ms, predicate_type p) - : initial(std::move(ks), std::move(ms), std::move(p)) + group_by(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) + : initial(std::move(ks), std::move(ms), std::move(p), std::move(ds)) { } @@ -206,7 +211,35 @@ struct group_by } auto sub = subject_type(); g = state->groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first; - dest.on_next(make_dynamic_grouped_observable(group_by_observable(state, sub, selectedKey.get()))); + auto obs = make_dynamic_grouped_observable(group_by_observable(state, sub, selectedKey.get())); + auto durationObs = on_exception( + [&](){ + return this->durationSelector(obs);}, + [this](std::exception_ptr e){on_error(e);}); + if (durationObs.empty()) { + return; + } + + dest.on_next(obs); + composite_subscription duration_sub; + auto ssub = state->source_lifetime.add(duration_sub); + + auto expire_state = state; + auto expire_dest = g->second; + auto expire = [=]() { + auto g = expire_state->groups.find(selectedKey.get()); + if (g != expire_state->groups.end()) { + expire_state->groups.erase(g); + expire_dest.on_completed(); + } + expire_state->source_lifetime.remove(ssub); + }; + auto robs = durationObs.get().take(1); + duration_sub.add(robs.subscribe( + [](const typename decltype(robs)::value_type &){}, + [=](std::exception_ptr ) {expire();}, + [=](){expire();} + )); } auto selectedMarble = on_exception( [&](){ @@ -243,33 +276,36 @@ struct group_by } }; -template +template class group_by_factory { typedef rxu::decay_t key_selector_type; typedef rxu::decay_t marble_selector_type; typedef rxu::decay_t predicate_type; + typedef rxu::decay_t duration_selector_type; key_selector_type keySelector; marble_selector_type marbleSelector; predicate_type predicate; + duration_selector_type durationSelector; public: - group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p) + group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) : keySelector(std::move(ks)) , marbleSelector(std::move(ms)) , predicate(std::move(p)) + , durationSelector(std::move(ds)) { } template struct group_by_factory_traits { typedef rxu::value_type_t> value_type; - typedef detail::group_by_traits traits_type; - typedef detail::group_by group_by_type; + typedef detail::group_by_traits traits_type; + typedef detail::group_by group_by_type; }; template auto operator()(Observable&& source) - -> decltype(source.template lift::traits_type::grouped_observable_type>(typename group_by_factory_traits::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)))) { - return source.template lift::traits_type::grouped_observable_type>(typename group_by_factory_traits::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate))); + -> decltype(source.template lift::traits_type::grouped_observable_type>(typename group_by_factory_traits::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)))) { + return source.template lift::traits_type::grouped_observable_type>(typename group_by_factory_traits::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector))); } }; @@ -288,61 +324,75 @@ auto group_by(AN&&... an) template<> struct member_overload { - template, + class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t, rxu::decay_t>, + class Value = typename Traits::grouped_observable_type> + static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p, DurationSelector&& ds) + -> decltype(o.template lift(GroupBy(std::forward(ks), std::forward(ms), std::forward(p), std::forward(ds)))) { + return o.template lift(GroupBy(std::forward(ks), std::forward(ms), std::forward(p), std::forward(ds))); + } + + template>>, class SourceValue = rxu::value_type_t, - class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t>, + class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t, rxu::decay_t>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p) - -> decltype(o.template lift(GroupBy(std::forward(ks), std::forward(ms), std::forward(p)))) { - return o.template lift(GroupBy(std::forward(ks), std::forward(ms), std::forward(p))); + -> decltype(o.template lift(GroupBy(std::forward(ks), std::forward(ms), std::forward(p), rxu::ret>>()))) { + return o.template lift(GroupBy(std::forward(ks), std::forward(ms), std::forward(p), rxu::ret>>())); } - template>>, class SourceValue = rxu::value_type_t, - class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t>, + class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t, rxu::decay_t>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms) - -> decltype(o.template lift(GroupBy(std::forward(ks), std::forward(ms), rxu::less()))) { - return o.template lift(GroupBy(std::forward(ks), std::forward(ms), rxu::less())); + -> decltype(o.template lift(GroupBy(std::forward(ks), std::forward(ms), rxu::less(), rxu::ret>>()))) { + return o.template lift(GroupBy(std::forward(ks), std::forward(ms), rxu::less(), rxu::ret>>())); } template, class BinaryPredicate=rxu::less, + class DurationSelector=rxu::ret>>, class SourceValue = rxu::value_type_t, - class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t>, + class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t, rxu::decay_t>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks) - -> decltype(o.template lift(GroupBy(std::forward(ks), rxu::detail::take_at<0>(), rxu::less()))) { - return o.template lift(GroupBy(std::forward(ks), rxu::detail::take_at<0>(), rxu::less())); + -> decltype(o.template lift(GroupBy(std::forward(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret>>()))) { + return o.template lift(GroupBy(std::forward(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret>>())); } template, class MarbleSelector=rxu::detail::take_at<0>, class BinaryPredicate=rxu::less, + class DurationSelector=rxu::ret>>, class Enabled = rxu::enable_if_all_true_type_t< all_observables>, class SourceValue = rxu::value_type_t, - class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate>, - class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t>, + class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, + class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t, rxu::decay_t>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o) - -> decltype(o.template lift(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()))) { - return o.template lift(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less())); + -> decltype(o.template lift(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret>>()))) { + return o.template lift(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret>>())); } template static operators::detail::group_by_invalid_t member(const AN&...) { std::terminate(); return {}; - static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool"); - } + static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate, optional DurationSelector), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool, DurationSelector takes (Observable::value_type) -> Observable"); + } }; diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp index 97ff73f..d4d5536 100644 --- a/Rx/v2/src/rxcpp/rx-util.hpp +++ b/Rx/v2/src/rxcpp/rx-util.hpp @@ -416,6 +416,15 @@ struct less { return std::forward(lhs) < std::forward(rhs); } }; +template +struct ret +{ + template + auto operator()(LHS&& ) const + -> decltype(T()) + { return T(); } +}; + template struct equal_to { -- cgit v1.2.3 From 1e9312fc7eba8e78719f68d2d6743cf8ecce0e85 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Tue, 10 Jul 2018 07:39:45 -0700 Subject: adding test for nocompare observe_on (#448) * adding test for nocompare observe_on notification uses SFINAE to compile for value_types that do not have operator== * use as_dynamic to avoid vc 2013 bug --- Rx/v2/test/operators/observe_on.cpp | 56 ++++++++++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/Rx/v2/test/operators/observe_on.cpp b/Rx/v2/test/operators/observe_on.cpp index 644ab93..ffa85aa 100644 --- a/Rx/v2/test/operators/observe_on.cpp +++ b/Rx/v2/test/operators/observe_on.cpp @@ -1,5 +1,6 @@ #include "../test.h" #include +#include #include const int static_onnextcalls = 100000; @@ -136,4 +137,57 @@ SCENARIO("stream observe_on", "[observe][observe_on]"){ } } -} \ No newline at end of file +} + +class nocompare { +public: + int v; +}; + +SCENARIO("observe_on no-comparison", "[observe][observe_on]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::observe_on_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages in; + const rxsc::test::messages out; + + auto xs = sc.make_hot_observable({ + in.next(150, nocompare{1}), + in.next(210, nocompare{2}), + in.next(240, nocompare{3}), + in.completed(300) + }); + + WHEN("observe_on is specified"){ + + auto res = w.start( + [so, xs]() { + return xs + | rxo::observe_on(so) + | rxo::map([](nocompare v){ return v.v; }) + | rxo::as_dynamic(); + } + ); + + THEN("the output contains items sent while subscribed"){ + auto required = rxu::to_vector({ + out.next(211, 2), + out.next(241, 3), + out.completed(301) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + out.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} -- cgit v1.2.3 From b3753b360072a32822c564e288782ed704c7494d Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Sun, 5 Aug 2018 15:15:24 -0700 Subject: fix blocking_observable::subscribe removes spinning from blocking submit. ran all perf tests on osx without issue. should fix #430 and help with #451 --- Rx/v2/src/rxcpp/rx-observable.hpp | 45 +++++------------------------- Rx/v2/test/operators/merge_delay_error.cpp | 6 ++-- 2 files changed, 10 insertions(+), 41 deletions(-) diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 496db0b..037f375 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -173,28 +173,9 @@ class blocking_observable -> void { std::mutex lock; std::condition_variable wake; + bool disposed = false; std::exception_ptr error; - struct tracking - { - ~tracking() - { - if (!disposed || !wakened) std::terminate(); - } - tracking() - { - disposed = false; - wakened = false; - false_wakes = 0; - true_wakes = 0; - } - std::atomic_bool disposed; - std::atomic_bool wakened; - std::atomic_int false_wakes; - std::atomic_int true_wakes; - }; - auto track = std::make_shared(); - auto dest = make_subscriber(std::forward(an)...); // keep any error to rethrow at the end. @@ -213,31 +194,19 @@ class blocking_observable auto cs = scbr.get_subscription(); cs.add( - [&, track](){ - // OSX geting invalid x86 op if notify_one is after the disposed = true - // presumably because the condition_variable may already have been awakened - // and is now sitting in a while loop on disposed + [&](){ + std::unique_lock guard(lock); wake.notify_one(); - track->disposed = true; + disposed = true; }); - std::unique_lock guard(lock); source.subscribe(std::move(scbr)); + std::unique_lock guard(lock); wake.wait(guard, - [&, track](){ - // this is really not good. - // false wakeups were never followed by true wakeups so.. - - // anyways this gets triggered before disposed is set now so wait. - while (!track->disposed) { - ++track->false_wakes; - } - ++track->true_wakes; - return true; + [&](){ + return disposed; }); - track->wakened = true; - if (!track->disposed || !track->wakened) std::terminate(); if (error) {std::rethrow_exception(error);} } diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp index d560b45..b53b884 100644 --- a/Rx/v2/test/operators/merge_delay_error.cpp +++ b/Rx/v2/test/operators/merge_delay_error.cpp @@ -7,7 +7,7 @@ const int static_onnextcalls = 1000000; //merge_delay_error must work the very same way as `merge()` except the error handling -SCENARIO("merge completes", "[merge][join][operators]"){ +SCENARIO("merge_delay_error completes", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -117,7 +117,7 @@ SCENARIO("merge completes", "[merge][join][operators]"){ } } -SCENARIO("variadic merge completes with error", "[merge][join][operators]"){ +SCENARIO("variadic merge_delay_error completes with error", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); @@ -211,7 +211,7 @@ SCENARIO("variadic merge completes with error", "[merge][join][operators]"){ } } -SCENARIO("variadic merge completes with 2 errors", "[merge][join][operators]"){ +SCENARIO("variadic merge_delay_error completes with 2 errors", "[merge][join][operators]"){ GIVEN("1 hot observable with 3 cold observables of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); -- cgit v1.2.3 From a7d5856385f126e874db6010d9dbfd37290c61de Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Mon, 6 Aug 2018 18:53:13 +0800 Subject: fix #390 --- Ix/CPP/src/cpplinq/linq.hpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Ix/CPP/src/cpplinq/linq.hpp b/Ix/CPP/src/cpplinq/linq.hpp index be77151..6552f79 100644 --- a/Ix/CPP/src/cpplinq/linq.hpp +++ b/Ix/CPP/src/cpplinq/linq.hpp @@ -471,22 +471,22 @@ public: // TODO: skip_while(pred) - template + template typename std::enable_if::value, ITEM>::type sum() const { ITEM seed{}; return sum(seed); } - typename element_type sum(typename element_type seed) const { + element_type sum(element_type seed) const { return std::accumulate(begin(), end(), seed); } - template ::type> + template ::type> typename std::enable_if::value, Result>::type sum(Selector sel) const { return from(begin(), end()).select(sel).sum(); } - template ::type> + template ::type> Result sum(Selector sel, Result seed) const { return from(begin(), end()).select(sel).sum(seed); } -- cgit v1.2.3