diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2015-06-12 07:35:12 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2015-06-18 19:55:10 -0700 |
commit | 83351c7ba1379bdf1978e3aecde5c604bfec0ffe (patch) | |
tree | 6a0d2a0fd22dd80f53492dbb6804438a079c8b72 | |
parent | 6a72451c421947eedbe7feaf1f523af193320a43 (diff) | |
download | platform_external_Reactive-Extensions_RxCpp-83351c7ba1379bdf1978e3aecde5c604bfec0ffe.tar.gz platform_external_Reactive-Extensions_RxCpp-83351c7ba1379bdf1978e3aecde5c604bfec0ffe.tar.bz2 platform_external_Reactive-Extensions_RxCpp-83351c7ba1379bdf1978e3aecde5c604bfec0ffe.zip |
fixes worker leaks (#101 & #132)
uses weak pointer to hold the worker in schedulable, empties queue
when new_thread worker lifetime ends and remove observer when
hot_observable subscription ends.
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-observe_on.hpp | 24 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-notification.hpp | 6 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-scheduler.hpp | 73 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subscriber.hpp | 3 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp | 60 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp | 25 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-test.hpp | 11 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp | 10 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp | 16 | ||||
-rw-r--r-- | appveyor.yml | 14 |
10 files changed, 147 insertions, 95 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp index dfad938..3d25e78 100644 --- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp @@ -39,7 +39,7 @@ struct observe_on typedef rxn::notification<T> notification_type; typedef typename notification_type::type base_notification_type; - typedef std::queue<base_notification_type> queue_type; + typedef std::deque<base_notification_type> queue_type; struct mode { @@ -54,7 +54,7 @@ struct observe_on struct observe_on_state : std::enable_shared_from_this<observe_on_state> { mutable std::mutex lock; - mutable queue_type queue; + mutable queue_type fill_queue; mutable queue_type drain_queue; composite_subscription lifetime; mutable typename mode::type current; @@ -84,25 +84,25 @@ struct observe_on if (drain_queue.empty() || !destination.is_subscribed()) { std::unique_lock<std::mutex> guard(lock); if (!destination.is_subscribed() || - (!lifetime.is_subscribed() && queue.empty() && drain_queue.empty())) { + (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) { current = mode::Disposed; queue_type expired; - swap(expired, queue); + swap(expired, fill_queue); guard.unlock(); lifetime.unsubscribe(); destination.unsubscribe(); return; } if (drain_queue.empty()) { - if (queue.empty()) { + if (fill_queue.empty()) { current = mode::Empty; return; } - swap(queue, drain_queue); + swap(fill_queue, drain_queue); } } auto notification = std::move(drain_queue.front()); - drain_queue.pop(); + drain_queue.pop_front(); notification->accept(destination); self(); } catch(...) { @@ -110,7 +110,7 @@ struct observe_on std::unique_lock<std::mutex> guard(lock); current = mode::Errored; queue_type expired; - swap(expired, queue); + swap(expired, fill_queue); } }; @@ -121,7 +121,7 @@ struct observe_on current = mode::Errored; using std::swap; queue_type expired; - swap(expired, queue); + swap(expired, fill_queue); return; } @@ -143,17 +143,17 @@ struct observe_on void on_next(source_value_type v) const { std::unique_lock<std::mutex> guard(state->lock); - state->queue.push(notification_type::on_next(std::move(v))); + state->fill_queue.push_back(notification_type::on_next(std::move(v))); state->ensure_processing(guard); } void on_error(std::exception_ptr e) const { std::unique_lock<std::mutex> guard(state->lock); - state->queue.push(notification_type::on_error(e)); + state->fill_queue.push_back(notification_type::on_error(e)); state->ensure_processing(guard); } void on_completed() const { std::unique_lock<std::mutex> guard(state->lock); - state->queue.push(notification_type::on_completed()); + state->fill_queue.push_back(notification_type::on_completed()); state->ensure_processing(guard); } diff --git a/Rx/v2/src/rxcpp/rx-notification.hpp b/Rx/v2/src/rxcpp/rx-notification.hpp index 3928fd5..56d82d8 100644 --- a/Rx/v2/src/rxcpp/rx-notification.hpp +++ b/Rx/v2/src/rxcpp/rx-notification.hpp @@ -119,6 +119,9 @@ private: struct on_next_notification : public base { on_next_notification(T value) : value(std::move(value)) { } + on_next_notification(const on_next_notification& o) : value(o.value) {} + on_next_notification(const on_next_notification&& o) : value(std::move(o.value)) {} + on_next_notification& operator=(on_next_notification o) { value = std::move(o.value); return *this; } virtual void out(std::ostream& os) const { os << "on_next( "; detail::to_stream(os, value, 0, 0); @@ -140,6 +143,9 @@ private: struct on_error_notification : public base { on_error_notification(std::exception_ptr ep) : ep(ep) { } + on_error_notification(const on_error_notification& o) : ep(o.ep) {} + on_error_notification(const on_error_notification&& o) : ep(std::move(o.ep)) {} + on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep); return *this; } virtual void out(std::ostream& os) const { os << "on_error("; try { diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp index 40d3337..b842775 100644 --- a/Rx/v2/src/rxcpp/rx-scheduler.hpp +++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp @@ -22,6 +22,9 @@ typedef std::shared_ptr<action_type> action_ptr; typedef std::shared_ptr<worker_interface> worker_interface_ptr; typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr; +typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr; +typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr; + typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr; typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr; @@ -189,6 +192,8 @@ struct is_action_function } +class weak_worker; + /// a worker ensures that all scheduled actions on the same instance are executed in-order with no overlap /// a worker ensures that all scheduled actions are unsubscribed when it is unsubscribed /// some inner implementations will impose additional constraints on the execution of items. @@ -198,6 +203,7 @@ class worker : public worker_base detail::worker_interface_ptr inner; composite_subscription lifetime; friend bool operator==(const worker&, const worker&); + friend class weak_worker; public: typedef scheduler_base::clock_type clock_type; typedef composite_subscription::weak_subscription weak_subscription; @@ -323,6 +329,26 @@ inline bool operator==(const worker& lhs, const worker& rhs) { inline bool operator!=(const worker& lhs, const worker& rhs) { return !(lhs == rhs); } + +class weak_worker +{ + detail::worker_interface_weak_ptr inner; + composite_subscription lifetime; + +public: + weak_worker() + { + } + explicit weak_worker(worker& owner) + : inner(owner.inner) + , lifetime(owner.lifetime) + { + } + + worker lock() const { + return worker(lifetime, inner.lock()); + } +}; class scheduler_interface : public std::enable_shared_from_this<scheduler_interface> @@ -399,7 +425,7 @@ class schedulable : public schedulable_base typedef schedulable this_type; composite_subscription lifetime; - worker controller; + weak_worker controller; action activity; bool scoped; composite_subscription::weak_subscription action_scope; @@ -471,7 +497,7 @@ public: ~schedulable() { if (scoped) { - controller.remove(action_scope); + controller.lock().remove(action_scope); } } schedulable() @@ -482,7 +508,7 @@ public: /// action and worker share lifetime schedulable(worker q, action a) : lifetime(q.get_subscription()) - , controller(std::move(q)) + , controller(q) , activity(std::move(a)) , scoped(false) { @@ -490,19 +516,19 @@ public: /// action and worker have independent lifetimes schedulable(composite_subscription cs, worker q, action a) : lifetime(std::move(cs)) - , controller(std::move(q)) + , controller(q) , activity(std::move(a)) , scoped(true) - , action_scope(controller.add(lifetime)) + , action_scope(controller.lock().add(lifetime)) { } /// inherit lifetimes schedulable(schedulable scbl, worker q, action a) : lifetime(scbl.get_subscription()) - , controller(std::move(q)) + , controller(q) , activity(std::move(a)) , scoped(scbl.scoped) - , action_scope(scbl.scoped ? controller.add(lifetime) : weak_subscription()) + , action_scope(scbl.scoped ? controller.lock().add(lifetime) : weak_subscription()) { } @@ -512,11 +538,11 @@ public: inline composite_subscription& get_subscription() { return lifetime; } - inline const worker& get_worker() const { - return controller; + inline const worker get_worker() const { + return controller.lock(); } - inline worker& get_worker() { - return controller; + inline worker get_worker() { + return controller.lock(); } inline const action& get_action() const { return activity; @@ -577,24 +603,24 @@ public: // scheduler // inline clock_type::time_point now() const { - return controller.now(); + return controller.lock().now(); } /// put this on the queue of the stored scheduler to run asap inline void schedule() const { if (is_subscribed()) { - controller.schedule(*this); + get_worker().schedule(*this); } } /// put this on the queue of the stored scheduler to run at the specified time inline void schedule(clock_type::time_point when) const { if (is_subscribed()) { - controller.schedule(when, *this); + get_worker().schedule(when, *this); } } /// put this on the queue of the stored scheduler to run after a delay from now inline void schedule(clock_type::duration when) const { if (is_subscribed()) { - controller.schedule(when, *this); + get_worker().schedule(when, *this); } } @@ -786,11 +812,12 @@ auto worker::schedule_periodically(clock_type::time_point initial, clock_type::d } template<class... ArgN> void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const { + auto keepAlive = *this; auto target = std::make_shared<clock_type::time_point>(initial); - auto activity = make_schedulable(scbl, *this, std::forward<ArgN>(an)...); + auto activity = make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...); auto periodic = make_schedulable( activity, - [target, period, activity](schedulable self) { + [keepAlive, target, period, activity](schedulable self) { // any recursion requests will be pushed to the scheduler queue recursion r(false); // call action @@ -852,28 +879,28 @@ private: compare_elem > queue_type; - queue_type queue; + queue_type q; int64_t ordinal; public: const_reference top() const { - return queue.top().first; + return q.top().first; } void pop() { - queue.pop(); + q.pop(); } bool empty() const { - return queue.empty(); + return q.empty(); } void push(const item_type& value) { - queue.push(elem_type(value, ordinal++)); + q.push(elem_type(value, ordinal++)); } void push(item_type&& value) { - queue.push(elem_type(std::move(value), ordinal++)); + q.push(elem_type(std::move(value), ordinal++)); } }; diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp index 9e6f263..cfb62c6 100644 --- a/Rx/v2/src/rxcpp/rx-subscriber.hpp +++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp @@ -249,7 +249,8 @@ auto make_subscriber( template<class T, class Observer> auto make_subscriber(const Observer& o) -> typename std::enable_if< - is_observer<Observer>::value, + is_observer<Observer>::value && + !is_subscriber<Observer>::value, subscriber<T, Observer>>::type { return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), composite_subscription(), o); } diff --git a/Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp index f9847f9..1da6cb1 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp @@ -27,19 +27,19 @@ public: struct current_thread_queue_type { std::shared_ptr<worker_interface> w; recursion r; - queue_item_time queue; + queue_item_time q; }; private: #if defined(RXCPP_THREAD_LOCAL) static current_thread_queue_type*& current_thread_queue() { - static RXCPP_THREAD_LOCAL current_thread_queue_type* queue; - return queue; + static RXCPP_THREAD_LOCAL current_thread_queue_type* q; + return q; } #else static rxu::thread_local_storage<current_thread_queue_type>& current_thread_queue() { - static rxu::thread_local_storage<current_thread_queue_type> queue; - return queue; + static rxu::thread_local_storage<current_thread_queue_type> q; + return q; } #endif @@ -58,21 +58,21 @@ public: if (!current_thread_queue()) { abort(); } - return current_thread_queue()->queue.empty(); + return current_thread_queue()->q.empty(); } static queue_item_time::const_reference top() { if (!current_thread_queue()) { abort(); } - return current_thread_queue()->queue.top(); + return current_thread_queue()->q.top(); } static void pop() { auto& state = current_thread_queue(); if (!state) { abort(); } - state->queue.pop(); - if (state->queue.empty()) { + state->q.pop(); + if (state->q.empty()) { // allow recursion state->r.reset(true); } @@ -85,7 +85,7 @@ public: if (!item.what.is_subscribed()) { return; } - state->queue.push(std::move(item)); + state->q.push(std::move(item)); // disallow recursion state->r.reset(false); } @@ -103,15 +103,15 @@ public: result->w = std::move(w); return result; } - static void set(current_thread_queue_type* queue) { + static void set(current_thread_queue_type* q) { if (!!current_thread_queue()) { abort(); } // publish new queue - current_thread_queue() = queue; + current_thread_queue() = q; } - static void destroy(current_thread_queue_type* queue) { - delete queue; + static void destroy(current_thread_queue_type* q) { + delete q; } static void destroy() { if (!current_thread_queue()) { @@ -134,7 +134,7 @@ private: typedef current_thread this_type; current_thread(const this_type&); - typedef detail::action_queue queue; + typedef detail::action_queue queue_type; struct derecurser : public worker_interface { @@ -154,11 +154,11 @@ private: } virtual void schedule(const schedulable& scbl) const { - queue::push(queue::item_type(now(), scbl)); + queue_type::push(queue_type::item_type(now(), scbl)); } virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { - queue::push(queue::item_type(when, scbl)); + queue_type::push(queue_type::item_type(when, scbl)); } }; @@ -190,44 +190,44 @@ private: { // check ownership - if (queue::owned()) { + if (queue_type::owned()) { // already has an owner - delegate - queue::get_worker_interface()->schedule(when, scbl); + queue_type::get_worker_interface()->schedule(when, scbl); return; } // take ownership - queue::ensure(std::make_shared<derecurser>()); + queue_type::ensure(std::make_shared<derecurser>()); } // release ownership RXCPP_UNWIND_AUTO([]{ - queue::destroy(); + queue_type::destroy(); }); - const auto& recursor = queue::get_recursion().get_recurse(); + const auto& recursor = queue_type::get_recursion().get_recurse(); std::this_thread::sleep_until(when); if (scbl.is_subscribed()) { scbl(recursor); } - if (queue::empty()) { + if (queue_type::empty()) { return; } // loop until queue is empty for ( - auto next = queue::top().when; + auto next = queue_type::top().when; (std::this_thread::sleep_until(next), true); - next = queue::top().when + next = queue_type::top().when ) { - auto what = queue::top().what; + auto what = queue_type::top().what; - queue::pop(); + queue_type::pop(); if (what.is_subscribed()) { what(recursor); } - if (queue::empty()) { + if (queue_type::empty()) { break; } } @@ -245,10 +245,10 @@ public: { } - static bool is_schedule_required() { return !queue::owned(); } + static bool is_schedule_required() { return !queue_type::owned(); } inline bool is_tail_recursion_allowed() const { - return queue::empty(); + return queue_type::empty(); } virtual clock_type::time_point now() const { diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp index 2e24c0f..bed6ef9 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp @@ -24,7 +24,7 @@ private: private: typedef new_worker this_type; - typedef detail::action_queue queue; + typedef detail::action_queue queue_type; new_worker(const this_type&); @@ -57,7 +57,7 @@ private: composite_subscription lifetime; mutable std::mutex lock; mutable std::condition_variable wake; - mutable queue_item_time queue; + mutable queue_item_time q; std::thread worker; recursion r; }; @@ -80,31 +80,34 @@ private: auto keepAlive = state; state->lifetime.add([keepAlive](){ + std::unique_lock<std::mutex> guard(keepAlive->lock); + auto expired = std::move(keepAlive->q); + if (!keepAlive->q.empty()) abort(); keepAlive->wake.notify_one(); }); state->worker = tf([keepAlive](){ // take ownership - queue::ensure(std::make_shared<new_worker>(keepAlive)); + queue_type::ensure(std::make_shared<new_worker>(keepAlive)); // release ownership RXCPP_UNWIND_AUTO([]{ - queue::destroy(); + queue_type::destroy(); }); for(;;) { std::unique_lock<std::mutex> guard(keepAlive->lock); - if (keepAlive->queue.empty()) { + if (keepAlive->q.empty()) { keepAlive->wake.wait(guard, [keepAlive](){ - return !keepAlive->lifetime.is_subscribed() || !keepAlive->queue.empty(); + return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty(); }); } if (!keepAlive->lifetime.is_subscribed()) { break; } - auto& peek = keepAlive->queue.top(); + auto& peek = keepAlive->q.top(); if (!peek.what.is_subscribed()) { - keepAlive->queue.pop(); + keepAlive->q.pop(); continue; } if (clock_type::now() < peek.when) { @@ -112,8 +115,8 @@ private: continue; } auto what = peek.what; - keepAlive->queue.pop(); - keepAlive->r.reset(keepAlive->queue.empty()); + keepAlive->q.pop(); + keepAlive->r.reset(keepAlive->q.empty()); guard.unlock(); what(keepAlive->r.get_recurse()); } @@ -131,7 +134,7 @@ private: virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { if (scbl.is_subscribed()) { std::unique_lock<std::mutex> guard(state->lock); - state->queue.push(new_worker_state::item_type(when, scbl)); + state->q.push(new_worker_state::item_type(when, scbl)); state->r.reset(false); } state->wake.notify_one(); diff --git a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp index 8ff981a..5e32489 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp @@ -284,7 +284,7 @@ class hot_observable typedef subscriber<T> observer_type; mutable std::vector<recorded_type> mv; mutable std::vector<rxn::subscription> sv; - mutable std::vector<observer_type> observers; + mutable std::list<observer_type> observers; mutable worker controller; public: @@ -312,13 +312,15 @@ public: virtual ~hot_observable() {} virtual void on_subscribe(observer_type o) const { - observers.push_back(o); + auto olocation = observers.insert(observers.end(), o); + sv.push_back(rxn::subscription(sc->clock())); auto index = sv.size() - 1; auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this()); - o.add([sharedThis, index]() { + o.add([sharedThis, index, olocation]() { sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock()); + sharedThis->observers.erase(olocation); }); } @@ -401,6 +403,9 @@ public: { std::shared_ptr<detail::test_type::test_type_worker> tester; public: + + ~test_worker() { + } explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t) : worker(cs, std::static_pointer_cast<worker_interface>(t)) diff --git a/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp b/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp index f3eb5e9..f0fc46e 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp @@ -174,7 +174,7 @@ struct virtual_time : public detail::virtual_time_base<Absolute, Relative> typedef detail::schedulable_queue< typename item_type::time_point_type> queue_item_time; - mutable queue_item_time queue; + mutable queue_item_time q; public: virtual ~virtual_time() @@ -191,13 +191,13 @@ protected: } virtual item_type top() const { - return queue.top(); + return q.top(); } virtual void pop() const { - queue.pop(); + q.pop(); } virtual bool empty() const { - return queue.empty(); + return q.empty(); } using base::schedule_absolute; @@ -217,7 +217,7 @@ protected: a(r.get_recurse()); } }); - queue.push(item_type(when, run)); + q.push(item_type(when, run)); } }; diff --git a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp index 278209d..cf0c5e9 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp @@ -41,7 +41,7 @@ class synchronize_observer : public detail::multicast_observer<T> mutable std::mutex lock; mutable std::condition_variable wake; - mutable queue_type queue; + mutable queue_type fill_queue; composite_subscription lifetime; mutable typename mode::type current; coordinator_type coordinator; @@ -60,17 +60,17 @@ class synchronize_observer : public detail::multicast_observer<T> std::unique_lock<std::mutex> guard(lock); if (!destination.is_subscribed()) { current = mode::Disposed; - queue.clear(); + fill_queue.clear(); guard.unlock(); lifetime.unsubscribe(); return; } - if (queue.empty()) { + if (fill_queue.empty()) { current = mode::Empty; return; } - auto notification = std::move(queue.front()); - queue.pop_front(); + auto notification = std::move(fill_queue.front()); + fill_queue.pop_front(); guard.unlock(); notification->accept(destination); self(); @@ -105,7 +105,7 @@ class synchronize_observer : public detail::multicast_observer<T> void on_next(V v) const { if (lifetime.is_subscribed()) { std::unique_lock<std::mutex> guard(lock); - queue.push_back(notification_type::on_next(std::move(v))); + fill_queue.push_back(notification_type::on_next(std::move(v))); ensure_processing(guard); } wake.notify_one(); @@ -113,7 +113,7 @@ class synchronize_observer : public detail::multicast_observer<T> void on_error(std::exception_ptr e) const { if (lifetime.is_subscribed()) { std::unique_lock<std::mutex> guard(lock); - queue.push_back(notification_type::on_error(e)); + fill_queue.push_back(notification_type::on_error(e)); ensure_processing(guard); } wake.notify_one(); @@ -121,7 +121,7 @@ class synchronize_observer : public detail::multicast_observer<T> void on_completed() const { if (lifetime.is_subscribed()) { std::unique_lock<std::mutex> guard(lock); - queue.push_back(notification_type::on_completed()); + fill_queue.push_back(notification_type::on_completed()); ensure_processing(guard); } wake.notify_one(); diff --git a/appveyor.yml b/appveyor.yml index 3f1a0ce..304ee3c 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -26,6 +26,16 @@ build: test_script: - build\test\Debug\rxcppv2_test.exe +artifacts: + - path: Rx\v2\src\ + name: rxcpp source + type: zip + - path: Rx\v2\examples\ + name: rxcpp examples + type: zip + notifications: - slack: - secure: qaGjbI98VXZa7Zd2s3RmMzfA+ymrfWUDuzevdtOcHssEGBXbcoOJzLHNOmG+Y1nX + - provider: Slack + auth_token: + secure: qaGjbI98VXZa7Zd2s3RmMzfA+ymrfWUDuzevdtOcHssEGBXbcoOJzLHNOmG+Y1nX + channel: rxcpp |