diff options
author | Kirk Shoop <kirk.shoop@gmail.com> | 2018-10-28 14:40:32 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@gmail.com> | 2018-10-28 21:30:17 -0700 |
commit | ffa35af3bc171f5cc49019194cf4f16042dd069b (patch) | |
tree | a7e6ea8056cef076517b5d66db42234454a86dac | |
parent | 4aa52e42579cbd9e2cef6c0a6c2b0d8edf73ac5d (diff) | |
download | platform_external_Reactive-Extensions_RxCpp-ffa35af3bc171f5cc49019194cf4f16042dd069b.tar.gz platform_external_Reactive-Extensions_RxCpp-ffa35af3bc171f5cc49019194cf4f16042dd069b.tar.bz2 platform_external_Reactive-Extensions_RxCpp-ffa35af3bc171f5cc49019194cf4f16042dd069b.zip |
fixes for replay and moved-from container not being empty
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp | 30 |
2 files changed, 19 insertions, 12 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp index e31ed55..5145e92 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp @@ -72,6 +72,7 @@ private: state->lifetime.add([keepAlive](){ std::unique_lock<std::mutex> guard(keepAlive->lock); auto expired = std::move(keepAlive->q); + keepAlive->q = new_worker_state::queue_item_time{}; if (!keepAlive->q.empty()) std::terminate(); keepAlive->wake.notify_one(); diff --git a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp index b7f7d68..020b0f8 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp @@ -43,6 +43,7 @@ class replay_observer : public detail::multicast_observer<T> mutable std::list<time_point_type> time_points; mutable count_type count; mutable period_type period; + mutable composite_subscription replayLifetime; public: mutable coordination_type coordination; mutable coordinator_type coordinator; @@ -56,9 +57,13 @@ class replay_observer : public detail::multicast_observer<T> } public: - explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator) + ~replay_observer_state(){ + replayLifetime.unsubscribe(); + } + explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime) : count(_count) , period(_period) + , replayLifetime(_replayLifetime) , coordination(std::move(_coordination)) , coordinator(std::move(_coordinator)) { @@ -66,6 +71,7 @@ class replay_observer : public detail::multicast_observer<T> void add(T v) const { std::unique_lock<std::mutex> guard(lock); + if (!count.empty()) { if (values.size() == count.get()) remove_oldest(); @@ -89,11 +95,12 @@ class replay_observer : public detail::multicast_observer<T> std::shared_ptr<replay_observer_state> state; public: - replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription cs) - : base_type(cs) + replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime) + : base_type(subscriberLifetime) { - auto coordinator = coordination.create_coordinator(cs); - state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator)); + replayLifetime.add(subscriberLifetime); + auto coordinator = coordination.create_coordinator(replayLifetime); + state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime)); } subscriber<T> get_subscriber() const { @@ -129,22 +136,22 @@ class replay public: explicit replay(Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(), period_type(), cn, cs) + : s(count_type(), period_type(), cn, cs, composite_subscription{}) { } replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(std::move(count)), period_type(), cn, cs) + : s(count_type(std::move(count)), period_type(), cn, cs, composite_subscription{}) { } replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(), period_type(period), cn, cs) + : s(count_type(), period_type(period), cn, cs, composite_subscription{}) { } replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(count), period_type(period), cn, cs) + : s(count_type(count), period_type(period), cn, cs, composite_subscription{}) { } @@ -163,9 +170,8 @@ public: observable<T> get_observable() const { auto keepAlive = s; auto observable = make_observable_dynamic<T>([=](subscriber<T> o){ - if (keepAlive.get_subscription().is_subscribed()) { - for (auto&& value: get_values()) - o.on_next(value); + for (auto&& value: get_values()) { + o.on_next(value); } keepAlive.add(keepAlive.get_subscriber(), std::move(o)); }); |