diff options
author | Mark Salyzyn <salyzyn@google.com> | 2012-01-24 20:30:10 -0800 |
---|---|---|
committer | Mark Salyzyn <salyzyn@google.com> | 2014-01-27 15:22:04 -0800 |
commit | 23f04107dcedcef22556c1f57502cdddbfa2663f (patch) | |
tree | f7040d57a21d668cecd9be5c18ffbb6509dbf747 /libsysutils | |
parent | 40b21558e52e9245929495c5344443276e5d87c1 (diff) | |
download | core-23f04107dcedcef22556c1f57502cdddbfa2663f.tar.gz core-23f04107dcedcef22556c1f57502cdddbfa2663f.tar.bz2 core-23f04107dcedcef22556c1f57502cdddbfa2663f.zip |
libsysutils: Add iovec/runOnEachSocket
SocketClient:
* Replace sendDataLocked with sendDataLockedv which takes an iovec.
* Add a version of sendData, sendDatav, which takes an iovec.
* do not preserve iovec content through sendDatav
SocketListener:
* Add runOnEachSocket, which allows to to specify a SocketClientCommand to
run individually on each socket. This allows you to do broadcast-like
actions customized for each individual socket.
* Client safe list reference counting for sendBroadcast & runOnEach Socket
(cherry picked from commit a6e965578e44f8ae5f98de822ba5decec381d5fc)
Signed-off-by: Nick Kralevich <nnk@google.com>
Signed-off-by: Mark Salyzyn <salyzyn@google.com>
Change-Id: I716f89c01b4cb7af900045c7e41fac1492defb06
Diffstat (limited to 'libsysutils')
-rw-r--r-- | libsysutils/src/SocketClient.cpp | 59 | ||||
-rw-r--r-- | libsysutils/src/SocketListener.cpp | 53 |
2 files changed, 89 insertions, 23 deletions
diff --git a/libsysutils/src/SocketClient.cpp b/libsysutils/src/SocketClient.cpp index ae0e0770d..3625d936c 100644 --- a/libsysutils/src/SocketClient.cpp +++ b/libsysutils/src/SocketClient.cpp @@ -71,7 +71,7 @@ int SocketClient::sendMsg(int code, const char *msg, bool addErrno, bool useCmdN ret = asprintf(&buf, "%d %s", code, msg); } } - /* Send the zero-terminated message */ + // Send the zero-terminated message if (ret != -1) { ret = sendMsg(buf); free(buf); @@ -79,22 +79,25 @@ int SocketClient::sendMsg(int code, const char *msg, bool addErrno, bool useCmdN return ret; } -/** send 3-digit code, null, binary-length, binary data */ +// send 3-digit code, null, binary-length, binary data int SocketClient::sendBinaryMsg(int code, const void *data, int len) { - /* 4 bytes for the code & null + 4 bytes for the len */ + // 4 bytes for the code & null + 4 bytes for the len char buf[8]; - /* Write the code */ + // Write the code snprintf(buf, 4, "%.3d", code); - /* Write the len */ + // Write the len uint32_t tmp = htonl(len); memcpy(buf + 4, &tmp, sizeof(uint32_t)); + struct iovec vec[2]; + vec[0].iov_base = (void *) buf; + vec[0].iov_len = sizeof(buf); + vec[1].iov_base = (void *) data; + vec[1].iov_len = len; + pthread_mutex_lock(&mWriteMutex); - int result = sendDataLocked(buf, sizeof(buf)); - if (result == 0 && len > 0) { - result = sendDataLocked(data, len); - } + int result = sendDataLockedv(vec, (len > 0) ? 2 : 1); pthread_mutex_unlock(&mWriteMutex); return result; @@ -147,33 +150,51 @@ int SocketClient::sendMsg(const char *msg) { } int SocketClient::sendData(const void *data, int len) { + struct iovec vec[1]; + vec[0].iov_base = (void *) data; + vec[0].iov_len = len; pthread_mutex_lock(&mWriteMutex); - int rc = sendDataLocked(data, len); + int rc = sendDataLockedv(vec, 1); pthread_mutex_unlock(&mWriteMutex); return rc; } -int SocketClient::sendDataLocked(const void *data, int len) { - int rc = 0; - const char *p = (const char*) data; - int brtw = len; +int SocketClient::sendDatav(struct iovec *iov, int iovcnt) { + pthread_mutex_lock(&mWriteMutex); + int rc = sendDataLockedv(iov, iovcnt); + pthread_mutex_unlock(&mWriteMutex); + + return rc; +} + +int SocketClient::sendDataLockedv(struct iovec *iov, int iovcnt) { if (mSocket < 0) { errno = EHOSTUNREACH; return -1; } - if (len == 0) { + if (iovcnt <= 0) { return 0; } - while (brtw > 0) { - rc = send(mSocket, p, brtw, MSG_NOSIGNAL); + int current = 0; + + for (;;) { + ssize_t rc = writev(mSocket, iov + current, iovcnt - current); if (rc > 0) { - p += rc; - brtw -= rc; + size_t written = rc; + while ((current < iovcnt) && (written >= iov[current].iov_len)) { + written -= iov[current].iov_len; + current++; + } + if (current == iovcnt) { + break; + } + iov[current].iov_base = (char *)iov[current].iov_base + written; + iov[current].iov_len -= written; continue; } diff --git a/libsysutils/src/SocketListener.cpp b/libsysutils/src/SocketListener.cpp index 0296910ce..1b53867c3 100644 --- a/libsysutils/src/SocketListener.cpp +++ b/libsysutils/src/SocketListener.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2008 The Android Open Source Project + * Copyright (C) 2008-2014 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -164,6 +164,7 @@ void SocketListener::runListener() { pthread_mutex_lock(&mClientsLock); for (it = mClients->begin(); it != mClients->end(); ++it) { + // NB: calling out to an other object with mClientsLock held (safe) int fd = (*it)->getSocket(); FD_SET(fd, &read_fds); if (fd > max) @@ -206,9 +207,12 @@ void SocketListener::runListener() { pendingList->clear(); pthread_mutex_lock(&mClientsLock); for (it = mClients->begin(); it != mClients->end(); ++it) { - int fd = (*it)->getSocket(); + SocketClient* c = *it; + // NB: calling out to an other object with mClientsLock held (safe) + int fd = c->getSocket(); if (FD_ISSET(fd, &read_fds)) { - pendingList->push_back(*it); + pendingList->push_back(c); + c->incRef(); } } pthread_mutex_unlock(&mClientsLock); @@ -236,20 +240,61 @@ void SocketListener::runListener() { /* Remove our reference to the client */ c->decRef(); } + c->decRef(); } } delete pendingList; } void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { + SocketClientCollection safeList; + + /* Add all active clients to the safe list first */ + safeList.clear(); pthread_mutex_lock(&mClientsLock); SocketClientCollection::iterator i; for (i = mClients->begin(); i != mClients->end(); ++i) { + SocketClient* c = *i; + c->incRef(); + safeList.push_back(c); + } + pthread_mutex_unlock(&mClientsLock); + + while (!safeList.empty()) { + /* Pop the first item from the list */ + i = safeList.begin(); + SocketClient* c = *i; + safeList.erase(i); // broadcasts are unsolicited and should not include a cmd number - if ((*i)->sendMsg(code, msg, addErrno, false)) { + if (c->sendMsg(code, msg, addErrno, false)) { SLOGW("Error sending broadcast (%s)", strerror(errno)); } + c->decRef(); + } +} + +void SocketListener::runOnEachSocket(SocketClientCommand *command) { + SocketClientCollection safeList; + + /* Add all active clients to the safe list first */ + safeList.clear(); + pthread_mutex_lock(&mClientsLock); + SocketClientCollection::iterator i; + + for (i = mClients->begin(); i != mClients->end(); ++i) { + SocketClient* c = *i; + c->incRef(); + safeList.push_back(c); } pthread_mutex_unlock(&mClientsLock); + + while (!safeList.empty()) { + /* Pop the first item from the list */ + i = safeList.begin(); + SocketClient* c = *i; + safeList.erase(i); + command->runSocketCommand(c); + c->decRef(); + } } |