diff options
author | Josh Gao <jmgao@google.com> | 2018-02-08 01:06:22 +0000 |
---|---|---|
committer | Gerrit Code Review <noreply-gerritcodereview@google.com> | 2018-02-08 01:06:22 +0000 |
commit | 9bf624cd3a5945f89def99b1aaeb8bc04a97b527 (patch) | |
tree | 9e062583bf186a1c7b01a9b99815d4ad35547f02 /adb | |
parent | f99711fe87fee33b8b6ec73e46fcaac3c420acb8 (diff) | |
parent | 27cb7dca773616f2946e941c02798db3275deeea (diff) | |
download | core-9bf624cd3a5945f89def99b1aaeb8bc04a97b527.tar.gz core-9bf624cd3a5945f89def99b1aaeb8bc04a97b527.tar.bz2 core-9bf624cd3a5945f89def99b1aaeb8bc04a97b527.zip |
Merge "adb: switch asocket::enqueue to std::string."
Diffstat (limited to 'adb')
-rw-r--r-- | adb/adb.cpp | 7 | ||||
-rw-r--r-- | adb/adb.h | 5 | ||||
-rw-r--r-- | adb/jdwp_service.cpp | 32 | ||||
-rw-r--r-- | adb/range.h | 65 | ||||
-rw-r--r-- | adb/socket.h | 12 | ||||
-rw-r--r-- | adb/socket_test.cpp | 8 | ||||
-rw-r--r-- | adb/sockets.cpp | 148 | ||||
-rw-r--r-- | adb/transport.cpp | 15 |
8 files changed, 166 insertions, 126 deletions
diff --git a/adb/adb.cpp b/adb/adb.cpp index 0e3889762..ee3503bb3 100644 --- a/adb/adb.cpp +++ b/adb/adb.cpp @@ -474,13 +474,14 @@ void handle_packet(apacket *p, atransport *t) asocket* s = find_local_socket(p->msg.arg1, p->msg.arg0); if (s) { unsigned rid = p->msg.arg0; - p->len = p->msg.data_length; - if (s->enqueue(s, p) == 0) { + // TODO: Convert apacket::data to a type that we can move out of. + std::string copy(p->data, p->data + p->msg.data_length); + + if (s->enqueue(s, std::move(copy)) == 0) { D("Enqueue the socket"); send_ready(s->id, rid, t); } - return; } } break; @@ -73,11 +73,6 @@ struct amessage { }; struct apacket { - apacket* next; - - size_t len; - char* ptr; - amessage msg; char data[MAX_PAYLOAD]; }; diff --git a/adb/jdwp_service.cpp b/adb/jdwp_service.cpp index f0dff06f0..0a8a85a46 100644 --- a/adb/jdwp_service.cpp +++ b/adb/jdwp_service.cpp @@ -470,10 +470,9 @@ static void jdwp_socket_close(asocket* s) { free(s); } -static int jdwp_socket_enqueue(asocket* s, apacket* p) { +static int jdwp_socket_enqueue(asocket* s, std::string) { /* you can't write to this asocket */ D("LS(%d): JDWP socket received data?", s->id); - put_apacket(p); s->peer->close(s->peer); return -1; } @@ -486,9 +485,11 @@ static void jdwp_socket_ready(asocket* s) { * on the second one, close the connection */ if (!jdwp->pass) { - apacket* p = get_apacket(); - p->len = jdwp_process_list((char*)p->data, s->get_max_payload()); - peer->enqueue(peer, p); + std::string data; + data.resize(s->get_max_payload()); + size_t len = jdwp_process_list(&data[0], data.size()); + data.resize(len); + peer->enqueue(peer, std::move(data)); jdwp->pass = true; } else { peer->close(peer); @@ -524,17 +525,14 @@ struct JdwpTracker : public asocket { static std::vector<std::unique_ptr<JdwpTracker>> _jdwp_trackers; static void jdwp_process_list_updated(void) { - char buffer[1024]; - int len = jdwp_process_list_msg(buffer, sizeof(buffer)); + std::string data; + data.resize(1024); + data.resize(jdwp_process_list_msg(&data[0], data.size())); for (auto& t : _jdwp_trackers) { - apacket* p = get_apacket(); - memcpy(p->data, buffer, len); - p->len = len; - if (t->peer) { // The tracker might not have been connected yet. - t->peer->enqueue(t->peer, p); + t->peer->enqueue(t->peer, data); } } } @@ -560,17 +558,17 @@ static void jdwp_tracker_ready(asocket* s) { JdwpTracker* t = (JdwpTracker*)s; if (t->need_initial) { - apacket* p = get_apacket(); + std::string data; + data.resize(s->get_max_payload()); + data.resize(jdwp_process_list_msg(&data[0], data.size())); t->need_initial = false; - p->len = jdwp_process_list_msg((char*)p->data, s->get_max_payload()); - s->peer->enqueue(s->peer, p); + s->peer->enqueue(s->peer, std::move(data)); } } -static int jdwp_tracker_enqueue(asocket* s, apacket* p) { +static int jdwp_tracker_enqueue(asocket* s, std::string) { /* you can't write to this socket */ D("LS(%d): JDWP tracker received data?", s->id); - put_apacket(p); s->peer->close(s->peer); return -1; } diff --git a/adb/range.h b/adb/range.h new file mode 100644 index 000000000..7a0b8221d --- /dev/null +++ b/adb/range.h @@ -0,0 +1,65 @@ +#pragma once + +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <string> + +#include <android-base/logging.h> + +struct Range { + explicit Range(std::string data) : data_(std::move(data)) {} + + Range(const Range& copy) = delete; + Range& operator=(const Range& copy) = delete; + + Range(Range&& move) = default; + Range& operator=(Range&& move) = default; + + bool empty() const { + return size() == 0; + } + + size_t size() const { + return data_.size() - begin_offset_ - end_offset_; + }; + + void drop_front(size_t n) { + CHECK_GE(size(), n); + begin_offset_ += n; + } + + void drop_end(size_t n) { + CHECK_GE(size(), n); + end_offset_ += n; + } + + char* data() { + return &data_[0] + begin_offset_; + } + + std::string::iterator begin() { + return data_.begin() + begin_offset_; + } + + std::string::iterator end() { + return data_.end() - end_offset_; + } + + std::string data_; + size_t begin_offset_ = 0; + size_t end_offset_ = 0; +}; diff --git a/adb/socket.h b/adb/socket.h index 563e2c787..a1b52b358 100644 --- a/adb/socket.h +++ b/adb/socket.h @@ -19,9 +19,12 @@ #include <stddef.h> +#include <deque> #include <memory> +#include <string> #include "fdevent.h" +#include "range.h" struct apacket; class atransport; @@ -59,9 +62,10 @@ struct asocket { fdevent fde; int fd; - // queue of apackets waiting to be written - apacket* pkt_first; - apacket* pkt_last; + // queue of data waiting to be written + std::deque<Range> packet_queue; + + std::string smart_socket_data; /* enqueue is called by our peer when it has data * for us. It should return 0 if we can accept more @@ -69,7 +73,7 @@ struct asocket { * peer->ready() when we once again are ready to * receive data. */ - int (*enqueue)(asocket* s, apacket* pkt); + int (*enqueue)(asocket* s, std::string data); /* ready is called by the peer when it is ready for * us to send data via enqueue again diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp index f7c66dbac..04ad6f366 100644 --- a/adb/socket_test.cpp +++ b/adb/socket_test.cpp @@ -114,10 +114,10 @@ static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) { ASSERT_TRUE(s != nullptr); arg->bytes_written = 0; while (true) { - apacket* p = get_apacket(); - p->len = sizeof(p->data); - arg->bytes_written += p->len; - int ret = s->enqueue(s, p); + std::string data; + data.resize(MAX_PAYLOAD); + arg->bytes_written += data.size(); + int ret = s->enqueue(s, std::move(data)); if (ret == 1) { // The writer has one packet waiting to send. break; diff --git a/adb/sockets.cpp b/adb/sockets.cpp index c2f15292b..e9c45b78d 100644 --- a/adb/sockets.cpp +++ b/adb/sockets.cpp @@ -37,6 +37,7 @@ #include "adb.h" #include "adb_io.h" +#include "range.h" #include "transport.h" static std::recursive_mutex& local_socket_list_lock = *new std::recursive_mutex(); @@ -105,55 +106,47 @@ restart: } } -static int local_socket_enqueue(asocket* s, apacket* p) { - D("LS(%d): enqueue %zu", s->id, p->len); +static int local_socket_enqueue(asocket* s, std::string data) { + D("LS(%d): enqueue %zu", s->id, data.size()); - p->ptr = p->data; + Range r(std::move(data)); /* if there is already data queue'd, we will receive ** events when it's time to write. just add this to ** the tail */ - if (s->pkt_first) { + if (!s->packet_queue.empty()) { goto enqueue; } /* write as much as we can, until we ** would block or there is an error/eof */ - while (p->len > 0) { - int r = adb_write(s->fd, p->ptr, p->len); - if (r > 0) { - p->len -= r; - p->ptr += r; + while (!r.empty()) { + int rc = adb_write(s->fd, r.data(), r.size()); + if (rc > 0) { + r.drop_front(rc); continue; } - if ((r == 0) || (errno != EAGAIN)) { + + if (rc == 0 || errno != EAGAIN) { D("LS(%d): not ready, errno=%d: %s", s->id, errno, strerror(errno)); - put_apacket(p); s->has_write_error = true; s->close(s); return 1; /* not ready (error) */ } else { + // errno == EAGAIN break; } } - if (p->len == 0) { - put_apacket(p); + if (r.empty()) { return 0; /* ready for more data */ } enqueue: - p->next = 0; - if (s->pkt_first) { - s->pkt_last->next = p; - } else { - s->pkt_first = p; - } - s->pkt_last = p; - /* make sure we are notified when we can drain the queue */ + s->packet_queue.push_back(std::move(r)); fdevent_add(&s->fde, FDE_WRITE); return 1; /* not ready (backlog) */ @@ -167,7 +160,6 @@ static void local_socket_ready(asocket* s) { // be sure to hold the socket list lock when calling this static void local_socket_destroy(asocket* s) { - apacket *p, *n; int exit_on_close = s->exit_on_close; D("LS(%d): destroying fde.fd=%d", s->id, s->fde.fd); @@ -177,12 +169,6 @@ static void local_socket_destroy(asocket* s) { */ fdevent_remove(&s->fde); - /* dispose of any unwritten data */ - for (p = s->pkt_first; p; p = n) { - D("LS(%d): discarding %zu bytes", s->id, p->len); - n = p->next; - put_apacket(p); - } remove_socket(s); free(s); @@ -212,7 +198,7 @@ static void local_socket_close(asocket* s) { /* If we are already closing, or if there are no ** pending packets, destroy immediately */ - if (s->closing || s->has_write_error || s->pkt_first == NULL) { + if (s->closing || s->has_write_error || s->packet_queue.empty()) { int id = s->id; local_socket_destroy(s); D("LS(%d): closed", id); @@ -238,35 +224,30 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s) { ** in order to simplify the code. */ if (ev & FDE_WRITE) { - apacket* p; - while ((p = s->pkt_first) != nullptr) { - while (p->len > 0) { - int r = adb_write(fd, p->ptr, p->len); - if (r == -1) { + while (!s->packet_queue.empty()) { + Range& r = s->packet_queue.front(); + while (!r.empty()) { + int rc = adb_write(fd, r.data(), r.size()); + if (rc == -1) { /* returning here is ok because FDE_READ will ** be processed in the next iteration loop */ if (errno == EAGAIN) { return; } - } else if (r > 0) { - p->ptr += r; - p->len -= r; + } else if (rc > 0) { + r.drop_front(rc); continue; } - D(" closing after write because r=%d and errno is %d", r, errno); + D(" closing after write because rc=%d and errno is %d", rc, errno); s->has_write_error = true; s->close(s); return; } - if (p->len == 0) { - s->pkt_first = p->next; - if (s->pkt_first == 0) { - s->pkt_last = 0; - } - put_apacket(p); + if (r.empty()) { + s->packet_queue.pop_front(); } } @@ -288,9 +269,10 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s) { } if (ev & FDE_READ) { - apacket* p = get_apacket(); - char* x = p->data; const size_t max_payload = s->get_max_payload(); + std::string data; + data.resize(max_payload); + char* x = &data[0]; size_t avail = max_payload; int r = 0; int is_eof = 0; @@ -315,16 +297,15 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s) { } D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof, s->fde.force_eof); - if ((avail == max_payload) || (s->peer == 0)) { - put_apacket(p); - } else { - p->len = max_payload - avail; + + if (avail != max_payload && s->peer) { + data.resize(max_payload - avail); // s->peer->enqueue() may call s->close() and free s, // so save variables for debug printing below. unsigned saved_id = s->id; int saved_fd = s->fd; - r = s->peer->enqueue(s->peer, p); + r = s->peer->enqueue(s->peer, std::move(data)); D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r); if (r < 0) { @@ -428,12 +409,22 @@ static asocket* create_host_service_socket(const char* name, const char* serial, } #endif /* ADB_HOST */ -static int remote_socket_enqueue(asocket* s, apacket* p) { +static int remote_socket_enqueue(asocket* s, std::string data) { D("entered remote_socket_enqueue RS(%d) WRITE fd=%d peer.fd=%d", s->id, s->fd, s->peer->fd); + apacket* p = get_apacket(); + p->msg.command = A_WRTE; p->msg.arg0 = s->peer->id; p->msg.arg1 = s->id; - p->msg.data_length = p->len; + p->msg.data_length = data.size(); + + if (data.size() > sizeof(p->data)) { + put_apacket(p); + return -1; + } + + // TODO: Convert apacket::data to a type that we can move into. + memcpy(p->data, data.data(), data.size()); send_packet(p, s->transport); return 1; } @@ -533,7 +524,7 @@ static void local_socket_close_notify(asocket* s) { s->close(s); } -static unsigned unhex(char* s, int len) { +static unsigned unhex(const char* s, int len) { unsigned n = 0, c; while (len-- > 0) { @@ -637,8 +628,7 @@ char* skip_host_serial(char* service) { #endif // ADB_HOST -static int smart_socket_enqueue(asocket* s, apacket* p) { - unsigned len; +static int smart_socket_enqueue(asocket* s, std::string data) { #if ADB_HOST char* service = nullptr; char* serial = nullptr; @@ -646,49 +636,38 @@ static int smart_socket_enqueue(asocket* s, apacket* p) { TransportType type = kTransportAny; #endif - D("SS(%d): enqueue %zu", s->id, p->len); + D("SS(%d): enqueue %zu", s->id, data.size()); - if (s->pkt_first == 0) { - s->pkt_first = p; - s->pkt_last = p; + if (s->smart_socket_data.empty()) { + s->smart_socket_data = std::move(data); } else { - if ((s->pkt_first->len + p->len) > s->get_max_payload()) { - D("SS(%d): overflow", s->id); - put_apacket(p); - goto fail; - } - - memcpy(s->pkt_first->data + s->pkt_first->len, p->data, p->len); - s->pkt_first->len += p->len; - put_apacket(p); - - p = s->pkt_first; + std::copy(data.begin(), data.end(), std::back_inserter(s->smart_socket_data)); } /* don't bother if we can't decode the length */ - if (p->len < 4) { + if (s->smart_socket_data.size() < 4) { return 0; } - len = unhex(p->data, 4); - if ((len < 1) || (len > MAX_PAYLOAD)) { - D("SS(%d): bad size (%d)", s->id, len); + uint32_t len = unhex(s->smart_socket_data.data(), 4); + if (len == 0 || len > MAX_PAYLOAD) { + D("SS(%d): bad size (%u)", s->id, len); goto fail; } - D("SS(%d): len is %d", s->id, len); + D("SS(%d): len is %u", s->id, len); /* can't do anything until we have the full header */ - if ((len + 4) > p->len) { - D("SS(%d): waiting for %zu more bytes", s->id, len + 4 - p->len); + if ((len + 4) > s->smart_socket_data.size()) { + D("SS(%d): waiting for %zu more bytes", s->id, len + 4 - s->smart_socket_data.size()); return 0; } - p->data[len + 4] = 0; + s->smart_socket_data[len + 4] = 0; - D("SS(%d): '%s'", s->id, (char*)(p->data + 4)); + D("SS(%d): '%s'", s->id, (char*)(s->smart_socket_data.data() + 4)); #if ADB_HOST - service = (char*)p->data + 4; + service = &s->smart_socket_data[4]; if (!strncmp(service, "host-serial:", strlen("host-serial:"))) { char* serial_end; service += strlen("host-serial:"); @@ -736,7 +715,7 @@ static int smart_socket_enqueue(asocket* s, apacket* p) { } if (!strncmp(service, "transport", strlen("transport"))) { D("SS(%d): okay transport", s->id); - p->len = 0; + s->smart_socket_data.clear(); return 0; } @@ -807,7 +786,7 @@ static int smart_socket_enqueue(asocket* s, apacket* p) { /* give him our transport and upref it */ s->peer->transport = s->transport; - connect_to_remote(s->peer, (char*)(p->data + 4)); + connect_to_remote(s->peer, s->smart_socket_data.data() + 4); s->peer = 0; s->close(s); return 1; @@ -827,9 +806,6 @@ static void smart_socket_ready(asocket* s) { static void smart_socket_close(asocket* s) { D("SS(%d): closed", s->id); - if (s->pkt_first) { - put_apacket(s->pkt_first); - } if (s->peer) { s->peer->peer = 0; s->peer->close(s->peer); diff --git a/adb/transport.cpp b/adb/transport.cpp index c90c59c6f..3b0669cd8 100644 --- a/adb/transport.cpp +++ b/adb/transport.cpp @@ -414,21 +414,22 @@ static void device_tracker_close(asocket* socket) { free(tracker); } -static int device_tracker_enqueue(asocket* socket, apacket* p) { +static int device_tracker_enqueue(asocket* socket, std::string) { /* you can't read from a device tracker, close immediately */ - put_apacket(p); device_tracker_close(socket); return -1; } static int device_tracker_send(device_tracker* tracker, const std::string& string) { - apacket* p = get_apacket(); asocket* peer = tracker->socket.peer; - snprintf(reinterpret_cast<char*>(p->data), 5, "%04x", static_cast<int>(string.size())); - memcpy(&p->data[4], string.data(), string.size()); - p->len = 4 + string.size(); - return peer->enqueue(peer, p); + std::string data; + data.resize(4 + string.size()); + char buf[5]; + snprintf(buf, sizeof(buf), "%04x", static_cast<int>(string.size())); + memcpy(&data[0], buf, 4); + memcpy(&data[4], string.data(), string.size()); + return peer->enqueue(peer, std::move(data)); } static void device_tracker_ready(asocket* socket) { |