diff options
author | Alex Vakulenko <avakulenko@google.com> | 2015-09-24 16:25:15 -0700 |
---|---|---|
committer | Alex Vakulenko <avakulenko@google.com> | 2015-09-24 16:58:50 -0700 |
commit | 49fb1ce7956110497bd7cad33b3a954faca4b77c (patch) | |
tree | 34b7e6c58c262307ed7c6ab16db0514aca7c036a | |
parent | ceda6cb7f3944c1bf10c73ab6252b940bcca996f (diff) | |
download | platform_external_libbrillo-49fb1ce7956110497bd7cad33b3a954faca4b77c.tar.gz platform_external_libbrillo-49fb1ce7956110497bd7cad33b3a954faca4b77c.tar.bz2 platform_external_libbrillo-49fb1ce7956110497bd7cad33b3a954faca4b77c.zip |
Add stream_utils::CopyData() to copy data between two streams
This is a useful utility function to copy data from one stream
into another. It can be used in a variety of cases, e.g. reading
data from a stream into memory - create memory stream and copy data
from source stream into it.
Change-Id: Iccdd29966efaf76b90f5d89faa5bfbf80e8586f3
-rw-r--r-- | chromeos/streams/stream_utils.cc | 96 | ||||
-rw-r--r-- | chromeos/streams/stream_utils.h | 30 | ||||
-rw-r--r-- | chromeos/streams/stream_utils_unittest.cc | 182 |
3 files changed, 308 insertions, 0 deletions
diff --git a/chromeos/streams/stream_utils.cc b/chromeos/streams/stream_utils.cc index 1b2004c..b9e9847 100644 --- a/chromeos/streams/stream_utils.cc +++ b/chromeos/streams/stream_utils.cc @@ -6,11 +6,80 @@ #include <limits> +#include <base/bind.h> +#include <chromeos/message_loops/message_loop.h> #include <chromeos/streams/stream_errors.h> namespace chromeos { namespace stream_utils { +namespace { + +// Status of asynchronous CopyData operation. +struct CopyDataState { + chromeos::StreamPtr in_stream; + chromeos::StreamPtr out_stream; + std::vector<uint8_t> buffer; + uint64_t remaining_to_copy; + uint64_t size_copied; + CopyDataSuccessCallback success_callback; + CopyDataErrorCallback error_callback; +}; + +// Async CopyData I/O error callback. +void OnCopyDataError(const std::shared_ptr<CopyDataState>& state, + const chromeos::Error* error) { + state->error_callback.Run(std::move(state->in_stream), + std::move(state->out_stream), error); +} + +// Forward declaration. +void PerformRead(const std::shared_ptr<CopyDataState>& state); + +// Callback from read operation for CopyData. Writes the read data to the output +// stream and invokes PerformRead when done to restart the copy cycle. +void PerformWrite(const std::shared_ptr<CopyDataState>& state, size_t size) { + if (size == 0) { + state->success_callback.Run(std::move(state->in_stream), + std::move(state->out_stream), + state->size_copied); + return; + } + state->size_copied += size; + CHECK_GE(state->remaining_to_copy, size); + state->remaining_to_copy -= size; + + chromeos::ErrorPtr error; + bool success = state->out_stream->WriteAllAsync( + state->buffer.data(), size, base::Bind(&PerformRead, state), + base::Bind(&OnCopyDataError, state), &error); + + if (!success) + OnCopyDataError(state, error.get()); +} + +// Performs the read part of asynchronous CopyData operation. Reads the data +// from input stream and invokes PerformWrite when done to write the data to +// the output stream. +void PerformRead(const std::shared_ptr<CopyDataState>& state) { + chromeos::ErrorPtr error; + const uint64_t buffer_size = state->buffer.size(); + // |buffer_size| is guaranteed to fit in size_t, so |size_to_read| value will + // also not overflow size_t, so the static_cast below is safe. + size_t size_to_read = + static_cast<size_t>(std::min(buffer_size, state->remaining_to_copy)); + if (size_to_read == 0) + return PerformWrite(state, 0); // Nothing more to read. Finish operation. + bool success = state->in_stream->ReadAsync( + state->buffer.data(), size_to_read, base::Bind(PerformWrite, state), + base::Bind(OnCopyDataError, state), &error); + + if (!success) + OnCopyDataError(state, error.get()); +} + +} // anonymous namespace + bool ErrorStreamClosed(const tracked_objects::Location& location, ErrorPtr* error) { Error::AddTo(error, @@ -117,5 +186,32 @@ bool CalculateStreamPosition(const tracked_objects::Location& location, return true; } +void CopyData(StreamPtr in_stream, + StreamPtr out_stream, + const CopyDataSuccessCallback& success_callback, + const CopyDataErrorCallback& error_callback) { + CopyData(std::move(in_stream), std::move(out_stream), + std::numeric_limits<uint64_t>::max(), 4096, success_callback, + error_callback); +} + +void CopyData(StreamPtr in_stream, + StreamPtr out_stream, + uint64_t max_size_to_copy, + size_t buffer_size, + const CopyDataSuccessCallback& success_callback, + const CopyDataErrorCallback& error_callback) { + auto state = std::make_shared<CopyDataState>(); + state->in_stream = std::move(in_stream); + state->out_stream = std::move(out_stream); + state->buffer.resize(buffer_size); + state->remaining_to_copy = max_size_to_copy; + state->size_copied = 0; + state->success_callback = success_callback; + state->error_callback = error_callback; + chromeos::MessageLoop::current()->PostTask(FROM_HERE, + base::Bind(&PerformRead, state)); +} + } // namespace stream_utils } // namespace chromeos diff --git a/chromeos/streams/stream_utils.h b/chromeos/streams/stream_utils.h index 09ff38c..0e85887 100644 --- a/chromeos/streams/stream_utils.h +++ b/chromeos/streams/stream_utils.h @@ -78,6 +78,36 @@ inline Stream::AccessMode MakeAccessMode(bool read, bool write) { return write ? Stream::AccessMode::WRITE : Stream::AccessMode::READ; } +using CopyDataSuccessCallback = + base::Callback<void(StreamPtr, StreamPtr, uint64_t)>; +using CopyDataErrorCallback = + base::Callback<void(StreamPtr, StreamPtr, const chromeos::Error*)>; + +// Asynchronously copies data from input stream to output stream until all the +// data from the input stream is read. The function takes ownership of both +// streams for the duration of the operation and then gives them back when +// either the |success_callback| or |error_callback| is called. +// |success_callback| also provides the number of bytes actually copied. +// This variant of CopyData uses internal buffer of 4 KiB for the operation. +CHROMEOS_EXPORT void CopyData(StreamPtr in_stream, + StreamPtr out_stream, + const CopyDataSuccessCallback& success_callback, + const CopyDataErrorCallback& error_callback); + +// Asynchronously copies data from input stream to output stream until the +// maximum amount of data specified in |max_size_to_copy| is copied or the end +// of the input stream is encountered. The function takes ownership of both +// streams for the duration of the operation and then gives them back when +// either the |success_callback| or |error_callback| is called. +// |success_callback| also provides the number of bytes actually copied. +// |buffer_size| specifies the size of the read buffer to use for the operation. +CHROMEOS_EXPORT void CopyData(StreamPtr in_stream, + StreamPtr out_stream, + uint64_t max_size_to_copy, + size_t buffer_size, + const CopyDataSuccessCallback& success_callback, + const CopyDataErrorCallback& error_callback); + } // namespace stream_utils } // namespace chromeos diff --git a/chromeos/streams/stream_utils_unittest.cc b/chromeos/streams/stream_utils_unittest.cc index 5af404f..1de4fc3 100644 --- a/chromeos/streams/stream_utils_unittest.cc +++ b/chromeos/streams/stream_utils_unittest.cc @@ -6,9 +6,45 @@ #include <limits> +#include <base/bind.h> +#include <chromeos/message_loops/fake_message_loop.h> +#include <chromeos/message_loops/message_loop.h> +#include <chromeos/streams/mock_stream.h> #include <chromeos/streams/stream_errors.h> +#include <gmock/gmock.h> #include <gtest/gtest.h> +using testing::DoAll; +using testing::InSequence; +using testing::Return; +using testing::StrictMock; +using testing::_; + +ACTION_TEMPLATE(InvokeAsyncCallback, + HAS_1_TEMPLATE_PARAMS(int, k), + AND_1_VALUE_PARAMS(size)) { + chromeos::MessageLoop::current()->PostTask( + FROM_HERE, base::Bind(std::get<k>(args), size)); + return true; +} + +ACTION_TEMPLATE(InvokeAsyncCallback, + HAS_1_TEMPLATE_PARAMS(int, k), + AND_0_VALUE_PARAMS()) { + chromeos::MessageLoop::current()->PostTask(FROM_HERE, std::get<k>(args)); + return true; +} + +ACTION_TEMPLATE(InvokeAsyncErrorCallback, + HAS_1_TEMPLATE_PARAMS(int, k), + AND_1_VALUE_PARAMS(code)) { + chromeos::ErrorPtr error; + chromeos::Error::AddTo(&error, FROM_HERE, "test", code, "message"); + chromeos::MessageLoop::current()->PostTask( + FROM_HERE, base::Bind(std::get<k>(args), base::Owned(error.release()))); + return true; +} + namespace chromeos { TEST(StreamUtils, ErrorStreamClosed) { @@ -115,4 +151,150 @@ TEST(StreamUtils, CalculateStreamPosition) { FROM_HERE, 1, Whence::FROM_CURRENT, max_int64, end_pos, &pos, nullptr)); } +class CopyStreamDataTest : public testing::Test { + public: + void SetUp() override { + fake_loop_.SetAsCurrent(); + in_stream_.reset(new StrictMock<MockStream>{}); + out_stream_.reset(new StrictMock<MockStream>{}); + } + + FakeMessageLoop fake_loop_{nullptr}; + std::unique_ptr<StrictMock<MockStream>> in_stream_; + std::unique_ptr<StrictMock<MockStream>> out_stream_; + bool succeeded_{false}; + bool failed_{false}; + + void OnSuccess(uint64_t expected, + StreamPtr in_stream, + StreamPtr out_stream, + uint64_t copied) { + EXPECT_EQ(expected, copied); + succeeded_ = true; + } + + void OnError(const std::string& expected_error, + StreamPtr in_stream, + StreamPtr out_stream, + const Error* error) { + EXPECT_EQ(expected_error, error->GetCode()); + failed_ = true; + } + + void ExpectSuccess() { + EXPECT_TRUE(succeeded_); + EXPECT_FALSE(failed_); + } + + void ExpectFailure() { + EXPECT_FALSE(succeeded_); + EXPECT_TRUE(failed_); + } +}; + +TEST_F(CopyStreamDataTest, CopyAllAtOnce) { + { + InSequence seq; + EXPECT_CALL(*in_stream_, ReadAsync(_, 100, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>(100)); + EXPECT_CALL(*out_stream_, WriteAllAsync(_, 100, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>()); + } + stream_utils::CopyData( + std::move(in_stream_), std::move(out_stream_), 100, 4096, + base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 100), + base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this), "")); + fake_loop_.Run(); + ExpectSuccess(); +} + +TEST_F(CopyStreamDataTest, CopyInBlocks) { + { + InSequence seq; + EXPECT_CALL(*in_stream_, ReadAsync(_, 100, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>(60)); + EXPECT_CALL(*out_stream_, WriteAllAsync(_, 60, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>()); + EXPECT_CALL(*in_stream_, ReadAsync(_, 40, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>(40)); + EXPECT_CALL(*out_stream_, WriteAllAsync(_, 40, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>()); + } + stream_utils::CopyData( + std::move(in_stream_), std::move(out_stream_), 100, 4096, + base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 100), + base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this), "")); + fake_loop_.Run(); + ExpectSuccess(); +} + +TEST_F(CopyStreamDataTest, CopyTillEndOfStream) { + { + InSequence seq; + EXPECT_CALL(*in_stream_, ReadAsync(_, 100, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>(60)); + EXPECT_CALL(*out_stream_, WriteAllAsync(_, 60, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>()); + EXPECT_CALL(*in_stream_, ReadAsync(_, 40, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>(0)); + } + stream_utils::CopyData( + std::move(in_stream_), std::move(out_stream_), 100, 4096, + base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 60), + base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this), "")); + fake_loop_.Run(); + ExpectSuccess(); +} + +TEST_F(CopyStreamDataTest, CopyInSmallBlocks) { + { + InSequence seq; + EXPECT_CALL(*in_stream_, ReadAsync(_, 60, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>(60)); + EXPECT_CALL(*out_stream_, WriteAllAsync(_, 60, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>()); + EXPECT_CALL(*in_stream_, ReadAsync(_, 40, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>(40)); + EXPECT_CALL(*out_stream_, WriteAllAsync(_, 40, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>()); + } + stream_utils::CopyData( + std::move(in_stream_), std::move(out_stream_), 100, 60, + base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 100), + base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this), "")); + fake_loop_.Run(); + ExpectSuccess(); +} + +TEST_F(CopyStreamDataTest, ErrorRead) { + { + InSequence seq; + EXPECT_CALL(*in_stream_, ReadAsync(_, 60, _, _, _)) + .WillOnce(InvokeAsyncErrorCallback<3>("read")); + } + stream_utils::CopyData( + std::move(in_stream_), std::move(out_stream_), 100, 60, + base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 0), + base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this), "read")); + fake_loop_.Run(); + ExpectFailure(); +} + +TEST_F(CopyStreamDataTest, ErrorWrite) { + { + InSequence seq; + EXPECT_CALL(*in_stream_, ReadAsync(_, 60, _, _, _)) + .WillOnce(InvokeAsyncCallback<2>(60)); + EXPECT_CALL(*out_stream_, WriteAllAsync(_, 60, _, _, _)) + .WillOnce(InvokeAsyncErrorCallback<3>("write")); + } + stream_utils::CopyData( + std::move(in_stream_), std::move(out_stream_), 100, 60, + base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 0), + base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this), + "write")); + fake_loop_.Run(); + ExpectFailure(); +} + } // namespace chromeos |