aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2015-06-12 07:35:12 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2015-06-18 19:55:10 -0700
commit83351c7ba1379bdf1978e3aecde5c604bfec0ffe (patch)
tree6a0d2a0fd22dd80f53492dbb6804438a079c8b72
parent6a72451c421947eedbe7feaf1f523af193320a43 (diff)
downloadplatform_external_Reactive-Extensions_RxCpp-83351c7ba1379bdf1978e3aecde5c604bfec0ffe.tar.gz
platform_external_Reactive-Extensions_RxCpp-83351c7ba1379bdf1978e3aecde5c604bfec0ffe.tar.bz2
platform_external_Reactive-Extensions_RxCpp-83351c7ba1379bdf1978e3aecde5c604bfec0ffe.zip
fixes worker leaks (#101 & #132)
uses weak pointer to hold the worker in schedulable, empties queue when new_thread worker lifetime ends and remove observer when hot_observable subscription ends.
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-observe_on.hpp24
-rw-r--r--Rx/v2/src/rxcpp/rx-notification.hpp6
-rw-r--r--Rx/v2/src/rxcpp/rx-scheduler.hpp73
-rw-r--r--Rx/v2/src/rxcpp/rx-subscriber.hpp3
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp60
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp25
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-test.hpp11
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp10
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp16
-rw-r--r--appveyor.yml14
10 files changed, 147 insertions, 95 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
index dfad938..3d25e78 100644
--- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
@@ -39,7 +39,7 @@ struct observe_on
typedef rxn::notification<T> notification_type;
typedef typename notification_type::type base_notification_type;
- typedef std::queue<base_notification_type> queue_type;
+ typedef std::deque<base_notification_type> queue_type;
struct mode
{
@@ -54,7 +54,7 @@ struct observe_on
struct observe_on_state : std::enable_shared_from_this<observe_on_state>
{
mutable std::mutex lock;
- mutable queue_type queue;
+ mutable queue_type fill_queue;
mutable queue_type drain_queue;
composite_subscription lifetime;
mutable typename mode::type current;
@@ -84,25 +84,25 @@ struct observe_on
if (drain_queue.empty() || !destination.is_subscribed()) {
std::unique_lock<std::mutex> guard(lock);
if (!destination.is_subscribed() ||
- (!lifetime.is_subscribed() && queue.empty() && drain_queue.empty())) {
+ (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
current = mode::Disposed;
queue_type expired;
- swap(expired, queue);
+ swap(expired, fill_queue);
guard.unlock();
lifetime.unsubscribe();
destination.unsubscribe();
return;
}
if (drain_queue.empty()) {
- if (queue.empty()) {
+ if (fill_queue.empty()) {
current = mode::Empty;
return;
}
- swap(queue, drain_queue);
+ swap(fill_queue, drain_queue);
}
}
auto notification = std::move(drain_queue.front());
- drain_queue.pop();
+ drain_queue.pop_front();
notification->accept(destination);
self();
} catch(...) {
@@ -110,7 +110,7 @@ struct observe_on
std::unique_lock<std::mutex> guard(lock);
current = mode::Errored;
queue_type expired;
- swap(expired, queue);
+ swap(expired, fill_queue);
}
};
@@ -121,7 +121,7 @@ struct observe_on
current = mode::Errored;
using std::swap;
queue_type expired;
- swap(expired, queue);
+ swap(expired, fill_queue);
return;
}
@@ -143,17 +143,17 @@ struct observe_on
void on_next(source_value_type v) const {
std::unique_lock<std::mutex> guard(state->lock);
- state->queue.push(notification_type::on_next(std::move(v)));
+ state->fill_queue.push_back(notification_type::on_next(std::move(v)));
state->ensure_processing(guard);
}
void on_error(std::exception_ptr e) const {
std::unique_lock<std::mutex> guard(state->lock);
- state->queue.push(notification_type::on_error(e));
+ state->fill_queue.push_back(notification_type::on_error(e));
state->ensure_processing(guard);
}
void on_completed() const {
std::unique_lock<std::mutex> guard(state->lock);
- state->queue.push(notification_type::on_completed());
+ state->fill_queue.push_back(notification_type::on_completed());
state->ensure_processing(guard);
}
diff --git a/Rx/v2/src/rxcpp/rx-notification.hpp b/Rx/v2/src/rxcpp/rx-notification.hpp
index 3928fd5..56d82d8 100644
--- a/Rx/v2/src/rxcpp/rx-notification.hpp
+++ b/Rx/v2/src/rxcpp/rx-notification.hpp
@@ -119,6 +119,9 @@ private:
struct on_next_notification : public base {
on_next_notification(T value) : value(std::move(value)) {
}
+ on_next_notification(const on_next_notification& o) : value(o.value) {}
+ on_next_notification(const on_next_notification&& o) : value(std::move(o.value)) {}
+ on_next_notification& operator=(on_next_notification o) { value = std::move(o.value); return *this; }
virtual void out(std::ostream& os) const {
os << "on_next( ";
detail::to_stream(os, value, 0, 0);
@@ -140,6 +143,9 @@ private:
struct on_error_notification : public base {
on_error_notification(std::exception_ptr ep) : ep(ep) {
}
+ on_error_notification(const on_error_notification& o) : ep(o.ep) {}
+ on_error_notification(const on_error_notification&& o) : ep(std::move(o.ep)) {}
+ on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep); return *this; }
virtual void out(std::ostream& os) const {
os << "on_error(";
try {
diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp
index 40d3337..b842775 100644
--- a/Rx/v2/src/rxcpp/rx-scheduler.hpp
+++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp
@@ -22,6 +22,9 @@ typedef std::shared_ptr<action_type> action_ptr;
typedef std::shared_ptr<worker_interface> worker_interface_ptr;
typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;
+typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr;
+typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr;
+
typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;
@@ -189,6 +192,8 @@ struct is_action_function
}
+class weak_worker;
+
/// a worker ensures that all scheduled actions on the same instance are executed in-order with no overlap
/// a worker ensures that all scheduled actions are unsubscribed when it is unsubscribed
/// some inner implementations will impose additional constraints on the execution of items.
@@ -198,6 +203,7 @@ class worker : public worker_base
detail::worker_interface_ptr inner;
composite_subscription lifetime;
friend bool operator==(const worker&, const worker&);
+ friend class weak_worker;
public:
typedef scheduler_base::clock_type clock_type;
typedef composite_subscription::weak_subscription weak_subscription;
@@ -323,6 +329,26 @@ inline bool operator==(const worker& lhs, const worker& rhs) {
inline bool operator!=(const worker& lhs, const worker& rhs) {
return !(lhs == rhs);
}
+
+class weak_worker
+{
+ detail::worker_interface_weak_ptr inner;
+ composite_subscription lifetime;
+
+public:
+ weak_worker()
+ {
+ }
+ explicit weak_worker(worker& owner)
+ : inner(owner.inner)
+ , lifetime(owner.lifetime)
+ {
+ }
+
+ worker lock() const {
+ return worker(lifetime, inner.lock());
+ }
+};
class scheduler_interface
: public std::enable_shared_from_this<scheduler_interface>
@@ -399,7 +425,7 @@ class schedulable : public schedulable_base
typedef schedulable this_type;
composite_subscription lifetime;
- worker controller;
+ weak_worker controller;
action activity;
bool scoped;
composite_subscription::weak_subscription action_scope;
@@ -471,7 +497,7 @@ public:
~schedulable()
{
if (scoped) {
- controller.remove(action_scope);
+ controller.lock().remove(action_scope);
}
}
schedulable()
@@ -482,7 +508,7 @@ public:
/// action and worker share lifetime
schedulable(worker q, action a)
: lifetime(q.get_subscription())
- , controller(std::move(q))
+ , controller(q)
, activity(std::move(a))
, scoped(false)
{
@@ -490,19 +516,19 @@ public:
/// action and worker have independent lifetimes
schedulable(composite_subscription cs, worker q, action a)
: lifetime(std::move(cs))
- , controller(std::move(q))
+ , controller(q)
, activity(std::move(a))
, scoped(true)
- , action_scope(controller.add(lifetime))
+ , action_scope(controller.lock().add(lifetime))
{
}
/// inherit lifetimes
schedulable(schedulable scbl, worker q, action a)
: lifetime(scbl.get_subscription())
- , controller(std::move(q))
+ , controller(q)
, activity(std::move(a))
, scoped(scbl.scoped)
- , action_scope(scbl.scoped ? controller.add(lifetime) : weak_subscription())
+ , action_scope(scbl.scoped ? controller.lock().add(lifetime) : weak_subscription())
{
}
@@ -512,11 +538,11 @@ public:
inline composite_subscription& get_subscription() {
return lifetime;
}
- inline const worker& get_worker() const {
- return controller;
+ inline const worker get_worker() const {
+ return controller.lock();
}
- inline worker& get_worker() {
- return controller;
+ inline worker get_worker() {
+ return controller.lock();
}
inline const action& get_action() const {
return activity;
@@ -577,24 +603,24 @@ public:
// scheduler
//
inline clock_type::time_point now() const {
- return controller.now();
+ return controller.lock().now();
}
/// put this on the queue of the stored scheduler to run asap
inline void schedule() const {
if (is_subscribed()) {
- controller.schedule(*this);
+ get_worker().schedule(*this);
}
}
/// put this on the queue of the stored scheduler to run at the specified time
inline void schedule(clock_type::time_point when) const {
if (is_subscribed()) {
- controller.schedule(when, *this);
+ get_worker().schedule(when, *this);
}
}
/// put this on the queue of the stored scheduler to run after a delay from now
inline void schedule(clock_type::duration when) const {
if (is_subscribed()) {
- controller.schedule(when, *this);
+ get_worker().schedule(when, *this);
}
}
@@ -786,11 +812,12 @@ auto worker::schedule_periodically(clock_type::time_point initial, clock_type::d
}
template<class... ArgN>
void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const {
+ auto keepAlive = *this;
auto target = std::make_shared<clock_type::time_point>(initial);
- auto activity = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
+ auto activity = make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...);
auto periodic = make_schedulable(
activity,
- [target, period, activity](schedulable self) {
+ [keepAlive, target, period, activity](schedulable self) {
// any recursion requests will be pushed to the scheduler queue
recursion r(false);
// call action
@@ -852,28 +879,28 @@ private:
compare_elem
> queue_type;
- queue_type queue;
+ queue_type q;
int64_t ordinal;
public:
const_reference top() const {
- return queue.top().first;
+ return q.top().first;
}
void pop() {
- queue.pop();
+ q.pop();
}
bool empty() const {
- return queue.empty();
+ return q.empty();
}
void push(const item_type& value) {
- queue.push(elem_type(value, ordinal++));
+ q.push(elem_type(value, ordinal++));
}
void push(item_type&& value) {
- queue.push(elem_type(std::move(value), ordinal++));
+ q.push(elem_type(std::move(value), ordinal++));
}
};
diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp
index 9e6f263..cfb62c6 100644
--- a/Rx/v2/src/rxcpp/rx-subscriber.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp
@@ -249,7 +249,8 @@ auto make_subscriber(
template<class T, class Observer>
auto make_subscriber(const Observer& o)
-> typename std::enable_if<
- is_observer<Observer>::value,
+ is_observer<Observer>::value &&
+ !is_subscriber<Observer>::value,
subscriber<T, Observer>>::type {
return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
}
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp
index f9847f9..1da6cb1 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp
@@ -27,19 +27,19 @@ public:
struct current_thread_queue_type {
std::shared_ptr<worker_interface> w;
recursion r;
- queue_item_time queue;
+ queue_item_time q;
};
private:
#if defined(RXCPP_THREAD_LOCAL)
static current_thread_queue_type*& current_thread_queue() {
- static RXCPP_THREAD_LOCAL current_thread_queue_type* queue;
- return queue;
+ static RXCPP_THREAD_LOCAL current_thread_queue_type* q;
+ return q;
}
#else
static rxu::thread_local_storage<current_thread_queue_type>& current_thread_queue() {
- static rxu::thread_local_storage<current_thread_queue_type> queue;
- return queue;
+ static rxu::thread_local_storage<current_thread_queue_type> q;
+ return q;
}
#endif
@@ -58,21 +58,21 @@ public:
if (!current_thread_queue()) {
abort();
}
- return current_thread_queue()->queue.empty();
+ return current_thread_queue()->q.empty();
}
static queue_item_time::const_reference top() {
if (!current_thread_queue()) {
abort();
}
- return current_thread_queue()->queue.top();
+ return current_thread_queue()->q.top();
}
static void pop() {
auto& state = current_thread_queue();
if (!state) {
abort();
}
- state->queue.pop();
- if (state->queue.empty()) {
+ state->q.pop();
+ if (state->q.empty()) {
// allow recursion
state->r.reset(true);
}
@@ -85,7 +85,7 @@ public:
if (!item.what.is_subscribed()) {
return;
}
- state->queue.push(std::move(item));
+ state->q.push(std::move(item));
// disallow recursion
state->r.reset(false);
}
@@ -103,15 +103,15 @@ public:
result->w = std::move(w);
return result;
}
- static void set(current_thread_queue_type* queue) {
+ static void set(current_thread_queue_type* q) {
if (!!current_thread_queue()) {
abort();
}
// publish new queue
- current_thread_queue() = queue;
+ current_thread_queue() = q;
}
- static void destroy(current_thread_queue_type* queue) {
- delete queue;
+ static void destroy(current_thread_queue_type* q) {
+ delete q;
}
static void destroy() {
if (!current_thread_queue()) {
@@ -134,7 +134,7 @@ private:
typedef current_thread this_type;
current_thread(const this_type&);
- typedef detail::action_queue queue;
+ typedef detail::action_queue queue_type;
struct derecurser : public worker_interface
{
@@ -154,11 +154,11 @@ private:
}
virtual void schedule(const schedulable& scbl) const {
- queue::push(queue::item_type(now(), scbl));
+ queue_type::push(queue_type::item_type(now(), scbl));
}
virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
- queue::push(queue::item_type(when, scbl));
+ queue_type::push(queue_type::item_type(when, scbl));
}
};
@@ -190,44 +190,44 @@ private:
{
// check ownership
- if (queue::owned()) {
+ if (queue_type::owned()) {
// already has an owner - delegate
- queue::get_worker_interface()->schedule(when, scbl);
+ queue_type::get_worker_interface()->schedule(when, scbl);
return;
}
// take ownership
- queue::ensure(std::make_shared<derecurser>());
+ queue_type::ensure(std::make_shared<derecurser>());
}
// release ownership
RXCPP_UNWIND_AUTO([]{
- queue::destroy();
+ queue_type::destroy();
});
- const auto& recursor = queue::get_recursion().get_recurse();
+ const auto& recursor = queue_type::get_recursion().get_recurse();
std::this_thread::sleep_until(when);
if (scbl.is_subscribed()) {
scbl(recursor);
}
- if (queue::empty()) {
+ if (queue_type::empty()) {
return;
}
// loop until queue is empty
for (
- auto next = queue::top().when;
+ auto next = queue_type::top().when;
(std::this_thread::sleep_until(next), true);
- next = queue::top().when
+ next = queue_type::top().when
) {
- auto what = queue::top().what;
+ auto what = queue_type::top().what;
- queue::pop();
+ queue_type::pop();
if (what.is_subscribed()) {
what(recursor);
}
- if (queue::empty()) {
+ if (queue_type::empty()) {
break;
}
}
@@ -245,10 +245,10 @@ public:
{
}
- static bool is_schedule_required() { return !queue::owned(); }
+ static bool is_schedule_required() { return !queue_type::owned(); }
inline bool is_tail_recursion_allowed() const {
- return queue::empty();
+ return queue_type::empty();
}
virtual clock_type::time_point now() const {
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
index 2e24c0f..bed6ef9 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
@@ -24,7 +24,7 @@ private:
private:
typedef new_worker this_type;
- typedef detail::action_queue queue;
+ typedef detail::action_queue queue_type;
new_worker(const this_type&);
@@ -57,7 +57,7 @@ private:
composite_subscription lifetime;
mutable std::mutex lock;
mutable std::condition_variable wake;
- mutable queue_item_time queue;
+ mutable queue_item_time q;
std::thread worker;
recursion r;
};
@@ -80,31 +80,34 @@ private:
auto keepAlive = state;
state->lifetime.add([keepAlive](){
+ std::unique_lock<std::mutex> guard(keepAlive->lock);
+ auto expired = std::move(keepAlive->q);
+ if (!keepAlive->q.empty()) abort();
keepAlive->wake.notify_one();
});
state->worker = tf([keepAlive](){
// take ownership
- queue::ensure(std::make_shared<new_worker>(keepAlive));
+ queue_type::ensure(std::make_shared<new_worker>(keepAlive));
// release ownership
RXCPP_UNWIND_AUTO([]{
- queue::destroy();
+ queue_type::destroy();
});
for(;;) {
std::unique_lock<std::mutex> guard(keepAlive->lock);
- if (keepAlive->queue.empty()) {
+ if (keepAlive->q.empty()) {
keepAlive->wake.wait(guard, [keepAlive](){
- return !keepAlive->lifetime.is_subscribed() || !keepAlive->queue.empty();
+ return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
});
}
if (!keepAlive->lifetime.is_subscribed()) {
break;
}
- auto& peek = keepAlive->queue.top();
+ auto& peek = keepAlive->q.top();
if (!peek.what.is_subscribed()) {
- keepAlive->queue.pop();
+ keepAlive->q.pop();
continue;
}
if (clock_type::now() < peek.when) {
@@ -112,8 +115,8 @@ private:
continue;
}
auto what = peek.what;
- keepAlive->queue.pop();
- keepAlive->r.reset(keepAlive->queue.empty());
+ keepAlive->q.pop();
+ keepAlive->r.reset(keepAlive->q.empty());
guard.unlock();
what(keepAlive->r.get_recurse());
}
@@ -131,7 +134,7 @@ private:
virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
if (scbl.is_subscribed()) {
std::unique_lock<std::mutex> guard(state->lock);
- state->queue.push(new_worker_state::item_type(when, scbl));
+ state->q.push(new_worker_state::item_type(when, scbl));
state->r.reset(false);
}
state->wake.notify_one();
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
index 8ff981a..5e32489 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
@@ -284,7 +284,7 @@ class hot_observable
typedef subscriber<T> observer_type;
mutable std::vector<recorded_type> mv;
mutable std::vector<rxn::subscription> sv;
- mutable std::vector<observer_type> observers;
+ mutable std::list<observer_type> observers;
mutable worker controller;
public:
@@ -312,13 +312,15 @@ public:
virtual ~hot_observable() {}
virtual void on_subscribe(observer_type o) const {
- observers.push_back(o);
+ auto olocation = observers.insert(observers.end(), o);
+
sv.push_back(rxn::subscription(sc->clock()));
auto index = sv.size() - 1;
auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
- o.add([sharedThis, index]() {
+ o.add([sharedThis, index, olocation]() {
sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
+ sharedThis->observers.erase(olocation);
});
}
@@ -401,6 +403,9 @@ public:
{
std::shared_ptr<detail::test_type::test_type_worker> tester;
public:
+
+ ~test_worker() {
+ }
explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t)
: worker(cs, std::static_pointer_cast<worker_interface>(t))
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp b/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp
index f3eb5e9..f0fc46e 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp
@@ -174,7 +174,7 @@ struct virtual_time : public detail::virtual_time_base<Absolute, Relative>
typedef detail::schedulable_queue<
typename item_type::time_point_type> queue_item_time;
- mutable queue_item_time queue;
+ mutable queue_item_time q;
public:
virtual ~virtual_time()
@@ -191,13 +191,13 @@ protected:
}
virtual item_type top() const {
- return queue.top();
+ return q.top();
}
virtual void pop() const {
- queue.pop();
+ q.pop();
}
virtual bool empty() const {
- return queue.empty();
+ return q.empty();
}
using base::schedule_absolute;
@@ -217,7 +217,7 @@ protected:
a(r.get_recurse());
}
});
- queue.push(item_type(when, run));
+ q.push(item_type(when, run));
}
};
diff --git a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
index 278209d..cf0c5e9 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
@@ -41,7 +41,7 @@ class synchronize_observer : public detail::multicast_observer<T>
mutable std::mutex lock;
mutable std::condition_variable wake;
- mutable queue_type queue;
+ mutable queue_type fill_queue;
composite_subscription lifetime;
mutable typename mode::type current;
coordinator_type coordinator;
@@ -60,17 +60,17 @@ class synchronize_observer : public detail::multicast_observer<T>
std::unique_lock<std::mutex> guard(lock);
if (!destination.is_subscribed()) {
current = mode::Disposed;
- queue.clear();
+ fill_queue.clear();
guard.unlock();
lifetime.unsubscribe();
return;
}
- if (queue.empty()) {
+ if (fill_queue.empty()) {
current = mode::Empty;
return;
}
- auto notification = std::move(queue.front());
- queue.pop_front();
+ auto notification = std::move(fill_queue.front());
+ fill_queue.pop_front();
guard.unlock();
notification->accept(destination);
self();
@@ -105,7 +105,7 @@ class synchronize_observer : public detail::multicast_observer<T>
void on_next(V v) const {
if (lifetime.is_subscribed()) {
std::unique_lock<std::mutex> guard(lock);
- queue.push_back(notification_type::on_next(std::move(v)));
+ fill_queue.push_back(notification_type::on_next(std::move(v)));
ensure_processing(guard);
}
wake.notify_one();
@@ -113,7 +113,7 @@ class synchronize_observer : public detail::multicast_observer<T>
void on_error(std::exception_ptr e) const {
if (lifetime.is_subscribed()) {
std::unique_lock<std::mutex> guard(lock);
- queue.push_back(notification_type::on_error(e));
+ fill_queue.push_back(notification_type::on_error(e));
ensure_processing(guard);
}
wake.notify_one();
@@ -121,7 +121,7 @@ class synchronize_observer : public detail::multicast_observer<T>
void on_completed() const {
if (lifetime.is_subscribed()) {
std::unique_lock<std::mutex> guard(lock);
- queue.push_back(notification_type::on_completed());
+ fill_queue.push_back(notification_type::on_completed());
ensure_processing(guard);
}
wake.notify_one();
diff --git a/appveyor.yml b/appveyor.yml
index 3f1a0ce..304ee3c 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -26,6 +26,16 @@ build:
test_script:
- build\test\Debug\rxcppv2_test.exe
+artifacts:
+ - path: Rx\v2\src\
+ name: rxcpp source
+ type: zip
+ - path: Rx\v2\examples\
+ name: rxcpp examples
+ type: zip
+
notifications:
- slack:
- secure: qaGjbI98VXZa7Zd2s3RmMzfA+ymrfWUDuzevdtOcHssEGBXbcoOJzLHNOmG+Y1nX
+ - provider: Slack
+ auth_token:
+ secure: qaGjbI98VXZa7Zd2s3RmMzfA+ymrfWUDuzevdtOcHssEGBXbcoOJzLHNOmG+Y1nX
+ channel: rxcpp