aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Vakulenko <avakulenko@google.com>2015-09-24 16:25:15 -0700
committerAlex Vakulenko <avakulenko@google.com>2015-09-24 16:58:50 -0700
commit49fb1ce7956110497bd7cad33b3a954faca4b77c (patch)
tree34b7e6c58c262307ed7c6ab16db0514aca7c036a
parentceda6cb7f3944c1bf10c73ab6252b940bcca996f (diff)
downloadplatform_external_libbrillo-49fb1ce7956110497bd7cad33b3a954faca4b77c.tar.gz
platform_external_libbrillo-49fb1ce7956110497bd7cad33b3a954faca4b77c.tar.bz2
platform_external_libbrillo-49fb1ce7956110497bd7cad33b3a954faca4b77c.zip
Add stream_utils::CopyData() to copy data between two streams
This is a useful utility function to copy data from one stream into another. It can be used in a variety of cases, e.g. reading data from a stream into memory - create memory stream and copy data from source stream into it. Change-Id: Iccdd29966efaf76b90f5d89faa5bfbf80e8586f3
-rw-r--r--chromeos/streams/stream_utils.cc96
-rw-r--r--chromeos/streams/stream_utils.h30
-rw-r--r--chromeos/streams/stream_utils_unittest.cc182
3 files changed, 308 insertions, 0 deletions
diff --git a/chromeos/streams/stream_utils.cc b/chromeos/streams/stream_utils.cc
index 1b2004c..b9e9847 100644
--- a/chromeos/streams/stream_utils.cc
+++ b/chromeos/streams/stream_utils.cc
@@ -6,11 +6,80 @@
#include <limits>
+#include <base/bind.h>
+#include <chromeos/message_loops/message_loop.h>
#include <chromeos/streams/stream_errors.h>
namespace chromeos {
namespace stream_utils {
+namespace {
+
+// Status of asynchronous CopyData operation.
+struct CopyDataState {
+ chromeos::StreamPtr in_stream;
+ chromeos::StreamPtr out_stream;
+ std::vector<uint8_t> buffer;
+ uint64_t remaining_to_copy;
+ uint64_t size_copied;
+ CopyDataSuccessCallback success_callback;
+ CopyDataErrorCallback error_callback;
+};
+
+// Async CopyData I/O error callback.
+void OnCopyDataError(const std::shared_ptr<CopyDataState>& state,
+ const chromeos::Error* error) {
+ state->error_callback.Run(std::move(state->in_stream),
+ std::move(state->out_stream), error);
+}
+
+// Forward declaration.
+void PerformRead(const std::shared_ptr<CopyDataState>& state);
+
+// Callback from read operation for CopyData. Writes the read data to the output
+// stream and invokes PerformRead when done to restart the copy cycle.
+void PerformWrite(const std::shared_ptr<CopyDataState>& state, size_t size) {
+ if (size == 0) {
+ state->success_callback.Run(std::move(state->in_stream),
+ std::move(state->out_stream),
+ state->size_copied);
+ return;
+ }
+ state->size_copied += size;
+ CHECK_GE(state->remaining_to_copy, size);
+ state->remaining_to_copy -= size;
+
+ chromeos::ErrorPtr error;
+ bool success = state->out_stream->WriteAllAsync(
+ state->buffer.data(), size, base::Bind(&PerformRead, state),
+ base::Bind(&OnCopyDataError, state), &error);
+
+ if (!success)
+ OnCopyDataError(state, error.get());
+}
+
+// Performs the read part of asynchronous CopyData operation. Reads the data
+// from input stream and invokes PerformWrite when done to write the data to
+// the output stream.
+void PerformRead(const std::shared_ptr<CopyDataState>& state) {
+ chromeos::ErrorPtr error;
+ const uint64_t buffer_size = state->buffer.size();
+ // |buffer_size| is guaranteed to fit in size_t, so |size_to_read| value will
+ // also not overflow size_t, so the static_cast below is safe.
+ size_t size_to_read =
+ static_cast<size_t>(std::min(buffer_size, state->remaining_to_copy));
+ if (size_to_read == 0)
+ return PerformWrite(state, 0); // Nothing more to read. Finish operation.
+ bool success = state->in_stream->ReadAsync(
+ state->buffer.data(), size_to_read, base::Bind(PerformWrite, state),
+ base::Bind(OnCopyDataError, state), &error);
+
+ if (!success)
+ OnCopyDataError(state, error.get());
+}
+
+} // anonymous namespace
+
bool ErrorStreamClosed(const tracked_objects::Location& location,
ErrorPtr* error) {
Error::AddTo(error,
@@ -117,5 +186,32 @@ bool CalculateStreamPosition(const tracked_objects::Location& location,
return true;
}
+void CopyData(StreamPtr in_stream,
+ StreamPtr out_stream,
+ const CopyDataSuccessCallback& success_callback,
+ const CopyDataErrorCallback& error_callback) {
+ CopyData(std::move(in_stream), std::move(out_stream),
+ std::numeric_limits<uint64_t>::max(), 4096, success_callback,
+ error_callback);
+}
+
+void CopyData(StreamPtr in_stream,
+ StreamPtr out_stream,
+ uint64_t max_size_to_copy,
+ size_t buffer_size,
+ const CopyDataSuccessCallback& success_callback,
+ const CopyDataErrorCallback& error_callback) {
+ auto state = std::make_shared<CopyDataState>();
+ state->in_stream = std::move(in_stream);
+ state->out_stream = std::move(out_stream);
+ state->buffer.resize(buffer_size);
+ state->remaining_to_copy = max_size_to_copy;
+ state->size_copied = 0;
+ state->success_callback = success_callback;
+ state->error_callback = error_callback;
+ chromeos::MessageLoop::current()->PostTask(FROM_HERE,
+ base::Bind(&PerformRead, state));
+}
+
} // namespace stream_utils
} // namespace chromeos
diff --git a/chromeos/streams/stream_utils.h b/chromeos/streams/stream_utils.h
index 09ff38c..0e85887 100644
--- a/chromeos/streams/stream_utils.h
+++ b/chromeos/streams/stream_utils.h
@@ -78,6 +78,36 @@ inline Stream::AccessMode MakeAccessMode(bool read, bool write) {
return write ? Stream::AccessMode::WRITE : Stream::AccessMode::READ;
}
+using CopyDataSuccessCallback =
+ base::Callback<void(StreamPtr, StreamPtr, uint64_t)>;
+using CopyDataErrorCallback =
+ base::Callback<void(StreamPtr, StreamPtr, const chromeos::Error*)>;
+
+// Asynchronously copies data from input stream to output stream until all the
+// data from the input stream is read. The function takes ownership of both
+// streams for the duration of the operation and then gives them back when
+// either the |success_callback| or |error_callback| is called.
+// |success_callback| also provides the number of bytes actually copied.
+// This variant of CopyData uses internal buffer of 4 KiB for the operation.
+CHROMEOS_EXPORT void CopyData(StreamPtr in_stream,
+ StreamPtr out_stream,
+ const CopyDataSuccessCallback& success_callback,
+ const CopyDataErrorCallback& error_callback);
+
+// Asynchronously copies data from input stream to output stream until the
+// maximum amount of data specified in |max_size_to_copy| is copied or the end
+// of the input stream is encountered. The function takes ownership of both
+// streams for the duration of the operation and then gives them back when
+// either the |success_callback| or |error_callback| is called.
+// |success_callback| also provides the number of bytes actually copied.
+// |buffer_size| specifies the size of the read buffer to use for the operation.
+CHROMEOS_EXPORT void CopyData(StreamPtr in_stream,
+ StreamPtr out_stream,
+ uint64_t max_size_to_copy,
+ size_t buffer_size,
+ const CopyDataSuccessCallback& success_callback,
+ const CopyDataErrorCallback& error_callback);
+
} // namespace stream_utils
} // namespace chromeos
diff --git a/chromeos/streams/stream_utils_unittest.cc b/chromeos/streams/stream_utils_unittest.cc
index 5af404f..1de4fc3 100644
--- a/chromeos/streams/stream_utils_unittest.cc
+++ b/chromeos/streams/stream_utils_unittest.cc
@@ -6,9 +6,45 @@
#include <limits>
+#include <base/bind.h>
+#include <chromeos/message_loops/fake_message_loop.h>
+#include <chromeos/message_loops/message_loop.h>
+#include <chromeos/streams/mock_stream.h>
#include <chromeos/streams/stream_errors.h>
+#include <gmock/gmock.h>
#include <gtest/gtest.h>
+using testing::DoAll;
+using testing::InSequence;
+using testing::Return;
+using testing::StrictMock;
+using testing::_;
+
+ACTION_TEMPLATE(InvokeAsyncCallback,
+ HAS_1_TEMPLATE_PARAMS(int, k),
+ AND_1_VALUE_PARAMS(size)) {
+ chromeos::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(std::get<k>(args), size));
+ return true;
+}
+
+ACTION_TEMPLATE(InvokeAsyncCallback,
+ HAS_1_TEMPLATE_PARAMS(int, k),
+ AND_0_VALUE_PARAMS()) {
+ chromeos::MessageLoop::current()->PostTask(FROM_HERE, std::get<k>(args));
+ return true;
+}
+
+ACTION_TEMPLATE(InvokeAsyncErrorCallback,
+ HAS_1_TEMPLATE_PARAMS(int, k),
+ AND_1_VALUE_PARAMS(code)) {
+ chromeos::ErrorPtr error;
+ chromeos::Error::AddTo(&error, FROM_HERE, "test", code, "message");
+ chromeos::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(std::get<k>(args), base::Owned(error.release())));
+ return true;
+}
+
namespace chromeos {
TEST(StreamUtils, ErrorStreamClosed) {
@@ -115,4 +151,150 @@ TEST(StreamUtils, CalculateStreamPosition) {
FROM_HERE, 1, Whence::FROM_CURRENT, max_int64, end_pos, &pos, nullptr));
}
+class CopyStreamDataTest : public testing::Test {
+ public:
+ void SetUp() override {
+ fake_loop_.SetAsCurrent();
+ in_stream_.reset(new StrictMock<MockStream>{});
+ out_stream_.reset(new StrictMock<MockStream>{});
+ }
+
+ FakeMessageLoop fake_loop_{nullptr};
+ std::unique_ptr<StrictMock<MockStream>> in_stream_;
+ std::unique_ptr<StrictMock<MockStream>> out_stream_;
+ bool succeeded_{false};
+ bool failed_{false};
+
+ void OnSuccess(uint64_t expected,
+ StreamPtr in_stream,
+ StreamPtr out_stream,
+ uint64_t copied) {
+ EXPECT_EQ(expected, copied);
+ succeeded_ = true;
+ }
+
+ void OnError(const std::string& expected_error,
+ StreamPtr in_stream,
+ StreamPtr out_stream,
+ const Error* error) {
+ EXPECT_EQ(expected_error, error->GetCode());
+ failed_ = true;
+ }
+
+ void ExpectSuccess() {
+ EXPECT_TRUE(succeeded_);
+ EXPECT_FALSE(failed_);
+ }
+
+ void ExpectFailure() {
+ EXPECT_FALSE(succeeded_);
+ EXPECT_TRUE(failed_);
+ }
+};
+
+TEST_F(CopyStreamDataTest, CopyAllAtOnce) {
+ {
+ InSequence seq;
+ EXPECT_CALL(*in_stream_, ReadAsync(_, 100, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>(100));
+ EXPECT_CALL(*out_stream_, WriteAllAsync(_, 100, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>());
+ }
+ stream_utils::CopyData(
+ std::move(in_stream_), std::move(out_stream_), 100, 4096,
+ base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 100),
+ base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this), ""));
+ fake_loop_.Run();
+ ExpectSuccess();
+}
+
+TEST_F(CopyStreamDataTest, CopyInBlocks) {
+ {
+ InSequence seq;
+ EXPECT_CALL(*in_stream_, ReadAsync(_, 100, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>(60));
+ EXPECT_CALL(*out_stream_, WriteAllAsync(_, 60, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>());
+ EXPECT_CALL(*in_stream_, ReadAsync(_, 40, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>(40));
+ EXPECT_CALL(*out_stream_, WriteAllAsync(_, 40, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>());
+ }
+ stream_utils::CopyData(
+ std::move(in_stream_), std::move(out_stream_), 100, 4096,
+ base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 100),
+ base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this), ""));
+ fake_loop_.Run();
+ ExpectSuccess();
+}
+
+TEST_F(CopyStreamDataTest, CopyTillEndOfStream) {
+ {
+ InSequence seq;
+ EXPECT_CALL(*in_stream_, ReadAsync(_, 100, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>(60));
+ EXPECT_CALL(*out_stream_, WriteAllAsync(_, 60, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>());
+ EXPECT_CALL(*in_stream_, ReadAsync(_, 40, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>(0));
+ }
+ stream_utils::CopyData(
+ std::move(in_stream_), std::move(out_stream_), 100, 4096,
+ base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 60),
+ base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this), ""));
+ fake_loop_.Run();
+ ExpectSuccess();
+}
+
+TEST_F(CopyStreamDataTest, CopyInSmallBlocks) {
+ {
+ InSequence seq;
+ EXPECT_CALL(*in_stream_, ReadAsync(_, 60, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>(60));
+ EXPECT_CALL(*out_stream_, WriteAllAsync(_, 60, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>());
+ EXPECT_CALL(*in_stream_, ReadAsync(_, 40, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>(40));
+ EXPECT_CALL(*out_stream_, WriteAllAsync(_, 40, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>());
+ }
+ stream_utils::CopyData(
+ std::move(in_stream_), std::move(out_stream_), 100, 60,
+ base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 100),
+ base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this), ""));
+ fake_loop_.Run();
+ ExpectSuccess();
+}
+
+TEST_F(CopyStreamDataTest, ErrorRead) {
+ {
+ InSequence seq;
+ EXPECT_CALL(*in_stream_, ReadAsync(_, 60, _, _, _))
+ .WillOnce(InvokeAsyncErrorCallback<3>("read"));
+ }
+ stream_utils::CopyData(
+ std::move(in_stream_), std::move(out_stream_), 100, 60,
+ base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 0),
+ base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this), "read"));
+ fake_loop_.Run();
+ ExpectFailure();
+}
+
+TEST_F(CopyStreamDataTest, ErrorWrite) {
+ {
+ InSequence seq;
+ EXPECT_CALL(*in_stream_, ReadAsync(_, 60, _, _, _))
+ .WillOnce(InvokeAsyncCallback<2>(60));
+ EXPECT_CALL(*out_stream_, WriteAllAsync(_, 60, _, _, _))
+ .WillOnce(InvokeAsyncErrorCallback<3>("write"));
+ }
+ stream_utils::CopyData(
+ std::move(in_stream_), std::move(out_stream_), 100, 60,
+ base::Bind(&CopyStreamDataTest::OnSuccess, base::Unretained(this), 0),
+ base::Bind(&CopyStreamDataTest::OnError, base::Unretained(this),
+ "write"));
+ fake_loop_.Run();
+ ExpectFailure();
+}
+
} // namespace chromeos