diff options
author | Josh Gao <jmgao@google.com> | 2018-02-16 13:24:58 -0800 |
---|---|---|
committer | Josh Gao <jmgao@google.com> | 2018-03-05 13:00:28 -0800 |
commit | 0bbf69cbbf5359846ae577da53679ebba2961386 (patch) | |
tree | 898b66b9dade71bac5ce9c28e3c0d9b4972eb9db /adb/transport.h | |
parent | c7567fa52efe99a972c7c69a13949e0c9080b619 (diff) | |
download | system_core-0bbf69cbbf5359846ae577da53679ebba2961386.tar.gz system_core-0bbf69cbbf5359846ae577da53679ebba2961386.tar.bz2 system_core-0bbf69cbbf5359846ae577da53679ebba2961386.zip |
adb: convert Connection to a nonblocking interface.
Rename the existing Connection to BlockingConnection, add a nonblocking
Connection, and add an adapter between the two, to enable future work
to reduce the impedance mismatch from implementing a blocking interface
on top of nonblocking primitives.
While we're here, delete A_SYNC, and remove one layer of pipes when
sending a packet (replacing it with a condition variable when using
BlockingConnectionAdapter).
Test: python test_device.py, manually plugging/unplugging devices
Change-Id: Ieac2bf937471d9d494075575f07e53b589aba20a
Diffstat (limited to 'adb/transport.h')
-rw-r--r-- | adb/transport.h | 76 |
1 files changed, 64 insertions, 12 deletions
diff --git a/adb/transport.h b/adb/transport.h index 9700f445b..a492f008d 100644 --- a/adb/transport.h +++ b/adb/transport.h @@ -20,12 +20,14 @@ #include <sys/types.h> #include <atomic> +#include <condition_variable> #include <deque> #include <functional> #include <list> #include <memory> #include <mutex> #include <string> +#include <thread> #include <unordered_set> #include <openssl/rsa.h> @@ -57,15 +59,47 @@ extern const char* const kFeaturePushSync; TransportId NextTransportId(); -// Abstraction for a blocking packet transport. +// Abstraction for a non-blocking packet transport. struct Connection { Connection() = default; - Connection(const Connection& copy) = delete; - Connection(Connection&& move) = delete; - - // Destroy a Connection. Formerly known as 'Close' in atransport. virtual ~Connection() = default; + void SetTransportName(std::string transport_name) { + transport_name_ = std::move(transport_name); + } + + using ReadCallback = std::function<bool(Connection*, std::unique_ptr<apacket>)>; + void SetReadCallback(ReadCallback callback) { + CHECK(!read_callback_); + read_callback_ = callback; + } + + // Called after the Connection has terminated, either by an error or because Stop was called. + using ErrorCallback = std::function<void(Connection*, const std::string&)>; + void SetErrorCallback(ErrorCallback callback) { + CHECK(!error_callback_); + error_callback_ = callback; + } + + virtual bool Write(std::unique_ptr<apacket> packet) = 0; + + virtual void Start() = 0; + virtual void Stop() = 0; + + std::string transport_name_; + ReadCallback read_callback_; + ErrorCallback error_callback_; +}; + +// Abstraction for a blocking packet transport. +struct BlockingConnection { + BlockingConnection() = default; + BlockingConnection(const BlockingConnection& copy) = delete; + BlockingConnection(BlockingConnection&& move) = delete; + + // Destroy a BlockingConnection. Formerly known as 'Close' in atransport. + virtual ~BlockingConnection() = default; + // Read/Write a packet. These functions are concurrently called from a transport's reader/writer // threads. virtual bool Read(apacket* packet) = 0; @@ -77,7 +111,30 @@ struct Connection { virtual void Close() = 0; }; -struct FdConnection : public Connection { +struct BlockingConnectionAdapter : public Connection { + explicit BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection); + + virtual ~BlockingConnectionAdapter(); + + virtual bool Write(std::unique_ptr<apacket> packet) override final; + + virtual void Start() override final; + virtual void Stop() override final; + + bool stopped_ = false; + + std::unique_ptr<BlockingConnection> underlying_; + std::thread read_thread_; + std::thread write_thread_; + + std::deque<std::unique_ptr<apacket>> write_queue_; + std::mutex mutex_; + std::condition_variable cv_; + + std::once_flag error_flag_; +}; + +struct FdConnection : public BlockingConnection { explicit FdConnection(unique_fd fd) : fd_(std::move(fd)) {} bool Read(apacket* packet) override final; @@ -89,7 +146,7 @@ struct FdConnection : public Connection { unique_fd fd_; }; -struct UsbConnection : public Connection { +struct UsbConnection : public BlockingConnection { explicit UsbConnection(usb_handle* handle) : handle_(handle) {} ~UsbConnection(); @@ -110,7 +167,6 @@ class atransport { atransport(ConnectionState state = kCsOffline) : id(NextTransportId()), connection_state_(state) { - transport_fde = {}; // Initialize protocol to min version for compatibility with older versions. // Version will be updated post-connect. protocol_version = A_VERSION_MIN; @@ -126,11 +182,7 @@ class atransport { void SetConnectionState(ConnectionState state); const TransportId id; - int fd = -1; - int transport_socket = -1; - fdevent transport_fde; size_t ref_count = 0; - uint32_t sync_token = 0; bool online = false; TransportType type = kTransportAny; |