diff options
author | Alex Deymo <deymo@chromium.org> | 2015-08-05 19:15:00 -0700 |
---|---|---|
committer | Bertrand SIMONNET <bsimonnet@google.com> | 2015-08-12 10:57:16 -0700 |
commit | d145d0a667c8ccd56378f1b78176e04f4a5b03df (patch) | |
tree | 1a46e3e502b246744af379f278c9c3f63b9d1ad0 /chromeos | |
parent | 667cf9b0bb9604c1e057a833f33583f2a437fc0a (diff) | |
download | platform_external_libbrillo-d145d0a667c8ccd56378f1b78176e04f4a5b03df.tar.gz platform_external_libbrillo-d145d0a667c8ccd56378f1b78176e04f4a5b03df.tar.bz2 platform_external_libbrillo-d145d0a667c8ccd56378f1b78176e04f4a5b03df.zip |
libchromeos: Don't WaitForData() if the Stream doesn't block.
Read[All]Async and Write[All]Async would first call WaitForData()
before attempting to read/write from the Stream. In some cases, such asi
the FileStream implementation using a BaseMessageLoop
(base::MessageLoopForIO, implemented using libevent/epoll) we can't
WaitForData() on a file descriptor if it never blocks. A regular file
or a block device are common examples of a file descriptor for wich
we can't call MessageLoop::WatchFileDescriptor() in that implementation.
This CL prevents this scenario by calling first ReadNonBlocking() or
WriteNonBlocking() and then falling back to WaitForData() only if
ReadNonBlocking() or WriteNonBlocking(), respectively, return a
situation in which the Blocking call would actually block.
BUG=chromium:499886
TEST=Updated unittests.
Reviewed-on: https://chromium-review.googlesource.com/291092
Tested-by: Alex Deymo <deymo@chromium.org>
Trybot-Ready: Alex Deymo <deymo@chromium.org>
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Alex Deymo <deymo@chromium.org>
(cherry-picked from
https://chromium.googlesource.com/chromiumos/platform2 at
cd6dfe22ffaf6322b33bbf2e1f1322093d67a46e)
Change-Id: Iba97e384e9fb5aa7827b68e11135b396dcda67ed
Diffstat (limited to 'chromeos')
-rw-r--r-- | chromeos/streams/file_stream_unittest.cc | 10 | ||||
-rw-r--r-- | chromeos/streams/stream.cc | 191 | ||||
-rw-r--r-- | chromeos/streams/stream.h | 83 | ||||
-rw-r--r-- | chromeos/streams/stream_unittest.cc | 98 |
4 files changed, 313 insertions, 69 deletions
diff --git a/chromeos/streams/file_stream_unittest.cc b/chromeos/streams/file_stream_unittest.cc index faf8631..c760fb5 100644 --- a/chromeos/streams/file_stream_unittest.cc +++ b/chromeos/streams/file_stream_unittest.cc @@ -34,6 +34,12 @@ namespace chromeos { namespace { +// gmock action that would return a blocking situation from a read() or write(). +ACTION(ReturnWouldBlock) { + errno = EWOULDBLOCK; + return -1; +} + // Helper function to read one byte from the stream. inline int ReadByte(Stream* stream) { uint8_t byte = 0; @@ -369,6 +375,8 @@ TEST_F(FileStreamTest, ReadAsync) { auto error_callback = [&failed](const Error* error) { failed = true; }; FileStream::FileDescriptorInterface::DataCallback data_callback; + EXPECT_CALL(fd_mock(), Read(test_read_buffer_, 100)) + .WillOnce(ReturnWouldBlock()); EXPECT_CALL(fd_mock(), WaitForData(Stream::AccessMode::READ, _, _)) .WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true))); EXPECT_TRUE(stream_->ReadAsync(test_read_buffer_, 100, @@ -506,6 +514,8 @@ TEST_F(FileStreamTest, WriteAsync) { auto error_callback = [&failed](const Error* error) { failed = true; }; FileStream::FileDescriptorInterface::DataCallback data_callback; + EXPECT_CALL(fd_mock(), Write(test_write_buffer_, 100)) + .WillOnce(ReturnWouldBlock()); EXPECT_CALL(fd_mock(), WaitForData(Stream::AccessMode::WRITE, _, _)) .WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true))); EXPECT_TRUE(stream_->WriteAsync(test_write_buffer_, 100, diff --git a/chromeos/streams/stream.cc b/chromeos/streams/stream.cc index 35e044d..64f9428 100644 --- a/chromeos/streams/stream.cc +++ b/chromeos/streams/stream.cc @@ -35,14 +35,12 @@ bool Stream::ReadAsync(void* buffer, "Another asynchronous operation is still pending"); return false; } - is_async_read_pending_ = true; - is_async_read_pending_ = WaitForData( - AccessMode::READ, - base::Bind(&Stream::OnReadAvailable, weak_ptr_factory_.GetWeakPtr(), - buffer, size_to_read, success_callback, error_callback), - error); - return is_async_read_pending_; + auto callback = base::Bind(&Stream::IgnoreEOSCallback, success_callback); + // If we can read some data right away non-blocking we should still run the + // callback from the main loop, so we pass true here for force_async_callback. + return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error, + true); } bool Stream::ReadAllAsync(void* buffer, @@ -50,10 +48,18 @@ bool Stream::ReadAllAsync(void* buffer, const base::Closure& success_callback, const ErrorCallback& error_callback, ErrorPtr* error) { + if (is_async_read_pending_) { + Error::AddTo(error, FROM_HERE, errors::stream::kDomain, + errors::stream::kOperationNotSupported, + "Another asynchronous operation is still pending"); + return false; + } + auto callback = base::Bind(&Stream::ReadAllAsyncCallback, weak_ptr_factory_.GetWeakPtr(), buffer, size_to_read, success_callback, error_callback); - return ReadAsync(buffer, size_to_read, callback, error_callback, error); + return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error, + true); } bool Stream::ReadBlocking(void* buffer, @@ -104,13 +110,10 @@ bool Stream::WriteAsync(const void* buffer, "Another asynchronous operation is still pending"); return false; } - is_async_write_pending_ = true; - is_async_write_pending_ = WaitForData( - AccessMode::WRITE, - base::Bind(&Stream::OnWriteAvailable, weak_ptr_factory_.GetWeakPtr(), - buffer, size_to_write, success_callback, error_callback), - error); - return is_async_write_pending_; + // If we can read some data right away non-blocking we should still run the + // callback from the main loop, so we pass true here for force_async_callback. + return WriteAsyncImpl(buffer, size_to_write, success_callback, error_callback, + error, true); } bool Stream::WriteAllAsync(const void* buffer, @@ -118,10 +121,18 @@ bool Stream::WriteAllAsync(const void* buffer, const base::Closure& success_callback, const ErrorCallback& error_callback, ErrorPtr* error) { + if (is_async_write_pending_) { + Error::AddTo(error, FROM_HERE, errors::stream::kDomain, + errors::stream::kOperationNotSupported, + "Another asynchronous operation is still pending"); + return false; + } + auto callback = base::Bind(&Stream::WriteAllAsyncCallback, weak_ptr_factory_.GetWeakPtr(), buffer, size_to_write, success_callback, error_callback); - return WriteAsync(buffer, size_to_write, callback, error_callback, error); + return WriteAsyncImpl(buffer, size_to_write, callback, error_callback, error, + true); } bool Stream::WriteBlocking(const void* buffer, @@ -173,31 +184,120 @@ bool Stream::FlushAsync(const base::Closure& success_callback, return true; } +void Stream::IgnoreEOSCallback( + const base::Callback<void(size_t)>& success_callback, + size_t bytes, + bool eos) { + success_callback.Run(bytes); +} + +bool Stream::ReadAsyncImpl( + void* buffer, + size_t size_to_read, + const base::Callback<void(size_t, bool)>& success_callback, + const ErrorCallback& error_callback, + ErrorPtr* error, + bool force_async_callback) { + CHECK(!is_async_read_pending_); + // We set this value to true early in the function so calling others will + // prevent us from calling WaitForData() to make calls to + // ReadAsync() fail while we run WaitForData(). + is_async_read_pending_ = true; + + size_t read = 0; + bool eos = false; + if (!ReadNonBlocking(buffer, size_to_read, &read, &eos, error)) + return false; + + if (read > 0 || eos) { + if (force_async_callback) { + MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&Stream::OnReadAsyncDone, weak_ptr_factory_.GetWeakPtr(), + success_callback, read, eos)); + } else { + is_async_read_pending_ = false; + success_callback.Run(read, eos); + } + return true; + } + + is_async_read_pending_ = WaitForData( + AccessMode::READ, + base::Bind(&Stream::OnReadAvailable, weak_ptr_factory_.GetWeakPtr(), + buffer, size_to_read, success_callback, error_callback), + error); + return is_async_read_pending_; +} + +void Stream::OnReadAsyncDone( + const base::Callback<void(size_t, bool)>& success_callback, + size_t bytes_read, + bool eos) { + is_async_read_pending_ = false; + success_callback.Run(bytes_read, eos); +} + void Stream::OnReadAvailable( void* buffer, - size_t size, - const base::Callback<void(size_t)>& success_callback, + size_t size_to_read, + const base::Callback<void(size_t, bool)>& success_callback, const ErrorCallback& error_callback, AccessMode mode) { CHECK(stream_utils::IsReadAccessMode(mode)); CHECK(is_async_read_pending_); is_async_read_pending_ = false; ErrorPtr error; - size_t read = 0; - bool eos = false; - if (!ReadNonBlocking(buffer, size, &read, &eos, &error)) - return error_callback.Run(error.get()); + // Just reschedule the read operation but don't need to run the callback from + // the main loop since we are already running on a callback. + if (!ReadAsyncImpl(buffer, size_to_read, success_callback, error_callback, + &error, false)) { + error_callback.Run(error.get()); + } +} - // If we have some data read, or if we reached the end of stream, call - // the success callback. - if (read > 0 || eos) - return success_callback.Run(read); +bool Stream::WriteAsyncImpl( + const void* buffer, + size_t size_to_write, + const base::Callback<void(size_t)>& success_callback, + const ErrorCallback& error_callback, + ErrorPtr* error, + bool force_async_callback) { + CHECK(!is_async_write_pending_); + // We set this value to true early in the function so calling others will + // prevent us from calling WaitForData() to make calls to + // ReadAsync() fail while we run WaitForData(). + is_async_write_pending_ = true; + + size_t written = 0; + if (!WriteNonBlocking(buffer, size_to_write, &written, error)) + return false; - // Otherwise just reschedule the read operation. - if (!ReadAsync(buffer, size, success_callback, error_callback, &error)) - return error_callback.Run(error.get()); + if (written > 0) { + if (force_async_callback) { + MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&Stream::OnWriteAsyncDone, weak_ptr_factory_.GetWeakPtr(), + success_callback, written)); + } else { + is_async_write_pending_ = false; + success_callback.Run(written); + } + return true; + } + is_async_write_pending_ = WaitForData( + AccessMode::WRITE, + base::Bind(&Stream::OnWriteAvailable, weak_ptr_factory_.GetWeakPtr(), + buffer, size_to_write, success_callback, error_callback), + error); + return is_async_write_pending_; +} - return; +void Stream::OnWriteAsyncDone( + const base::Callback<void(size_t)>& success_callback, + size_t size_written) { + is_async_write_pending_ = false; + success_callback.Run(size_written); } void Stream::OnWriteAvailable( @@ -210,38 +310,37 @@ void Stream::OnWriteAvailable( CHECK(is_async_write_pending_); is_async_write_pending_ = false; ErrorPtr error; - size_t written = 0; - if (!WriteNonBlocking(buffer, size, &written, &error)) + // Just reschedule the read operation but don't need to run the callback from + // the main loop since we are already running on a callback. + if (!WriteAsyncImpl(buffer, size, success_callback, error_callback, &error, + false)) { error_callback.Run(error.get()); - - if (written > 0) - return success_callback.Run(written); - - if (!WriteAsync(buffer, size, success_callback, error_callback, &error)) - return error_callback.Run(error.get()); - - return; + } } void Stream::ReadAllAsyncCallback(void* buffer, size_t size_to_read, const base::Closure& success_callback, const ErrorCallback& error_callback, - size_t size_read) { + size_t size_read, + bool eos) { ErrorPtr error; - if (size_to_read != 0 && size_read == 0) { + size_to_read -= size_read; + if (size_to_read != 0 && eos) { stream_utils::ErrorReadPastEndOfStream(FROM_HERE, &error); error_callback.Run(error.get()); return; } - size_to_read -= size_read; + if (size_to_read) { buffer = AdvancePointer(buffer, size_read); auto callback = base::Bind(&Stream::ReadAllAsyncCallback, weak_ptr_factory_.GetWeakPtr(), buffer, size_to_read, success_callback, error_callback); - if (!ReadAsync(buffer, size_to_read, callback, error_callback, &error)) + if (!ReadAsyncImpl(buffer, size_to_read, callback, error_callback, &error, + false)) { error_callback.Run(error.get()); + } } else { success_callback.Run(); } @@ -265,8 +364,10 @@ void Stream::WriteAllAsyncCallback(const void* buffer, auto callback = base::Bind(&Stream::WriteAllAsyncCallback, weak_ptr_factory_.GetWeakPtr(), buffer, size_to_write, success_callback, error_callback); - if (!WriteAsync(buffer, size_to_write, callback, error_callback, &error)) + if (!WriteAsyncImpl(buffer, size_to_write, callback, error_callback, &error, + false)) { error_callback.Run(error.get()); + } } else { success_callback.Run(); } diff --git a/chromeos/streams/stream.h b/chromeos/streams/stream.h index 3eec485..f1ad75a 100644 --- a/chromeos/streams/stream.h +++ b/chromeos/streams/stream.h @@ -12,8 +12,8 @@ #include <base/macros.h> #include <base/memory/weak_ptr.h> #include <base/time/time.h> -#include <chromeos/errors/error.h> #include <chromeos/chromeos_export.h> +#include <chromeos/errors/error.h> namespace chromeos { @@ -394,21 +394,79 @@ class CHROMEOS_EXPORT Stream { Stream() = default; private: + // Simple wrapper to call the externally exposed |success_callback| that only + // receives a size_t. + CHROMEOS_PRIVATE static void IgnoreEOSCallback( + const base::Callback<void(size_t)>& success_callback, + size_t read, + bool eos); + + // The internal implementation of ReadAsync() and ReadAllAsync(). + // Calls ReadNonBlocking and if there's no data available waits for it calling + // WaitForData(). The extra |force_async_callback| tell whether the success + // callback should be called from the main loop instead of directly from this + // method. This method only calls WaitForData() if ReadNonBlocking() returns a + // situation in which it would block (bytes_read = 0 and eos = false), + // preventing us from calling WaitForData() on streams that don't support such + // feature. + CHROMEOS_PRIVATE bool ReadAsyncImpl( + void* buffer, + size_t size_to_read, + const base::Callback<void(size_t, bool)>& success_callback, + const ErrorCallback& error_callback, + ErrorPtr* error, + bool force_async_callback); + + // Called from the main loop when the ReadAsyncImpl finished right away + // without waiting for data. We use this callback to call the + // |sucess_callback| but invalidate the callback if the Stream is destroyed + // while this call is waiting in the main loop. + CHROMEOS_PRIVATE void OnReadAsyncDone( + const base::Callback<void(size_t, bool)>& success_callback, + size_t bytes_read, + bool eos); + // Called from WaitForData() when read operations can be performed // without blocking (the type of operation is provided in |mode|). - void OnReadAvailable(void* buffer, - size_t size, - const base::Callback<void(size_t)>& success_callback, - const ErrorCallback& error_callback, - AccessMode mode); + CHROMEOS_PRIVATE void OnReadAvailable( + void* buffer, + size_t size_to_read, + const base::Callback<void(size_t, bool)>& success_callback, + const ErrorCallback& error_callback, + AccessMode mode); + + // The internal implementation of WriteAsync() and WriteAllAsync(). + // Calls WriteNonBlocking and if the write would block for it to not block + // calling WaitForData(). The extra |force_async_callback| tell whether the + // success callback should be called from the main loop instead of directly + // from this method. This method only calls WaitForData() if + // WriteNonBlocking() returns a situation in which it would block + // (size_written = 0 and eos = false), preventing us from calling + // WaitForData() on streams that don't support such feature. + CHROMEOS_PRIVATE bool WriteAsyncImpl( + const void* buffer, + size_t size_to_write, + const base::Callback<void(size_t)>& success_callback, + const ErrorCallback& error_callback, + ErrorPtr* error, + bool force_async_callback); + + // Called from the main loop when the WriteAsyncImpl finished right away + // without waiting for data. We use this callback to call the + // |sucess_callback| but invalidate the callback if the Stream is destroyed + // while this call is waiting in the main loop. + CHROMEOS_PRIVATE void OnWriteAsyncDone( + const base::Callback<void(size_t)>& success_callback, + size_t size_written); // Called from WaitForData() when write operations can be performed // without blocking (the type of operation is provided in |mode|). - void OnWriteAvailable(const void* buffer, - size_t size, - const base::Callback<void(size_t)>& success_callback, - const ErrorCallback& error_callback, - AccessMode mode); + CHROMEOS_PRIVATE void OnWriteAvailable( + const void* buffer, + size_t size, + const base::Callback<void(size_t)>& success_callback, + const ErrorCallback& error_callback, + AccessMode mode); // Helper callbacks to implement ReadAllAsync/WriteAllAsync. CHROMEOS_PRIVATE void ReadAllAsyncCallback( @@ -416,7 +474,8 @@ class CHROMEOS_EXPORT Stream { size_t size_to_read, const base::Closure& success_callback, const ErrorCallback& error_callback, - size_t size_read); + size_t size_read, + bool eos); CHROMEOS_PRIVATE void WriteAllAsyncCallback( const void* buffer, size_t size_to_write, diff --git a/chromeos/streams/stream_unittest.cc b/chromeos/streams/stream_unittest.cc index 9630d97..6485a17 100644 --- a/chromeos/streams/stream_unittest.cc +++ b/chromeos/streams/stream_unittest.cc @@ -11,6 +11,7 @@ #include <gtest/gtest.h> #include <chromeos/bind_lambda.h> +#include <chromeos/message_loops/fake_message_loop.h> #include <chromeos/streams/stream_errors.h> using testing::DoAll; @@ -22,8 +23,8 @@ using testing::_; namespace chromeos { -using Whence = Stream::Whence; using AccessMode = Stream::AccessMode; +using Whence = Stream::Whence; // To verify "non-trivial" methods implemented in Stream, mock out the // "trivial" methods to make sure the ones we are interested in testing @@ -97,22 +98,34 @@ TEST(Stream, SetPosition) { TEST(Stream, ReadAsync) { size_t read_size = 0; + bool succeeded = false; bool failed = false; - auto success_callback = [&read_size](size_t size) { read_size = size; }; + auto success_callback = [&read_size, &succeeded](size_t size) { + read_size = size; + succeeded = true; + }; auto error_callback = [&failed](const Error* error) { failed = true; }; MockStreamImpl stream_mock; base::Callback<void(AccessMode)> data_callback; char buf[10]; + // This sets up an initial non blocking read that would block, so ReadAsync() + // should wait for more data. + EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _)) + .WillOnce( + DoAll(SetArgPointee<2>(0), SetArgPointee<3>(false), Return(true))); EXPECT_CALL(stream_mock, WaitForData(AccessMode::READ, _, _)) .WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true))); EXPECT_TRUE(stream_mock.ReadAsync(buf, sizeof(buf), base::Bind(success_callback), base::Bind(error_callback), nullptr)); EXPECT_EQ(0u, read_size); + EXPECT_FALSE(succeeded); EXPECT_FALSE(failed); + // Since the previous call is waiting for the data to be available, we can't + // schedule another read. ErrorPtr error; EXPECT_FALSE(stream_mock.ReadAsync(buf, sizeof(buf), base::Bind(success_callback), @@ -122,6 +135,8 @@ TEST(Stream, ReadAsync) { EXPECT_EQ("Another asynchronous operation is still pending", error->GetMessage()); + // Making the data available via data_callback should not schedule the + // success callback from the main loop and run it directly instead. EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _)) .WillOnce(DoAll(SetArgPointee<2>(7), SetArgPointee<3>(false), @@ -131,6 +146,46 @@ TEST(Stream, ReadAsync) { EXPECT_FALSE(failed); } +TEST(Stream, ReadAsync_DontWaitForData) { + bool succeeded = false; + bool failed = false; + auto success_callback = [&succeeded](size_t size) { succeeded = true; }; + auto error_callback = [&failed](const Error* error) { failed = true; }; + + MockStreamImpl stream_mock; + char buf[10]; + FakeMessageLoop fake_loop_{nullptr}; + fake_loop_.SetAsCurrent(); + + EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _)) + .WillOnce( + DoAll(SetArgPointee<2>(5), SetArgPointee<3>(false), Return(true))); + EXPECT_CALL(stream_mock, WaitForData(_, _, _)).Times(0); + EXPECT_TRUE(stream_mock.ReadAsync(buf, sizeof(buf), + base::Bind(success_callback), + base::Bind(error_callback), nullptr)); + // Even if ReadNonBlocking() returned some data without waiting, the + // |success_callback| should not run yet. + EXPECT_TRUE(fake_loop_.PendingTasks()); + EXPECT_FALSE(succeeded); + EXPECT_FALSE(failed); + + // Since the previous callback is still waiting in the main loop, we can't + // schedule another read yet. + ErrorPtr error; + EXPECT_FALSE(stream_mock.ReadAsync(buf, sizeof(buf), + base::Bind(success_callback), + base::Bind(error_callback), &error)); + EXPECT_EQ(errors::stream::kDomain, error->GetDomain()); + EXPECT_EQ(errors::stream::kOperationNotSupported, error->GetCode()); + EXPECT_EQ("Another asynchronous operation is still pending", + error->GetMessage()); + + fake_loop_.Run(); + EXPECT_TRUE(succeeded); + EXPECT_FALSE(failed); +} + TEST(Stream, ReadAllAsync) { bool succeeded = false; bool failed = false; @@ -141,6 +196,11 @@ TEST(Stream, ReadAllAsync) { base::Callback<void(AccessMode)> data_callback; char buf[10]; + // This sets up an initial non blocking read that would block, so + // ReadAllAsync() should wait for more data. + EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _)) + .WillOnce( + DoAll(SetArgPointee<2>(0), SetArgPointee<3>(false), Return(true))); EXPECT_CALL(stream_mock, WaitForData(AccessMode::READ, _, _)) .WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true))); EXPECT_TRUE(stream_mock.ReadAllAsync(buf, sizeof(buf), @@ -149,16 +209,23 @@ TEST(Stream, ReadAllAsync) { nullptr)); EXPECT_FALSE(succeeded); EXPECT_FALSE(failed); + testing::Mock::VerifyAndClearExpectations(&stream_mock); + // ReadAllAsync() will try to read non blocking until the read would block + // before it waits for the data to be available again. EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _)) .WillOnce(DoAll(SetArgPointee<2>(7), SetArgPointee<3>(false), Return(true))); + EXPECT_CALL(stream_mock, ReadNonBlocking(buf + 7, 3, _, _, _)) + .WillOnce( + DoAll(SetArgPointee<2>(0), SetArgPointee<3>(false), Return(true))); EXPECT_CALL(stream_mock, WaitForData(AccessMode::READ, _, _)) .WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true))); data_callback.Run(AccessMode::READ); EXPECT_FALSE(succeeded); EXPECT_FALSE(failed); + testing::Mock::VerifyAndClearExpectations(&stream_mock); EXPECT_CALL(stream_mock, ReadNonBlocking(buf + 7, 3, _, _, _)) .WillOnce(DoAll(SetArgPointee<2>(3), @@ -183,26 +250,22 @@ TEST(Stream, ReadAllAsync_EOS) { base::Callback<void(AccessMode)> data_callback; char buf[10]; + EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _)) + .WillOnce( + DoAll(SetArgPointee<2>(0), SetArgPointee<3>(false), Return(true))); EXPECT_CALL(stream_mock, WaitForData(AccessMode::READ, _, _)) .WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true))); EXPECT_TRUE(stream_mock.ReadAllAsync(buf, sizeof(buf), base::Bind(success_callback), base::Bind(error_callback), nullptr)); + + // ReadAsyncAll() should finish and fail once ReadNonBlocking() returns an + // end-of-stream condition. EXPECT_CALL(stream_mock, ReadNonBlocking(buf, 10, _, _, _)) .WillOnce(DoAll(SetArgPointee<2>(7), SetArgPointee<3>(true), Return(true))); - EXPECT_CALL(stream_mock, WaitForData(AccessMode::READ, _, _)) - .WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true))); - data_callback.Run(AccessMode::READ); - EXPECT_FALSE(succeeded); - EXPECT_FALSE(failed); - - EXPECT_CALL(stream_mock, ReadNonBlocking(buf + 7, 3, _, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(0), - SetArgPointee<3>(true), - Return(true))); data_callback.Run(AccessMode::READ); EXPECT_FALSE(succeeded); EXPECT_TRUE(failed); @@ -292,9 +355,14 @@ TEST(Stream, WriteAsync) { auto error_callback = [&failed](const Error* error) { failed = true; }; MockStreamImpl stream_mock; + InSequence s; base::Callback<void(AccessMode)> data_callback; char buf[10] = {}; + // WriteNonBlocking returns a blocking situation (size_written = 0) so the + // WaitForData() is run. + EXPECT_CALL(stream_mock, WriteNonBlocking(buf, 10, _, _)) + .WillOnce(DoAll(SetArgPointee<2>(0), Return(true))); EXPECT_CALL(stream_mock, WaitForData(AccessMode::WRITE, _, _)) .WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true))); EXPECT_TRUE(stream_mock.WriteAsync(buf, sizeof(buf), @@ -329,20 +397,26 @@ TEST(Stream, WriteAllAsync) { base::Callback<void(AccessMode)> data_callback; char buf[10] = {}; + EXPECT_CALL(stream_mock, WriteNonBlocking(buf, 10, _, _)) + .WillOnce(DoAll(SetArgPointee<2>(0), Return(true))); EXPECT_CALL(stream_mock, WaitForData(AccessMode::WRITE, _, _)) .WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true))); EXPECT_TRUE(stream_mock.WriteAllAsync(buf, sizeof(buf), base::Bind(success_callback), base::Bind(error_callback), nullptr)); + testing::Mock::VerifyAndClearExpectations(&stream_mock); EXPECT_FALSE(succeeded); EXPECT_FALSE(failed); EXPECT_CALL(stream_mock, WriteNonBlocking(buf, 10, _, _)) .WillOnce(DoAll(SetArgPointee<2>(7), Return(true))); + EXPECT_CALL(stream_mock, WriteNonBlocking(buf + 7, 3, _, _)) + .WillOnce(DoAll(SetArgPointee<2>(0), Return(true))); EXPECT_CALL(stream_mock, WaitForData(AccessMode::WRITE, _, _)) .WillOnce(DoAll(SaveArg<1>(&data_callback), Return(true))); data_callback.Run(AccessMode::WRITE); + testing::Mock::VerifyAndClearExpectations(&stream_mock); EXPECT_FALSE(succeeded); EXPECT_FALSE(failed); |