diff options
author | Bertrand Simonnet <bsimonnet@google.com> | 2015-08-17 18:16:02 +0000 |
---|---|---|
committer | Gerrit Code Review <noreply-gerritcodereview@google.com> | 2015-08-17 18:16:02 +0000 |
commit | e40a06405f1e8e8de4a6d051b3527790cc96f542 (patch) | |
tree | 634f2b990123d0c285d5250cd85fce0ec12d9108 | |
parent | d88086808d10ae3d3e48a0e5638d68e8367088ee (diff) | |
parent | 07c1779e51680364060f3ec289249869ac7bc5ca (diff) | |
download | platform_external_libbrillo-e40a06405f1e8e8de4a6d051b3527790cc96f542.tar.gz platform_external_libbrillo-e40a06405f1e8e8de4a6d051b3527790cc96f542.tar.bz2 platform_external_libbrillo-e40a06405f1e8e8de4a6d051b3527790cc96f542.zip |
Merge "libchromeos: Prevent starvation in BaseMessageLoop."
-rw-r--r-- | chromeos/message_loops/base_message_loop.cc | 123 | ||||
-rw-r--r-- | chromeos/message_loops/base_message_loop.h | 34 | ||||
-rw-r--r-- | chromeos/message_loops/message_loop_unittest.cc | 97 |
3 files changed, 226 insertions, 28 deletions
diff --git a/chromeos/message_loops/base_message_loop.cc b/chromeos/message_loops/base_message_loop.cc index dbca1ef..4923138 100644 --- a/chromeos/message_loops/base_message_loop.cc +++ b/chromeos/message_loops/base_message_loop.cc @@ -25,7 +25,7 @@ BaseMessageLoop::~BaseMessageLoop() { DVLOG_LOC(io_task.second.location(), 1) << "Removing file descriptor watcher task_id " << io_task.first << " leaked on BaseMessageLoop, scheduled from this location."; - io_task.second.fd_watcher()->StopWatchingFileDescriptor(); + io_task.second.StopWatching(); } // Note all pending canceled delayed tasks when destroying the message loop. @@ -91,14 +91,11 @@ MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor( auto it_bool = io_tasks_.emplace( std::piecewise_construct, std::forward_as_tuple(task_id), - std::forward_as_tuple(from_here, this, task_id, persistent, task)); + std::forward_as_tuple( + from_here, this, task_id, fd, base_mode, persistent, task)); // This should always insert a new element. DCHECK(it_bool.second); - IOTask* new_io_task = &it_bool.first->second; - - bool scheduled = base_loop_->WatchFileDescriptor( - fd, persistent, base_mode, new_io_task->fd_watcher(), new_io_task); - + bool scheduled = it_bool.first->second.StartWatching(); DVLOG_LOC(from_here, 1) << "Watching fd " << fd << " for " << (mode == MessageLoop::kWatchRead ? "reading" : "writing") @@ -122,13 +119,7 @@ bool BaseMessageLoop::CancelTask(TaskId task_id) { auto io_task_it = io_tasks_.find(task_id); if (io_task_it == io_tasks_.end()) return false; - - DVLOG_LOC(io_task_it->second.location(), 1) - << "Removing task_id " << task_id << " scheduled from this location."; - // Destroying the FileDescriptorWatcher implicitly stops watching the file - // descriptor. - io_tasks_.erase(io_task_it); - return true; + return io_task_it->second.CancelTask(); } // A DelayedTask was found for this task_id at this point. @@ -219,35 +210,98 @@ void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) { delayed_tasks_.erase(task_it); } +void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) { + auto task_it = io_tasks_.find(task_id); + // Even if this task was canceled while we were waiting in the message loop + // for this method to run, the entry in io_tasks_ should still be present, but + // won't do anything. + DCHECK(task_it != io_tasks_.end()); + task_it->second.OnFileReadyPostedTask(); +} + BaseMessageLoop::IOTask::IOTask(const tracked_objects::Location& location, BaseMessageLoop* loop, MessageLoop::TaskId task_id, + int fd, + base::MessageLoopForIO::Mode base_mode, bool persistent, const Closure& task) : location_(location), loop_(loop), task_id_(task_id), - persistent_(persistent), closure_(task) {} + fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task) {} -void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int fd) { - OnFileReady(fd); +bool BaseMessageLoop::IOTask::StartWatching() { + return loop_->base_loop_->WatchFileDescriptor( + fd_, persistent_, base_mode_, &fd_watcher_, this); } -void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int fd) { - OnFileReady(fd); +void BaseMessageLoop::IOTask::StopWatching() { + // This is safe to call even if we are not watching for it. + fd_watcher_.StopWatchingFileDescriptor(); +} + +void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) { + OnFileReady(); +} + +void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) { + OnFileReady(); +} + +void BaseMessageLoop::IOTask::OnFileReady() { + // When the file descriptor becomes available we stop watching for it and + // schedule a task to run the callback from the main loop. The callback will + // run using the same scheduler use to run other delayed tasks, avoiding + // starvation of the available posted tasks if there are file descriptors + // always available. The new posted task will use the same TaskId as the + // current file descriptor watching task an could be canceled in either state, + // when waiting for the file descriptor or waiting in the main loop. + StopWatching(); + bool base_scheduled = loop_->base_loop_->task_runner()->PostTask( + location_, + base::Bind(&BaseMessageLoop::OnFileReadyPostedTask, + loop_->weak_ptr_factory_.GetWeakPtr(), + task_id_)); + posted_task_pending_ = true; + if (base_scheduled) { + DVLOG_LOC(location_, 1) + << "Dispatching task_id " << task_id_ << " for " + << (base_mode_ == base::MessageLoopForIO::WATCH_READ ? + "reading" : "writing") + << " file descriptor " << fd_ << ", scheduled from this location."; + } else { + // In the rare case that PostTask() fails, we fall back to run it directly. + // This would indicate a bigger problem with the message loop setup. + LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask()."; + OnFileReadyPostedTask(); + } } -void BaseMessageLoop::IOTask::OnFileReady(int fd) { +void BaseMessageLoop::IOTask::OnFileReadyPostedTask() { // We can't access |this| after running the |closure_| since it could call // CancelTask on its own task_id, so we copy the members we need now. BaseMessageLoop* loop_ptr = loop_; + DCHECK(posted_task_pending_ = true); + posted_task_pending_ = false; + + // If this task was already canceled, the closure will be null and there is + // nothing else to do here. This execution doesn't count a step for RunOnce() + // unless we have a callback to run. + if (closure_.is_null()) { + loop_->io_tasks_.erase(task_id_); + return; + } DVLOG_LOC(location_, 1) - << "Running task_id " << task_id_ - << " for watching file descriptor " << fd - << ", scheduled from this location."; + << "Running task_id " << task_id_ << " for " + << (base_mode_ == base::MessageLoopForIO::WATCH_READ ? + "reading" : "writing") + << " file descriptor " << fd_ << ", scheduled from this location."; if (persistent_) { // In the persistent case we just run the callback. If this callback cancels - // the task id, we can't access |this| anymore. + // the task id, we can't access |this| anymore, so we re-start watching the + // file descriptor before running the callback. + StartWatching(); closure_.Run(); } else { // This will destroy |this|, the fd_watcher and therefore stop watching this @@ -264,4 +318,25 @@ void BaseMessageLoop::IOTask::OnFileReady(int fd) { } } +bool BaseMessageLoop::IOTask::CancelTask() { + if (closure_.is_null()) + return false; + + DVLOG_LOC(location_, 1) + << "Removing task_id " << task_id_ << " scheduled from this location."; + + if (!posted_task_pending_) { + // Destroying the FileDescriptorWatcher implicitly stops watching the file + // descriptor. This will delete our instance. + loop_->io_tasks_.erase(task_id_); + return true; + } + // The IOTask is waiting for the message loop to run its delayed task, so + // it is not watching for the file descriptor. We release the closure + // resources now but keep the IOTask instance alive while we wait for the + // callback to run and delete the IOTask. + closure_ = Closure(); + return true; +} + } // namespace chromeos diff --git a/chromeos/message_loops/base_message_loop.h b/chromeos/message_loops/base_message_loop.h index 2014c71..902b828 100644 --- a/chromeos/message_loops/base_message_loop.h +++ b/chromeos/message_loops/base_message_loop.h @@ -55,6 +55,12 @@ class CHROMEOS_EXPORT BaseMessageLoop : public MessageLoop { // scheduled with Post*Task() of id |task_id|, even if it was canceled. void OnRanPostedTask(MessageLoop::TaskId task_id); + // Called from the message loop when the IOTask should run the scheduled + // callback. This is a simple wrapper of IOTask::OnFileReadyPostedTask() + // posted from the BaseMessageLoop so it is deleted when the BaseMessageLoop + // goes out of scope since we can't cancel the callback otherwise. + void OnFileReadyPostedTask(MessageLoop::TaskId task_id); + // Return a new unused task_id. TaskId NextTaskId(); @@ -72,30 +78,50 @@ class CHROMEOS_EXPORT BaseMessageLoop : public MessageLoop { IOTask(const tracked_objects::Location& location, BaseMessageLoop* loop, MessageLoop::TaskId task_id, + int fd, + base::MessageLoopForIO::Mode base_mode, bool persistent, const base::Closure& task); const tracked_objects::Location& location() const { return location_; } - base::MessageLoopForIO::FileDescriptorWatcher* fd_watcher() { - return &fd_watcher_; - } + + // Used to start/stop watching the file descriptor while keeping the + // IOTask entry available. + bool StartWatching(); + void StopWatching(); + + // Called from the message loop as a PostTask() when the file descriptor is + // available, scheduled to run from OnFileReady(). + void OnFileReadyPostedTask(); + + // Cancel the IOTask and returns whether it was actually canceled, with the + // same semantics as MessageLoop::CancelTask(). + bool CancelTask(); private: tracked_objects::Location location_; BaseMessageLoop* loop_; + // These are the arguments passed in the constructor, basically forwarding + // all the arguments passed to WatchFileDescriptor() plus the assigned + // TaskId for this task. MessageLoop::TaskId task_id_; + int fd_; + base::MessageLoopForIO::Mode base_mode_; bool persistent_; base::Closure closure_; base::MessageLoopForIO::FileDescriptorWatcher fd_watcher_; + // Tells whether there is a pending call to OnFileReadPostedTask(). + bool posted_task_pending_{false}; + // base::MessageLoopForIO::Watcher overrides: void OnFileCanReadWithoutBlocking(int fd) override; void OnFileCanWriteWithoutBlocking(int fd) override; // Common implementation for both the read and write case. - void OnFileReady(int fd); + void OnFileReady(); DISALLOW_COPY_AND_ASSIGN(IOTask); }; diff --git a/chromeos/message_loops/message_loop_unittest.cc b/chromeos/message_loops/message_loop_unittest.cc index 30f400f..73b1f96 100644 --- a/chromeos/message_loops/message_loop_unittest.cc +++ b/chromeos/message_loops/message_loop_unittest.cc @@ -15,6 +15,7 @@ #include <unistd.h> #include <memory> +#include <vector> #include <base/bind.h> #include <base/location.h> @@ -34,6 +35,9 @@ namespace { // file descriptors when testing watching for a file descriptor. class ScopedPipe { public: + // The internal pipe size. + static const int kPipeSize; + ScopedPipe() { int fds[2]; if (pipe(fds) != 0) { @@ -41,6 +45,7 @@ class ScopedPipe { } reader = fds[0]; writer = fds[1]; + EXPECT_EQ(kPipeSize, fcntl(writer, F_SETPIPE_SZ, kPipeSize)); } ~ScopedPipe() { if (reader != -1) @@ -54,6 +59,8 @@ class ScopedPipe { int writer{-1}; }; +const int ScopedPipe::kPipeSize = 4096; + class ScopedSocketPair { public: ScopedSocketPair() { @@ -322,4 +329,94 @@ TYPED_TEST(MessageLoopTest, DeletePersistenIOTaskFromSelf) { EXPECT_EQ(MessageLoop::kTaskIdNull, task_id); } +// Test that we can cancel several persistent file descriptor watching callbacks +// from a scheduled callback. In the BaseMessageLoop implementation, this code +// will cause us to cancel an IOTask that has a pending delayed task, but +// otherwise is a valid test case on all implementations. +TYPED_TEST(MessageLoopTest, DeleteAllPersistenIOTaskFromSelf) { + const int kNumTasks = 5; + ScopedPipe pipes[kNumTasks]; + TaskId task_ids[kNumTasks]; + + for (int i = 0; i < kNumTasks; ++i) { + task_ids[i] = this->loop_->WatchFileDescriptor( + FROM_HERE, pipes[i].writer, MessageLoop::kWatchWrite, + true /* persistent */, + Bind([this, kNumTasks, &task_ids] { + for (int j = 0; j < kNumTasks; ++j) { + // Once we cancel all the tasks, none should run, so this code runs + // only once from one callback. + EXPECT_TRUE(this->loop_->CancelTask(task_ids[j])); + task_ids[j] = MessageLoop::kTaskIdNull; + } + })); + } + MessageLoopRunMaxIterations(this->loop_.get(), 100); + for (int i = 0; i < kNumTasks; ++i) { + EXPECT_EQ(MessageLoop::kTaskIdNull, task_ids[i]); + } +} + +// Test that if there are several tasks watching for file descriptors to be +// available or simply waiting in the message loop are fairly scheduled to run. +// In other words, this test ensures that having a file descriptor always +// available doesn't prevent other file descriptors watching tasks or delayed +// tasks to be dispatched, causing starvation. +TYPED_TEST(MessageLoopTest, AllTasksAreEqual) { + int total_calls = 0; + + // First, schedule a repeating timeout callback to run from the main loop. + int timeout_called = 0; + base::Closure timeout_callback; + MessageLoop::TaskId timeout_task; + timeout_callback = base::Bind( + [this, &timeout_called, &total_calls, &timeout_callback, &timeout_task] { + timeout_called++; + total_calls++; + timeout_task = this->loop_->PostTask(FROM_HERE, Bind(timeout_callback)); + if (total_calls > 100) + this->loop_->BreakLoop(); + }); + timeout_task = this->loop_->PostTask(FROM_HERE, timeout_callback); + + // Second, schedule several file descriptor watchers. + const int kNumTasks = 3; + ScopedPipe pipes[kNumTasks]; + MessageLoop::TaskId tasks[kNumTasks]; + + int reads[kNumTasks] = {}; + auto fd_callback = [this, &pipes, &reads, &total_calls](int i) { + reads[i]++; + total_calls++; + char c; + EXPECT_EQ(1, HANDLE_EINTR(read(pipes[i].reader, &c, 1))); + if (total_calls > 100) + this->loop_->BreakLoop(); + }; + + for (int i = 0; i < kNumTasks; ++i) { + tasks[i] = this->loop_->WatchFileDescriptor( + FROM_HERE, pipes[i].reader, MessageLoop::kWatchRead, + true /* persistent */, + Bind(fd_callback, i)); + // Make enough bytes available on each file descriptor. This should not + // block because we set the size of the file descriptor buffer when + // creating it. + std::vector<char> blob(1000, 'a'); + EXPECT_EQ(blob.size(), + HANDLE_EINTR(write(pipes[i].writer, blob.data(), blob.size()))); + } + this->loop_->Run(); + EXPECT_GT(total_calls, 100); + // We run the loop up 100 times and expect each callback to run at least 10 + // times. A good scheduler should balance these callbacks. + EXPECT_GE(timeout_called, 10); + EXPECT_TRUE(this->loop_->CancelTask(timeout_task)); + for (int i = 0; i < kNumTasks; ++i) { + EXPECT_GE(reads[i], 10) << "Reading from pipes[" << i << "], fd " + << pipes[i].reader; + EXPECT_TRUE(this->loop_->CancelTask(tasks[i])); + } +} + } // namespace chromeos |