aboutsummaryrefslogtreecommitdiffstats
path: root/chromeos
diff options
context:
space:
mode:
authorAlex Deymo <deymo@chromium.org>2015-08-05 19:15:00 -0700
committerBertrand SIMONNET <bsimonnet@google.com>2015-08-12 10:57:16 -0700
commitd145d0a667c8ccd56378f1b78176e04f4a5b03df (patch)
tree1a46e3e502b246744af379f278c9c3f63b9d1ad0 /chromeos
parent667cf9b0bb9604c1e057a833f33583f2a437fc0a (diff)
downloadplatform_external_libbrillo-d145d0a667c8ccd56378f1b78176e04f4a5b03df.tar.gz
platform_external_libbrillo-d145d0a667c8ccd56378f1b78176e04f4a5b03df.tar.bz2
platform_external_libbrillo-d145d0a667c8ccd56378f1b78176e04f4a5b03df.zip
libchromeos: Don't WaitForData() if the Stream doesn't block.
Read[All]Async and Write[All]Async would first call WaitForData() before attempting to read/write from the Stream. In some cases, such asi the FileStream implementation using a BaseMessageLoop (base::MessageLoopForIO, implemented using libevent/epoll) we can't WaitForData() on a file descriptor if it never blocks. A regular file or a block device are common examples of a file descriptor for wich we can't call MessageLoop::WatchFileDescriptor() in that implementation. This CL prevents this scenario by calling first ReadNonBlocking() or WriteNonBlocking() and then falling back to WaitForData() only if ReadNonBlocking() or WriteNonBlocking(), respectively, return a situation in which the Blocking call would actually block. BUG=chromium:499886 TEST=Updated unittests. Reviewed-on: https://chromium-review.googlesource.com/291092 Tested-by: Alex Deymo <deymo@chromium.org> Trybot-Ready: Alex Deymo <deymo@chromium.org> Reviewed-by: Alex Vakulenko <avakulenko@chromium.org> Commit-Queue: Alex Deymo <deymo@chromium.org> (cherry-picked from https://chromium.googlesource.com/chromiumos/platform2 at cd6dfe22ffaf6322b33bbf2e1f1322093d67a46e) Change-Id: Iba97e384e9fb5aa7827b68e11135b396dcda67ed
Diffstat (limited to 'chromeos')
-rw-r--r--chromeos/streams/file_stream_unittest.cc10
-rw-r--r--chromeos/streams/stream.cc191
-rw-r--r--chromeos/streams/stream.h83
-rw-r--r--chromeos/streams/stream_unittest.cc98
4 files changed, 313 insertions, 69 deletions
diff --git a/chromeos/streams/file_stream_unittest.cc b/chromeos/streams/file_stream_unittest.cc
index faf8631..c760fb5 100644
--- a/chromeos/streams/file_stream_unittest.cc
+++ b/chromeos/streams/file_stream_unittest.cc
@@ -34,6 +34,12 @@ namespace chromeos {
namespace {
+// gmock action that would return a blocking situation from a read() or write().
+ACTION(ReturnWouldBlock) {
+ errno = EWOULDBLOCK;
+ return -1;
+}
+
// Helper function to read one byte from the stream.
inline int ReadByte(Stream* stream) {
uint8_t byte = 0;
@@ -369,6 +375,8 @@ TEST_F(FileStreamTest, ReadAsync) {
auto error_callback = [&failed](const Error* error) { failed = true; };
FileStream::FileDescriptorInterface::DataCallback data_callback;
+ EXPECT_CALL(fd_mock(), Read(test_read_buffer_, 100))
+ .WillOnce(ReturnWouldBlock());
EXPECT_CALL(fd_mock(), WaitForData(Stream::AccessMode::READ, _, _))
.WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true)));
EXPECT_TRUE(stream_->ReadAsync(test_read_buffer_, 100,
@@ -506,6 +514,8 @@ TEST_F(FileStreamTest, WriteAsync) {
auto error_callback = [&failed](const Error* error) { failed = true; };
FileStream::FileDescriptorInterface::DataCallback data_callback;
+ EXPECT_CALL(fd_mock(), Write(test_write_buffer_, 100))
+ .WillOnce(ReturnWouldBlock());
EXPECT_CALL(fd_mock(), WaitForData(Stream::AccessMode::WRITE, _, _))
.WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true)));
EXPECT_TRUE(stream_->WriteAsync(test_write_buffer_, 100,
diff --git a/chromeos/streams/stream.cc b/chromeos/streams/stream.cc
index 35e044d..64f9428 100644
--- a/chromeos/streams/stream.cc
+++ b/chromeos/streams/stream.cc
@@ -35,14 +35,12 @@ bool Stream::ReadAsync(void* buffer,
"Another asynchronous operation is still pending");
return false;
}
- is_async_read_pending_ = true;
- is_async_read_pending_ = WaitForData(
- AccessMode::READ,
- base::Bind(&Stream::OnReadAvailable, weak_ptr_factory_.GetWeakPtr(),
- buffer, size_to_read, success_callback, error_callback),
- error);
- return is_async_read_pending_;
+ auto callback = base::Bind(&Stream::IgnoreEOSCallback, success_callback);
+ // If we can read some data right away non-blocking we should still run the
+ // callback from the main loop, so we pass true here for force_async_callback.
+ return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
+ true);
}
bool Stream::ReadAllAsync(void* buffer,
@@ -50,10 +48,18 @@ bool Stream::ReadAllAsync(void* buffer,
const base::Closure& success_callback,
const ErrorCallback& error_callback,
ErrorPtr* error) {
+ if (is_async_read_pending_) {
+ Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
+ errors::stream::kOperationNotSupported,
+ "Another asynchronous operation is still pending");
+ return false;
+ }
+
auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
weak_ptr_factory_.GetWeakPtr(), buffer,
size_to_read, success_callback, error_callback);
- return ReadAsync(buffer, size_to_read, callback, error_callback, error);
+ return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
+ true);
}
bool Stream::ReadBlocking(void* buffer,
@@ -104,13 +110,10 @@ bool Stream::WriteAsync(const void* buffer,
"Another asynchronous operation is still pending");
return false;
}
- is_async_write_pending_ = true;
- is_async_write_pending_ = WaitForData(
- AccessMode::WRITE,
- base::Bind(&Stream::OnWriteAvailable, weak_ptr_factory_.GetWeakPtr(),
- buffer, size_to_write, success_callback, error_callback),
- error);
- return is_async_write_pending_;
+ // If we can read some data right away non-blocking we should still run the
+ // callback from the main loop, so we pass true here for force_async_callback.
+ return WriteAsyncImpl(buffer, size_to_write, success_callback, error_callback,
+ error, true);
}
bool Stream::WriteAllAsync(const void* buffer,
@@ -118,10 +121,18 @@ bool Stream::WriteAllAsync(const void* buffer,
const base::Closure& success_callback,
const ErrorCallback& error_callback,
ErrorPtr* error) {
+ if (is_async_write_pending_) {
+ Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
+ errors::stream::kOperationNotSupported,
+ "Another asynchronous operation is still pending");
+ return false;
+ }
+
auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
weak_ptr_factory_.GetWeakPtr(), buffer,
size_to_write, success_callback, error_callback);
- return WriteAsync(buffer, size_to_write, callback, error_callback, error);
+ return WriteAsyncImpl(buffer, size_to_write, callback, error_callback, error,
+ true);
}
bool Stream::WriteBlocking(const void* buffer,
@@ -173,31 +184,120 @@ bool Stream::FlushAsync(const base::Closure& success_callback,
return true;
}
+void Stream::IgnoreEOSCallback(
+ const base::Callback<void(size_t)>& success_callback,
+ size_t bytes,
+ bool eos) {
+ success_callback.Run(bytes);
+}
+
+bool Stream::ReadAsyncImpl(
+ void* buffer,
+ size_t size_to_read,
+ const base::Callback<void(size_t, bool)>& success_callback,
+ const ErrorCallback& error_callback,
+ ErrorPtr* error,
+ bool force_async_callback) {
+ CHECK(!is_async_read_pending_);
+ // We set this value to true early in the function so calling others will
+ // prevent us from calling WaitForData() to make calls to
+ // ReadAsync() fail while we run WaitForData().
+ is_async_read_pending_ = true;
+
+ size_t read = 0;
+ bool eos = false;
+ if (!ReadNonBlocking(buffer, size_to_read, &read, &eos, error))
+ return false;
+
+ if (read > 0 || eos) {
+ if (force_async_callback) {
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&Stream::OnReadAsyncDone, weak_ptr_factory_.GetWeakPtr(),
+ success_callback, read, eos));
+ } else {
+ is_async_read_pending_ = false;
+ success_callback.Run(read, eos);
+ }
+ return true;
+ }
+
+ is_async_read_pending_ = WaitForData(
+ AccessMode::READ,
+ base::Bind(&Stream::OnReadAvailable, weak_ptr_factory_.GetWeakPtr(),
+ buffer, size_to_read, success_callback, error_callback),
+ error);
+ return is_async_read_pending_;
+}
+
+void Stream::OnReadAsyncDone(
+ const base::Callback<void(size_t, bool)>& success_callback,
+ size_t bytes_read,
+ bool eos) {
+ is_async_read_pending_ = false;
+ success_callback.Run(bytes_read, eos);
+}
+
void Stream::OnReadAvailable(
void* buffer,
- size_t size,
- const base::Callback<void(size_t)>& success_callback,
+ size_t size_to_read,
+ const base::Callback<void(size_t, bool)>& success_callback,
const ErrorCallback& error_callback,
AccessMode mode) {
CHECK(stream_utils::IsReadAccessMode(mode));
CHECK(is_async_read_pending_);
is_async_read_pending_ = false;
ErrorPtr error;
- size_t read = 0;
- bool eos = false;
- if (!ReadNonBlocking(buffer, size, &read, &eos, &error))
- return error_callback.Run(error.get());
+ // Just reschedule the read operation but don't need to run the callback from
+ // the main loop since we are already running on a callback.
+ if (!ReadAsyncImpl(buffer, size_to_read, success_callback, error_callback,
+ &error, false)) {
+ error_callback.Run(error.get());
+ }
+}
- // If we have some data read, or if we reached the end of stream, call
- // the success callback.
- if (read > 0 || eos)
- return success_callback.Run(read);
+bool Stream::WriteAsyncImpl(
+ const void* buffer,
+ size_t size_to_write,
+ const base::Callback<void(size_t)>& success_callback,
+ const ErrorCallback& error_callback,
+ ErrorPtr* error,
+ bool force_async_callback) {
+ CHECK(!is_async_write_pending_);
+ // We set this value to true early in the function so calling others will
+ // prevent us from calling WaitForData() to make calls to
+ // ReadAsync() fail while we run WaitForData().
+ is_async_write_pending_ = true;
+
+ size_t written = 0;
+ if (!WriteNonBlocking(buffer, size_to_write, &written, error))
+ return false;
- // Otherwise just reschedule the read operation.
- if (!ReadAsync(buffer, size, success_callback, error_callback, &error))
- return error_callback.Run(error.get());
+ if (written > 0) {
+ if (force_async_callback) {
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&Stream::OnWriteAsyncDone, weak_ptr_factory_.GetWeakPtr(),
+ success_callback, written));
+ } else {
+ is_async_write_pending_ = false;
+ success_callback.Run(written);
+ }
+ return true;
+ }
+ is_async_write_pending_ = WaitForData(
+ AccessMode::WRITE,
+ base::Bind(&Stream::OnWriteAvailable, weak_ptr_factory_.GetWeakPtr(),
+ buffer, size_to_write, success_callback, error_callback),
+ error);
+ return is_async_write_pending_;
+}
- return;
+void Stream::OnWriteAsyncDone(
+ const base::Callback<void(size_t)>& success_callback,
+ size_t size_written) {
+ is_async_write_pending_ = false;
+ success_callback.Run(size_written);
}
void Stream::OnWriteAvailable(
@@ -210,38 +310,37 @@ void Stream::OnWriteAvailable(
CHECK(is_async_write_pending_);
is_async_write_pending_ = false;
ErrorPtr error;
- size_t written = 0;
- if (!WriteNonBlocking(buffer, size, &written, &error))
+ // Just reschedule the read operation but don't need to run the callback from
+ // the main loop since we are already running on a callback.
+ if (!WriteAsyncImpl(buffer, size, success_callback, error_callback, &error,
+ false)) {
error_callback.Run(error.get());
-
- if (written > 0)
- return success_callback.Run(written);
-
- if (!WriteAsync(buffer, size, success_callback, error_callback, &error))
- return error_callback.Run(error.get());
-
- return;
+ }
}
void Stream::ReadAllAsyncCallback(void* buffer,
size_t size_to_read,
const base::Closure& success_callback,
const ErrorCallback& error_callback,
- size_t size_read) {
+ size_t size_read,
+ bool eos) {
ErrorPtr error;
- if (size_to_read != 0 && size_read == 0) {
+ size_to_read -= size_read;
+ if (size_to_read != 0 && eos) {
stream_utils::ErrorReadPastEndOfStream(FROM_HERE, &error);
error_callback.Run(error.get());
return;
}
- size_to_read -= size_read;
+
if (size_to_read) {
buffer = AdvancePointer(buffer, size_read);
auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
weak_ptr_factory_.GetWeakPtr(), buffer,
size_to_read, success_callback, error_callback);
- if (!ReadAsync(buffer, size_to_read, callback, error_callback, &error))
+ if (!ReadAsyncImpl(buffer, size_to_read, callback, error_callback, &error,
+ false)) {
error_callback.Run(error.get());
+ }
} else {
success_callback.Run();
}
@@ -265,8 +364,10 @@ void Stream::WriteAllAsyncCallback(const void* buffer,
auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
weak_ptr_factory_.GetWeakPtr(), buffer,
size_to_write, success_callback, error_callback);
- if (!WriteAsync(buffer, size_to_write, callback, error_callback, &error))
+ if (!WriteAsyncImpl(buffer, size_to_write, callback, error_callback, &error,
+ false)) {
error_callback.Run(error.get());
+ }
} else {
success_callback.Run();
}
diff --git a/chromeos/streams/stream.h b/chromeos/streams/stream.h
index 3eec485..f1ad75a 100644
--- a/chromeos/streams/stream.h
+++ b/chromeos/streams/stream.h
@@ -12,8 +12,8 @@
#include <base/macros.h>
#include <base/memory/weak_ptr.h>
#include <base/time/time.h>
-#include <chromeos/errors/error.h>
#include <chromeos/chromeos_export.h>
+#include <chromeos/errors/error.h>
namespace chromeos {
@@ -394,21 +394,79 @@ class CHROMEOS_EXPORT Stream {
Stream() = default;
private:
+ // Simple wrapper to call the externally exposed |success_callback| that only
+ // receives a size_t.
+ CHROMEOS_PRIVATE static void IgnoreEOSCallback(
+ const base::Callback<void(size_t)>& success_callback,
+ size_t read,
+ bool eos);
+
+ // The internal implementation of ReadAsync() and ReadAllAsync().
+ // Calls ReadNonBlocking and if there's no data available waits for it calling
+ // WaitForData(). The extra |force_async_callback| tell whether the success
+ // callback should be called from the main loop instead of directly from this
+ // method. This method only calls WaitForData() if ReadNonBlocking() returns a
+ // situation in which it would block (bytes_read = 0 and eos = false),
+ // preventing us from calling WaitForData() on streams that don't support such
+ // feature.
+ CHROMEOS_PRIVATE bool ReadAsyncImpl(
+ void* buffer,
+ size_t size_to_read,
+ const base::Callback<void(size_t, bool)>& success_callback,
+ const ErrorCallback& error_callback,
+ ErrorPtr* error,
+ bool force_async_callback);
+
+ // Called from the main loop when the ReadAsyncImpl finished right away
+ // without waiting for data. We use this callback to call the
+ // |sucess_callback| but invalidate the callback if the Stream is destroyed
+ // while this call is waiting in the main loop.
+ CHROMEOS_PRIVATE void OnReadAsyncDone(
+ const base::Callback<void(size_t, bool)>& success_callback,
+ size_t bytes_read,
+ bool eos);
+
// Called from WaitForData() when read operations can be performed
// without blocking (the type of operation is provided in |mode|).
- void OnReadAvailable(void* buffer,
- size_t size,
- const base::Callback<void(size_t)>& success_callback,
- const ErrorCallback& error_callback,
- AccessMode mode);
+ CHROMEOS_PRIVATE void OnReadAvailable(
+ void* buffer,
+ size_t size_to_read,
+ const base::Callback<void(size_t, bool)>& success_callback,
+ const ErrorCallback& error_callback,
+ AccessMode mode);
+
+ // The internal implementation of WriteAsync() and WriteAllAsync().
+ // Calls WriteNonBlocking and if the write would block for it to not block
+ // calling WaitForData(). The extra |force_async_callback| tell whether the
+ // success callback should be called from the main loop instead of directly
+ // from this method. This method only calls WaitForData() if
+ // WriteNonBlocking() returns a situation in which it would block
+ // (size_written = 0 and eos = false), preventing us from calling
+ // WaitForData() on streams that don't support such feature.
+ CHROMEOS_PRIVATE bool WriteAsyncImpl(
+ const void* buffer,
+ size_t size_to_write,
+ const base::Callback<void(size_t)>& success_callback,
+ const ErrorCallback& error_callback,
+ ErrorPtr* error,
+ bool force_async_callback);
+
+ // Called from the main loop when the WriteAsyncImpl finished right away
+ // without waiting for data. We use this callback to call the
+ // |sucess_callback| but invalidate the callback if the Stream is destroyed
+ // while this call is waiting in the main loop.
+ CHROMEOS_PRIVATE void OnWriteAsyncDone(
+ const base::Callback<void(size_t)>& success_callback,
+ size_t size_written);
// Called from WaitForData() when write operations can be performed
// without blocking (the type of operation is provided in |mode|).
- void OnWriteAvailable(const void* buffer,
- size_t size,
- const base::Callback<void(size_t)>& success_callback,
- const ErrorCallback& error_callback,
- AccessMode mode);
+ CHROMEOS_PRIVATE void OnWriteAvailable(
+ const void* buffer,
+ size_t size,
+ const base::Callback<void(size_t)>& success_callback,
+ const ErrorCallback& error_callback,
+ AccessMode mode);
// Helper callbacks to implement ReadAllAsync/WriteAllAsync.
CHROMEOS_PRIVATE void ReadAllAsyncCallback(
@@ -416,7 +474,8 @@ class CHROMEOS_EXPORT Stream {
size_t size_to_read,
const base::Closure& success_callback,
const ErrorCallback& error_callback,
- size_t size_read);
+ size_t size_read,
+ bool eos);
CHROMEOS_PRIVATE void WriteAllAsyncCallback(
const void* buffer,
size_t size_to_write,
diff --git a/chromeos/streams/stream_unittest.cc b/chromeos/streams/stream_unittest.cc
index 9630d97..6485a17 100644
--- a/chromeos/streams/stream_unittest.cc
+++ b/chromeos/streams/stream_unittest.cc
@@ -11,6 +11,7 @@
#include <gtest/gtest.h>
#include <chromeos/bind_lambda.h>
+#include <chromeos/message_loops/fake_message_loop.h>
#include <chromeos/streams/stream_errors.h>
using testing::DoAll;
@@ -22,8 +23,8 @@ using testing::_;
namespace chromeos {
-using Whence = Stream::Whence;
using AccessMode = Stream::AccessMode;
+using Whence = Stream::Whence;
// To verify "non-trivial" methods implemented in Stream, mock out the
// "trivial" methods to make sure the ones we are interested in testing
@@ -97,22 +98,34 @@ TEST(Stream, SetPosition) {
TEST(Stream, ReadAsync) {
size_t read_size = 0;
+ bool succeeded = false;
bool failed = false;
- auto success_callback = [&read_size](size_t size) { read_size = size; };
+ auto success_callback = [&read_size, &succeeded](size_t size) {
+ read_size = size;
+ succeeded = true;
+ };
auto error_callback = [&failed](const Error* error) { failed = true; };
MockStreamImpl stream_mock;
base::Callback<void(AccessMode)> data_callback;
char buf[10];
+ // This sets up an initial non blocking read that would block, so ReadAsync()
+ // should wait for more data.
+ EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _))
+ .WillOnce(
+ DoAll(SetArgPointee<2>(0), SetArgPointee<3>(false), Return(true)));
EXPECT_CALL(stream_mock, WaitForData(AccessMode::READ, _, _))
.WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true)));
EXPECT_TRUE(stream_mock.ReadAsync(buf, sizeof(buf),
base::Bind(success_callback),
base::Bind(error_callback), nullptr));
EXPECT_EQ(0u, read_size);
+ EXPECT_FALSE(succeeded);
EXPECT_FALSE(failed);
+ // Since the previous call is waiting for the data to be available, we can't
+ // schedule another read.
ErrorPtr error;
EXPECT_FALSE(stream_mock.ReadAsync(buf, sizeof(buf),
base::Bind(success_callback),
@@ -122,6 +135,8 @@ TEST(Stream, ReadAsync) {
EXPECT_EQ("Another asynchronous operation is still pending",
error->GetMessage());
+ // Making the data available via data_callback should not schedule the
+ // success callback from the main loop and run it directly instead.
EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _))
.WillOnce(DoAll(SetArgPointee<2>(7),
SetArgPointee<3>(false),
@@ -131,6 +146,46 @@ TEST(Stream, ReadAsync) {
EXPECT_FALSE(failed);
}
+TEST(Stream, ReadAsync_DontWaitForData) {
+ bool succeeded = false;
+ bool failed = false;
+ auto success_callback = [&succeeded](size_t size) { succeeded = true; };
+ auto error_callback = [&failed](const Error* error) { failed = true; };
+
+ MockStreamImpl stream_mock;
+ char buf[10];
+ FakeMessageLoop fake_loop_{nullptr};
+ fake_loop_.SetAsCurrent();
+
+ EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _))
+ .WillOnce(
+ DoAll(SetArgPointee<2>(5), SetArgPointee<3>(false), Return(true)));
+ EXPECT_CALL(stream_mock, WaitForData(_, _, _)).Times(0);
+ EXPECT_TRUE(stream_mock.ReadAsync(buf, sizeof(buf),
+ base::Bind(success_callback),
+ base::Bind(error_callback), nullptr));
+ // Even if ReadNonBlocking() returned some data without waiting, the
+ // |success_callback| should not run yet.
+ EXPECT_TRUE(fake_loop_.PendingTasks());
+ EXPECT_FALSE(succeeded);
+ EXPECT_FALSE(failed);
+
+ // Since the previous callback is still waiting in the main loop, we can't
+ // schedule another read yet.
+ ErrorPtr error;
+ EXPECT_FALSE(stream_mock.ReadAsync(buf, sizeof(buf),
+ base::Bind(success_callback),
+ base::Bind(error_callback), &error));
+ EXPECT_EQ(errors::stream::kDomain, error->GetDomain());
+ EXPECT_EQ(errors::stream::kOperationNotSupported, error->GetCode());
+ EXPECT_EQ("Another asynchronous operation is still pending",
+ error->GetMessage());
+
+ fake_loop_.Run();
+ EXPECT_TRUE(succeeded);
+ EXPECT_FALSE(failed);
+}
+
TEST(Stream, ReadAllAsync) {
bool succeeded = false;
bool failed = false;
@@ -141,6 +196,11 @@ TEST(Stream, ReadAllAsync) {
base::Callback<void(AccessMode)> data_callback;
char buf[10];
+ // This sets up an initial non blocking read that would block, so
+ // ReadAllAsync() should wait for more data.
+ EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _))
+ .WillOnce(
+ DoAll(SetArgPointee<2>(0), SetArgPointee<3>(false), Return(true)));
EXPECT_CALL(stream_mock, WaitForData(AccessMode::READ, _, _))
.WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true)));
EXPECT_TRUE(stream_mock.ReadAllAsync(buf, sizeof(buf),
@@ -149,16 +209,23 @@ TEST(Stream, ReadAllAsync) {
nullptr));
EXPECT_FALSE(succeeded);
EXPECT_FALSE(failed);
+ testing::Mock::VerifyAndClearExpectations(&stream_mock);
+ // ReadAllAsync() will try to read non blocking until the read would block
+ // before it waits for the data to be available again.
EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _))
.WillOnce(DoAll(SetArgPointee<2>(7),
SetArgPointee<3>(false),
Return(true)));
+ EXPECT_CALL(stream_mock, ReadNonBlocking(buf + 7, 3, _, _, _))
+ .WillOnce(
+ DoAll(SetArgPointee<2>(0), SetArgPointee<3>(false), Return(true)));
EXPECT_CALL(stream_mock, WaitForData(AccessMode::READ, _, _))
.WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true)));
data_callback.Run(AccessMode::READ);
EXPECT_FALSE(succeeded);
EXPECT_FALSE(failed);
+ testing::Mock::VerifyAndClearExpectations(&stream_mock);
EXPECT_CALL(stream_mock, ReadNonBlocking(buf + 7, 3, _, _, _))
.WillOnce(DoAll(SetArgPointee<2>(3),
@@ -183,26 +250,22 @@ TEST(Stream, ReadAllAsync_EOS) {
base::Callback<void(AccessMode)> data_callback;
char buf[10];
+ EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _))
+ .WillOnce(
+ DoAll(SetArgPointee<2>(0), SetArgPointee<3>(false), Return(true)));
EXPECT_CALL(stream_mock, WaitForData(AccessMode::READ, _, _))
.WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true)));
EXPECT_TRUE(stream_mock.ReadAllAsync(buf, sizeof(buf),
base::Bind(success_callback),
base::Bind(error_callback),
nullptr));
+
+ // ReadAsyncAll() should finish and fail once ReadNonBlocking() returns an
+ // end-of-stream condition.
EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _))
.WillOnce(DoAll(SetArgPointee<2>(7),
SetArgPointee<3>(true),
Return(true)));
- EXPECT_CALL(stream_mock, WaitForData(AccessMode::READ, _, _))
- .WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true)));
- data_callback.Run(AccessMode::READ);
- EXPECT_FALSE(succeeded);
- EXPECT_FALSE(failed);
-
- EXPECT_CALL(stream_mock, ReadNonBlocking(buf + 7, 3, _, _, _))
- .WillOnce(DoAll(SetArgPointee<2>(0),
- SetArgPointee<3>(true),
- Return(true)));
data_callback.Run(AccessMode::READ);
EXPECT_FALSE(succeeded);
EXPECT_TRUE(failed);
@@ -292,9 +355,14 @@ TEST(Stream, WriteAsync) {
auto error_callback = [&failed](const Error* error) { failed = true; };
MockStreamImpl stream_mock;
+ InSequence s;
base::Callback<void(AccessMode)> data_callback;
char buf[10] = {};
+ // WriteNonBlocking returns a blocking situation (size_written = 0) so the
+ // WaitForData() is run.
+ EXPECT_CALL(stream_mock, WriteNonBlocking(buf, 10, _, _))
+ .WillOnce(DoAll(SetArgPointee<2>(0), Return(true)));
EXPECT_CALL(stream_mock, WaitForData(AccessMode::WRITE, _, _))
.WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true)));
EXPECT_TRUE(stream_mock.WriteAsync(buf, sizeof(buf),
@@ -329,20 +397,26 @@ TEST(Stream, WriteAllAsync) {
base::Callback<void(AccessMode)> data_callback;
char buf[10] = {};
+ EXPECT_CALL(stream_mock, WriteNonBlocking(buf, 10, _, _))
+ .WillOnce(DoAll(SetArgPointee<2>(0), Return(true)));
EXPECT_CALL(stream_mock, WaitForData(AccessMode::WRITE, _, _))
.WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true)));
EXPECT_TRUE(stream_mock.WriteAllAsync(buf, sizeof(buf),
base::Bind(success_callback),
base::Bind(error_callback),
nullptr));
+ testing::Mock::VerifyAndClearExpectations(&stream_mock);
EXPECT_FALSE(succeeded);
EXPECT_FALSE(failed);
EXPECT_CALL(stream_mock, WriteNonBlocking(buf, 10, _, _))
.WillOnce(DoAll(SetArgPointee<2>(7), Return(true)));
+ EXPECT_CALL(stream_mock, WriteNonBlocking(buf + 7, 3, _, _))
+ .WillOnce(DoAll(SetArgPointee<2>(0), Return(true)));
EXPECT_CALL(stream_mock, WaitForData(AccessMode::WRITE, _, _))
.WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true)));
data_callback.Run(AccessMode::WRITE);
+ testing::Mock::VerifyAndClearExpectations(&stream_mock);
EXPECT_FALSE(succeeded);
EXPECT_FALSE(failed);