summaryrefslogtreecommitdiffstats
path: root/adb/transport.h
diff options
context:
space:
mode:
authorJosh Gao <jmgao@google.com>2018-02-16 13:24:58 -0800
committerJosh Gao <jmgao@google.com>2018-03-05 13:00:28 -0800
commit0bbf69cbbf5359846ae577da53679ebba2961386 (patch)
tree898b66b9dade71bac5ce9c28e3c0d9b4972eb9db /adb/transport.h
parentc7567fa52efe99a972c7c69a13949e0c9080b619 (diff)
downloadsystem_core-0bbf69cbbf5359846ae577da53679ebba2961386.tar.gz
system_core-0bbf69cbbf5359846ae577da53679ebba2961386.tar.bz2
system_core-0bbf69cbbf5359846ae577da53679ebba2961386.zip
adb: convert Connection to a nonblocking interface.
Rename the existing Connection to BlockingConnection, add a nonblocking Connection, and add an adapter between the two, to enable future work to reduce the impedance mismatch from implementing a blocking interface on top of nonblocking primitives. While we're here, delete A_SYNC, and remove one layer of pipes when sending a packet (replacing it with a condition variable when using BlockingConnectionAdapter). Test: python test_device.py, manually plugging/unplugging devices Change-Id: Ieac2bf937471d9d494075575f07e53b589aba20a
Diffstat (limited to 'adb/transport.h')
-rw-r--r--adb/transport.h76
1 files changed, 64 insertions, 12 deletions
diff --git a/adb/transport.h b/adb/transport.h
index 9700f445b..a492f008d 100644
--- a/adb/transport.h
+++ b/adb/transport.h
@@ -20,12 +20,14 @@
#include <sys/types.h>
#include <atomic>
+#include <condition_variable>
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>
+#include <thread>
#include <unordered_set>
#include <openssl/rsa.h>
@@ -57,15 +59,47 @@ extern const char* const kFeaturePushSync;
TransportId NextTransportId();
-// Abstraction for a blocking packet transport.
+// Abstraction for a non-blocking packet transport.
struct Connection {
Connection() = default;
- Connection(const Connection& copy) = delete;
- Connection(Connection&& move) = delete;
-
- // Destroy a Connection. Formerly known as 'Close' in atransport.
virtual ~Connection() = default;
+ void SetTransportName(std::string transport_name) {
+ transport_name_ = std::move(transport_name);
+ }
+
+ using ReadCallback = std::function<bool(Connection*, std::unique_ptr<apacket>)>;
+ void SetReadCallback(ReadCallback callback) {
+ CHECK(!read_callback_);
+ read_callback_ = callback;
+ }
+
+ // Called after the Connection has terminated, either by an error or because Stop was called.
+ using ErrorCallback = std::function<void(Connection*, const std::string&)>;
+ void SetErrorCallback(ErrorCallback callback) {
+ CHECK(!error_callback_);
+ error_callback_ = callback;
+ }
+
+ virtual bool Write(std::unique_ptr<apacket> packet) = 0;
+
+ virtual void Start() = 0;
+ virtual void Stop() = 0;
+
+ std::string transport_name_;
+ ReadCallback read_callback_;
+ ErrorCallback error_callback_;
+};
+
+// Abstraction for a blocking packet transport.
+struct BlockingConnection {
+ BlockingConnection() = default;
+ BlockingConnection(const BlockingConnection& copy) = delete;
+ BlockingConnection(BlockingConnection&& move) = delete;
+
+ // Destroy a BlockingConnection. Formerly known as 'Close' in atransport.
+ virtual ~BlockingConnection() = default;
+
// Read/Write a packet. These functions are concurrently called from a transport's reader/writer
// threads.
virtual bool Read(apacket* packet) = 0;
@@ -77,7 +111,30 @@ struct Connection {
virtual void Close() = 0;
};
-struct FdConnection : public Connection {
+struct BlockingConnectionAdapter : public Connection {
+ explicit BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection);
+
+ virtual ~BlockingConnectionAdapter();
+
+ virtual bool Write(std::unique_ptr<apacket> packet) override final;
+
+ virtual void Start() override final;
+ virtual void Stop() override final;
+
+ bool stopped_ = false;
+
+ std::unique_ptr<BlockingConnection> underlying_;
+ std::thread read_thread_;
+ std::thread write_thread_;
+
+ std::deque<std::unique_ptr<apacket>> write_queue_;
+ std::mutex mutex_;
+ std::condition_variable cv_;
+
+ std::once_flag error_flag_;
+};
+
+struct FdConnection : public BlockingConnection {
explicit FdConnection(unique_fd fd) : fd_(std::move(fd)) {}
bool Read(apacket* packet) override final;
@@ -89,7 +146,7 @@ struct FdConnection : public Connection {
unique_fd fd_;
};
-struct UsbConnection : public Connection {
+struct UsbConnection : public BlockingConnection {
explicit UsbConnection(usb_handle* handle) : handle_(handle) {}
~UsbConnection();
@@ -110,7 +167,6 @@ class atransport {
atransport(ConnectionState state = kCsOffline)
: id(NextTransportId()), connection_state_(state) {
- transport_fde = {};
// Initialize protocol to min version for compatibility with older versions.
// Version will be updated post-connect.
protocol_version = A_VERSION_MIN;
@@ -126,11 +182,7 @@ class atransport {
void SetConnectionState(ConnectionState state);
const TransportId id;
- int fd = -1;
- int transport_socket = -1;
- fdevent transport_fde;
size_t ref_count = 0;
- uint32_t sync_token = 0;
bool online = false;
TransportType type = kTransportAny;