aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBertrand Simonnet <bsimonnet@google.com>2015-08-17 18:16:02 +0000
committerGerrit Code Review <noreply-gerritcodereview@google.com>2015-08-17 18:16:02 +0000
commite40a06405f1e8e8de4a6d051b3527790cc96f542 (patch)
tree634f2b990123d0c285d5250cd85fce0ec12d9108
parentd88086808d10ae3d3e48a0e5638d68e8367088ee (diff)
parent07c1779e51680364060f3ec289249869ac7bc5ca (diff)
downloadplatform_external_libbrillo-e40a06405f1e8e8de4a6d051b3527790cc96f542.tar.gz
platform_external_libbrillo-e40a06405f1e8e8de4a6d051b3527790cc96f542.tar.bz2
platform_external_libbrillo-e40a06405f1e8e8de4a6d051b3527790cc96f542.zip
Merge "libchromeos: Prevent starvation in BaseMessageLoop."
-rw-r--r--chromeos/message_loops/base_message_loop.cc123
-rw-r--r--chromeos/message_loops/base_message_loop.h34
-rw-r--r--chromeos/message_loops/message_loop_unittest.cc97
3 files changed, 226 insertions, 28 deletions
diff --git a/chromeos/message_loops/base_message_loop.cc b/chromeos/message_loops/base_message_loop.cc
index dbca1ef..4923138 100644
--- a/chromeos/message_loops/base_message_loop.cc
+++ b/chromeos/message_loops/base_message_loop.cc
@@ -25,7 +25,7 @@ BaseMessageLoop::~BaseMessageLoop() {
DVLOG_LOC(io_task.second.location(), 1)
<< "Removing file descriptor watcher task_id " << io_task.first
<< " leaked on BaseMessageLoop, scheduled from this location.";
- io_task.second.fd_watcher()->StopWatchingFileDescriptor();
+ io_task.second.StopWatching();
}
// Note all pending canceled delayed tasks when destroying the message loop.
@@ -91,14 +91,11 @@ MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor(
auto it_bool = io_tasks_.emplace(
std::piecewise_construct,
std::forward_as_tuple(task_id),
- std::forward_as_tuple(from_here, this, task_id, persistent, task));
+ std::forward_as_tuple(
+ from_here, this, task_id, fd, base_mode, persistent, task));
// This should always insert a new element.
DCHECK(it_bool.second);
- IOTask* new_io_task = &it_bool.first->second;
-
- bool scheduled = base_loop_->WatchFileDescriptor(
- fd, persistent, base_mode, new_io_task->fd_watcher(), new_io_task);
-
+ bool scheduled = it_bool.first->second.StartWatching();
DVLOG_LOC(from_here, 1)
<< "Watching fd " << fd << " for "
<< (mode == MessageLoop::kWatchRead ? "reading" : "writing")
@@ -122,13 +119,7 @@ bool BaseMessageLoop::CancelTask(TaskId task_id) {
auto io_task_it = io_tasks_.find(task_id);
if (io_task_it == io_tasks_.end())
return false;
-
- DVLOG_LOC(io_task_it->second.location(), 1)
- << "Removing task_id " << task_id << " scheduled from this location.";
- // Destroying the FileDescriptorWatcher implicitly stops watching the file
- // descriptor.
- io_tasks_.erase(io_task_it);
- return true;
+ return io_task_it->second.CancelTask();
}
// A DelayedTask was found for this task_id at this point.
@@ -219,35 +210,98 @@ void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) {
delayed_tasks_.erase(task_it);
}
+void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) {
+ auto task_it = io_tasks_.find(task_id);
+ // Even if this task was canceled while we were waiting in the message loop
+ // for this method to run, the entry in io_tasks_ should still be present, but
+ // won't do anything.
+ DCHECK(task_it != io_tasks_.end());
+ task_it->second.OnFileReadyPostedTask();
+}
+
BaseMessageLoop::IOTask::IOTask(const tracked_objects::Location& location,
BaseMessageLoop* loop,
MessageLoop::TaskId task_id,
+ int fd,
+ base::MessageLoopForIO::Mode base_mode,
bool persistent,
const Closure& task)
: location_(location), loop_(loop), task_id_(task_id),
- persistent_(persistent), closure_(task) {}
+ fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task) {}
-void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int fd) {
- OnFileReady(fd);
+bool BaseMessageLoop::IOTask::StartWatching() {
+ return loop_->base_loop_->WatchFileDescriptor(
+ fd_, persistent_, base_mode_, &fd_watcher_, this);
}
-void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int fd) {
- OnFileReady(fd);
+void BaseMessageLoop::IOTask::StopWatching() {
+ // This is safe to call even if we are not watching for it.
+ fd_watcher_.StopWatchingFileDescriptor();
+}
+
+void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) {
+ OnFileReady();
+}
+
+void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) {
+ OnFileReady();
+}
+
+void BaseMessageLoop::IOTask::OnFileReady() {
+ // When the file descriptor becomes available we stop watching for it and
+ // schedule a task to run the callback from the main loop. The callback will
+ // run using the same scheduler use to run other delayed tasks, avoiding
+ // starvation of the available posted tasks if there are file descriptors
+ // always available. The new posted task will use the same TaskId as the
+ // current file descriptor watching task an could be canceled in either state,
+ // when waiting for the file descriptor or waiting in the main loop.
+ StopWatching();
+ bool base_scheduled = loop_->base_loop_->task_runner()->PostTask(
+ location_,
+ base::Bind(&BaseMessageLoop::OnFileReadyPostedTask,
+ loop_->weak_ptr_factory_.GetWeakPtr(),
+ task_id_));
+ posted_task_pending_ = true;
+ if (base_scheduled) {
+ DVLOG_LOC(location_, 1)
+ << "Dispatching task_id " << task_id_ << " for "
+ << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
+ "reading" : "writing")
+ << " file descriptor " << fd_ << ", scheduled from this location.";
+ } else {
+ // In the rare case that PostTask() fails, we fall back to run it directly.
+ // This would indicate a bigger problem with the message loop setup.
+ LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask().";
+ OnFileReadyPostedTask();
+ }
}
-void BaseMessageLoop::IOTask::OnFileReady(int fd) {
+void BaseMessageLoop::IOTask::OnFileReadyPostedTask() {
// We can't access |this| after running the |closure_| since it could call
// CancelTask on its own task_id, so we copy the members we need now.
BaseMessageLoop* loop_ptr = loop_;
+ DCHECK(posted_task_pending_ = true);
+ posted_task_pending_ = false;
+
+ // If this task was already canceled, the closure will be null and there is
+ // nothing else to do here. This execution doesn't count a step for RunOnce()
+ // unless we have a callback to run.
+ if (closure_.is_null()) {
+ loop_->io_tasks_.erase(task_id_);
+ return;
+ }
DVLOG_LOC(location_, 1)
- << "Running task_id " << task_id_
- << " for watching file descriptor " << fd
- << ", scheduled from this location.";
+ << "Running task_id " << task_id_ << " for "
+ << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
+ "reading" : "writing")
+ << " file descriptor " << fd_ << ", scheduled from this location.";
if (persistent_) {
// In the persistent case we just run the callback. If this callback cancels
- // the task id, we can't access |this| anymore.
+ // the task id, we can't access |this| anymore, so we re-start watching the
+ // file descriptor before running the callback.
+ StartWatching();
closure_.Run();
} else {
// This will destroy |this|, the fd_watcher and therefore stop watching this
@@ -264,4 +318,25 @@ void BaseMessageLoop::IOTask::OnFileReady(int fd) {
}
}
+bool BaseMessageLoop::IOTask::CancelTask() {
+ if (closure_.is_null())
+ return false;
+
+ DVLOG_LOC(location_, 1)
+ << "Removing task_id " << task_id_ << " scheduled from this location.";
+
+ if (!posted_task_pending_) {
+ // Destroying the FileDescriptorWatcher implicitly stops watching the file
+ // descriptor. This will delete our instance.
+ loop_->io_tasks_.erase(task_id_);
+ return true;
+ }
+ // The IOTask is waiting for the message loop to run its delayed task, so
+ // it is not watching for the file descriptor. We release the closure
+ // resources now but keep the IOTask instance alive while we wait for the
+ // callback to run and delete the IOTask.
+ closure_ = Closure();
+ return true;
+}
+
} // namespace chromeos
diff --git a/chromeos/message_loops/base_message_loop.h b/chromeos/message_loops/base_message_loop.h
index 2014c71..902b828 100644
--- a/chromeos/message_loops/base_message_loop.h
+++ b/chromeos/message_loops/base_message_loop.h
@@ -55,6 +55,12 @@ class CHROMEOS_EXPORT BaseMessageLoop : public MessageLoop {
// scheduled with Post*Task() of id |task_id|, even if it was canceled.
void OnRanPostedTask(MessageLoop::TaskId task_id);
+ // Called from the message loop when the IOTask should run the scheduled
+ // callback. This is a simple wrapper of IOTask::OnFileReadyPostedTask()
+ // posted from the BaseMessageLoop so it is deleted when the BaseMessageLoop
+ // goes out of scope since we can't cancel the callback otherwise.
+ void OnFileReadyPostedTask(MessageLoop::TaskId task_id);
+
// Return a new unused task_id.
TaskId NextTaskId();
@@ -72,30 +78,50 @@ class CHROMEOS_EXPORT BaseMessageLoop : public MessageLoop {
IOTask(const tracked_objects::Location& location,
BaseMessageLoop* loop,
MessageLoop::TaskId task_id,
+ int fd,
+ base::MessageLoopForIO::Mode base_mode,
bool persistent,
const base::Closure& task);
const tracked_objects::Location& location() const { return location_; }
- base::MessageLoopForIO::FileDescriptorWatcher* fd_watcher() {
- return &fd_watcher_;
- }
+
+ // Used to start/stop watching the file descriptor while keeping the
+ // IOTask entry available.
+ bool StartWatching();
+ void StopWatching();
+
+ // Called from the message loop as a PostTask() when the file descriptor is
+ // available, scheduled to run from OnFileReady().
+ void OnFileReadyPostedTask();
+
+ // Cancel the IOTask and returns whether it was actually canceled, with the
+ // same semantics as MessageLoop::CancelTask().
+ bool CancelTask();
private:
tracked_objects::Location location_;
BaseMessageLoop* loop_;
+ // These are the arguments passed in the constructor, basically forwarding
+ // all the arguments passed to WatchFileDescriptor() plus the assigned
+ // TaskId for this task.
MessageLoop::TaskId task_id_;
+ int fd_;
+ base::MessageLoopForIO::Mode base_mode_;
bool persistent_;
base::Closure closure_;
base::MessageLoopForIO::FileDescriptorWatcher fd_watcher_;
+ // Tells whether there is a pending call to OnFileReadPostedTask().
+ bool posted_task_pending_{false};
+
// base::MessageLoopForIO::Watcher overrides:
void OnFileCanReadWithoutBlocking(int fd) override;
void OnFileCanWriteWithoutBlocking(int fd) override;
// Common implementation for both the read and write case.
- void OnFileReady(int fd);
+ void OnFileReady();
DISALLOW_COPY_AND_ASSIGN(IOTask);
};
diff --git a/chromeos/message_loops/message_loop_unittest.cc b/chromeos/message_loops/message_loop_unittest.cc
index 30f400f..73b1f96 100644
--- a/chromeos/message_loops/message_loop_unittest.cc
+++ b/chromeos/message_loops/message_loop_unittest.cc
@@ -15,6 +15,7 @@
#include <unistd.h>
#include <memory>
+#include <vector>
#include <base/bind.h>
#include <base/location.h>
@@ -34,6 +35,9 @@ namespace {
// file descriptors when testing watching for a file descriptor.
class ScopedPipe {
public:
+ // The internal pipe size.
+ static const int kPipeSize;
+
ScopedPipe() {
int fds[2];
if (pipe(fds) != 0) {
@@ -41,6 +45,7 @@ class ScopedPipe {
}
reader = fds[0];
writer = fds[1];
+ EXPECT_EQ(kPipeSize, fcntl(writer, F_SETPIPE_SZ, kPipeSize));
}
~ScopedPipe() {
if (reader != -1)
@@ -54,6 +59,8 @@ class ScopedPipe {
int writer{-1};
};
+const int ScopedPipe::kPipeSize = 4096;
+
class ScopedSocketPair {
public:
ScopedSocketPair() {
@@ -322,4 +329,94 @@ TYPED_TEST(MessageLoopTest, DeletePersistenIOTaskFromSelf) {
EXPECT_EQ(MessageLoop::kTaskIdNull, task_id);
}
+// Test that we can cancel several persistent file descriptor watching callbacks
+// from a scheduled callback. In the BaseMessageLoop implementation, this code
+// will cause us to cancel an IOTask that has a pending delayed task, but
+// otherwise is a valid test case on all implementations.
+TYPED_TEST(MessageLoopTest, DeleteAllPersistenIOTaskFromSelf) {
+ const int kNumTasks = 5;
+ ScopedPipe pipes[kNumTasks];
+ TaskId task_ids[kNumTasks];
+
+ for (int i = 0; i < kNumTasks; ++i) {
+ task_ids[i] = this->loop_->WatchFileDescriptor(
+ FROM_HERE, pipes[i].writer, MessageLoop::kWatchWrite,
+ true /* persistent */,
+ Bind([this, kNumTasks, &task_ids] {
+ for (int j = 0; j < kNumTasks; ++j) {
+ // Once we cancel all the tasks, none should run, so this code runs
+ // only once from one callback.
+ EXPECT_TRUE(this->loop_->CancelTask(task_ids[j]));
+ task_ids[j] = MessageLoop::kTaskIdNull;
+ }
+ }));
+ }
+ MessageLoopRunMaxIterations(this->loop_.get(), 100);
+ for (int i = 0; i < kNumTasks; ++i) {
+ EXPECT_EQ(MessageLoop::kTaskIdNull, task_ids[i]);
+ }
+}
+
+// Test that if there are several tasks watching for file descriptors to be
+// available or simply waiting in the message loop are fairly scheduled to run.
+// In other words, this test ensures that having a file descriptor always
+// available doesn't prevent other file descriptors watching tasks or delayed
+// tasks to be dispatched, causing starvation.
+TYPED_TEST(MessageLoopTest, AllTasksAreEqual) {
+ int total_calls = 0;
+
+ // First, schedule a repeating timeout callback to run from the main loop.
+ int timeout_called = 0;
+ base::Closure timeout_callback;
+ MessageLoop::TaskId timeout_task;
+ timeout_callback = base::Bind(
+ [this, &timeout_called, &total_calls, &timeout_callback, &timeout_task] {
+ timeout_called++;
+ total_calls++;
+ timeout_task = this->loop_->PostTask(FROM_HERE, Bind(timeout_callback));
+ if (total_calls > 100)
+ this->loop_->BreakLoop();
+ });
+ timeout_task = this->loop_->PostTask(FROM_HERE, timeout_callback);
+
+ // Second, schedule several file descriptor watchers.
+ const int kNumTasks = 3;
+ ScopedPipe pipes[kNumTasks];
+ MessageLoop::TaskId tasks[kNumTasks];
+
+ int reads[kNumTasks] = {};
+ auto fd_callback = [this, &pipes, &reads, &total_calls](int i) {
+ reads[i]++;
+ total_calls++;
+ char c;
+ EXPECT_EQ(1, HANDLE_EINTR(read(pipes[i].reader, &c, 1)));
+ if (total_calls > 100)
+ this->loop_->BreakLoop();
+ };
+
+ for (int i = 0; i < kNumTasks; ++i) {
+ tasks[i] = this->loop_->WatchFileDescriptor(
+ FROM_HERE, pipes[i].reader, MessageLoop::kWatchRead,
+ true /* persistent */,
+ Bind(fd_callback, i));
+ // Make enough bytes available on each file descriptor. This should not
+ // block because we set the size of the file descriptor buffer when
+ // creating it.
+ std::vector<char> blob(1000, 'a');
+ EXPECT_EQ(blob.size(),
+ HANDLE_EINTR(write(pipes[i].writer, blob.data(), blob.size())));
+ }
+ this->loop_->Run();
+ EXPECT_GT(total_calls, 100);
+ // We run the loop up 100 times and expect each callback to run at least 10
+ // times. A good scheduler should balance these callbacks.
+ EXPECT_GE(timeout_called, 10);
+ EXPECT_TRUE(this->loop_->CancelTask(timeout_task));
+ for (int i = 0; i < kNumTasks; ++i) {
+ EXPECT_GE(reads[i], 10) << "Reading from pipes[" << i << "], fd "
+ << pipes[i].reader;
+ EXPECT_TRUE(this->loop_->CancelTask(tasks[i]));
+ }
+}
+
} // namespace chromeos