summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSharvil Nanavati <sharvil@google.com>2014-08-13 00:40:49 -0700
committerAndre Eisenbach <eisenbach@google.com>2015-03-16 16:51:28 -0700
commitfbf89085bf308a98b00da77d1538539f6dd58604 (patch)
tree82802f0ae12dec58034a2cc0683da6a841765329
parent278abce72967eaa496b402ffc867e37a4613c187 (diff)
downloadandroid_system_bt-fbf89085bf308a98b00da77d1538539f6dd58604.tar.gz
android_system_bt-fbf89085bf308a98b00da77d1538539f6dd58604.tar.bz2
android_system_bt-fbf89085bf308a98b00da77d1538539f6dd58604.zip
Switch to an epoll-based reactor implementation.
epoll is a much nicer interface that very closely matches the reactor interface. It's also thread-safe which makes it a more suitable choice for bluedroid. As a result of this change, reactor_register and reactor_unregister are both thread-safe without introducing any synchronization in user-space.
-rw-r--r--hci/src/hci_inject.c14
-rw-r--r--osi/include/list.h1
-rw-r--r--osi/include/reactor.h57
-rw-r--r--osi/include/socket.h10
-rw-r--r--osi/include/thread.h12
-rw-r--r--osi/src/list.c12
-rw-r--r--osi/src/reactor.c248
-rw-r--r--osi/src/socket.c56
-rw-r--r--osi/src/thread.c78
-rw-r--r--osi/test/reactor_test.cpp50
-rw-r--r--osi/test/thread_test.cpp72
11 files changed, 292 insertions, 318 deletions
diff --git a/hci/src/hci_inject.c b/hci/src/hci_inject.c
index 78c1cc1a7..af5ed704d 100644
--- a/hci/src/hci_inject.c
+++ b/hci/src/hci_inject.c
@@ -62,14 +62,14 @@ bool hci_inject_open(void) {
hci = bt_hc_get_interface();
- clients = list_new(client_free);
- if (!clients)
- goto error;
-
thread = thread_new("hci_inject");
if (!thread)
goto error;
+ clients = list_new(client_free);
+ if (!clients)
+ goto error;
+
listen_socket = socket_new();
if (!listen_socket)
goto error;
@@ -77,7 +77,7 @@ bool hci_inject_open(void) {
if (!socket_listen(listen_socket, LISTEN_PORT))
goto error;
- socket_register(listen_socket, thread, accept_ready, NULL, NULL);
+ socket_register(listen_socket, thread_get_reactor(thread), NULL, accept_ready, NULL);
return true;
error:;
@@ -87,8 +87,8 @@ error:;
void hci_inject_close(void) {
socket_free(listen_socket);
- thread_free(thread);
list_free(clients);
+ thread_free(thread);
listen_socket = NULL;
thread = NULL;
@@ -132,7 +132,7 @@ static void accept_ready(socket_t *socket, UNUSED_ATTR void *context) {
return;
}
- socket_register(socket, thread, read_ready, NULL, client);
+ socket_register(socket, thread_get_reactor(thread), client, read_ready, NULL);
}
static void read_ready(UNUSED_ATTR socket_t *socket, void *context) {
diff --git a/osi/include/list.h b/osi/include/list.h
index 56cb7396d..b23532cf0 100644
--- a/osi/include/list.h
+++ b/osi/include/list.h
@@ -18,6 +18,7 @@ void list_free(list_t *list);
// Accessors.
bool list_is_empty(const list_t *list);
+bool list_contains(const list_t *list, const void *data);
size_t list_length(const list_t *list);
void *list_front(const list_t *list);
void *list_back(const list_t *list);
diff --git a/osi/include/reactor.h b/osi/include/reactor.h
index eeb538dc6..100c76268 100644
--- a/osi/include/reactor.h
+++ b/osi/include/reactor.h
@@ -26,37 +26,16 @@
// This module implements the Reactor pattern.
// See http://en.wikipedia.org/wiki/Reactor_pattern for details.
-struct reactor_t;
typedef struct reactor_t reactor_t;
-
-struct reactor_object_t;
typedef struct reactor_object_t reactor_object_t;
-// Enumerates the types of events a reactor object is interested
-// in responding to.
-typedef enum {
- REACTOR_INTEREST_READ = 1,
- REACTOR_INTEREST_WRITE = 2,
- REACTOR_INTEREST_READ_WRITE = 3,
-} reactor_interest_t;
-
// Enumerates the reasons a reactor has stopped.
typedef enum {
REACTOR_STATUS_STOP, // |reactor_stop| was called.
- REACTOR_STATUS_TIMEOUT, // a timeout was specified and the reactor timed out.
REACTOR_STATUS_ERROR, // there was an error during the operation.
REACTOR_STATUS_DONE, // the reactor completed its work (for the _run_once* variants).
} reactor_status_t;
-struct reactor_object_t {
- void *context; // a context that's passed back to the *_ready functions.
- int fd; // the file descriptor to monitor for events.
- reactor_interest_t interest; // the event types to monitor the file descriptor for.
-
- void (*read_ready)(void *context); // function to call when the file descriptor becomes readable.
- void (*write_ready)(void *context); // function to call when the file descriptor becomes writeable.
-};
-
// Creates a new reactor object. Returns NULL on failure. The returned object
// must be freed by calling |reactor_free|.
reactor_t *reactor_new(void);
@@ -72,18 +51,32 @@ reactor_status_t reactor_start(reactor_t *reactor);
// becomes ready. |reactor| may not be NULL.
reactor_status_t reactor_run_once(reactor_t *reactor);
-// Same as |reactor_run_once| with a bounded wait time in case no object becomes ready.
-reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms);
-
// Immediately unblocks the reactor. This function is safe to call from any thread.
// |reactor| may not be NULL.
void reactor_stop(reactor_t *reactor);
-// Registers an object with the reactor. |obj| is neither copied nor is its ownership transferred
-// so the pointer must remain valid until it is unregistered with |reactor_unregister|. Neither
-// |reactor| nor |obj| may be NULL.
-void reactor_register(reactor_t *reactor, reactor_object_t *obj);
-
-// Unregisters a previously registered object with the |reactor|. Neither |reactor| nor |obj|
-// may be NULL.
-void reactor_unregister(reactor_t *reactor, reactor_object_t *obj);
+// Registers a file descriptor with the reactor. The file descriptor, |fd|, must be valid
+// when this function is called and its ownership is not transferred to the reactor. The
+// |context| variable is a user-defined opaque handle that is passed back to the |read_ready|
+// and |write_ready| functions. It is not copied or even dereferenced by the reactor so it
+// may contain any value including NULL. The |read_ready| and |write_ready| arguments are
+// optional and may be NULL. This function returns an opaque object that represents the
+// file descriptor's registration with the reactor. When the caller is no longer interested
+// in events on the |fd|, it must free the returned object by calling |reactor_unregister|.
+reactor_object_t *reactor_register(reactor_t *reactor,
+ int fd, void *context,
+ void (*read_ready)(void *context),
+ void (*write_ready)(void *context));
+
+// Changes the subscription mode for the file descriptor represented by |object|. If the
+// caller has already registered a file descriptor with a reactor, has a valid |object|,
+// and decides to change the |read_ready| and/or |write_ready| callback routines, they
+// can call this routine. Returns true if the subscription was changed, false otherwise.
+// |object| may not be NULL, |read_ready| and |write_ready| may be NULL.
+bool reactor_change_registration(reactor_object_t *object,
+ void (*read_ready)(void *context),
+ void (*write_ready)(void *context));
+
+// Unregisters a previously registered file descriptor with its reactor. |obj| may not be NULL.
+// |obj| is invalid after calling this function so the caller must drop all references to it.
+void reactor_unregister(reactor_object_t *obj);
diff --git a/osi/include/socket.h b/osi/include/socket.h
index e9ec8245c..754e46885 100644
--- a/osi/include/socket.h
+++ b/osi/include/socket.h
@@ -21,8 +21,6 @@
#include <stddef.h>
#include <stdint.h>
-#include "thread.h"
-
typedef struct reactor_t reactor_t;
typedef struct socket_t socket_t;
typedef uint16_t port_t;
@@ -68,13 +66,13 @@ ssize_t socket_read(const socket_t *socket, void *buf, size_t count);
// may be NULL.
ssize_t socket_write(const socket_t *socket, const void *buf, size_t count);
-// Registers |socket| with the |thread|. When the socket becomes readable, |read_cb|
+// Registers |socket| with the |reactor|. When the socket becomes readable, |read_cb|
// will be called. When the socket becomes writeable, |write_cb| will be called. The
// |context| parameter is passed, untouched, to each of the callback routines. Neither
-// |socket| nor |thread| may be NULL. |read_cb| or |write_cb|, but not both, may be NULL.
+// |socket| nor |reactor| may be NULL. |read_cb| or |write_cb|, but not both, may be NULL.
// |context| may be NULL.
-void socket_register(socket_t *socket, thread_t *thread, socket_cb read_cb, socket_cb write_cb, void *context);
+void socket_register(socket_t *socket, reactor_t *reactor, void *context, socket_cb read_cb, socket_cb write_cb);
-// Unregisters |socket| from whichever thread it is registered with, if any. This
+// Unregisters |socket| from whichever reactor it is registered with, if any. This
// function is idempotent.
void socket_unregister(socket_t *socket);
diff --git a/osi/include/thread.h b/osi/include/thread.h
index 5b9eb049e..17a1f364e 100644
--- a/osi/include/thread.h
+++ b/osi/include/thread.h
@@ -23,7 +23,6 @@
#define THREAD_NAME_MAX 16
typedef struct reactor_t reactor_t;
-typedef struct reactor_object_t reactor_object_t;
typedef struct thread_t thread_t;
typedef void (*thread_fn)(void *context);
@@ -57,14 +56,7 @@ void thread_stop(thread_t *thread);
// |thread| may not be NULL.
bool thread_is_self(const thread_t *thread);
+reactor_t *thread_get_reactor(const thread_t *thread);
+
// Returns the name of the given |thread|. |thread| may not be NULL.
const char *thread_name(const thread_t *thread);
-
-// Registers |reactor_object| with the reactor of the given |thread|
-// in a thread safe manner. Neither |thread| nor |reactor_object| may be NULL.
-void thread_register(thread_t *thread, reactor_object_t *reactor_object);
-
-// Unregisters |reactor_object| with the reactor of the given |thread|
-// in a thread safe manner. Does not return until the unregistration is
-// complete. Neither |thread| nor |reactor_object| may be NULL.
-void thread_unregister(thread_t *thread, reactor_object_t *reactor_object);
diff --git a/osi/src/list.c b/osi/src/list.c
index 691f03962..c65720d6f 100644
--- a/osi/src/list.c
+++ b/osi/src/list.c
@@ -47,6 +47,18 @@ bool list_is_empty(const list_t *list) {
return (list->length == 0);
}
+bool list_contains(const list_t *list, const void *data) {
+ assert(list != NULL);
+ assert(data != NULL);
+
+ for (const list_node_t *node = list_begin(list); node != list_end(list); node = list_next(node)) {
+ if (list_node(node) == data)
+ return true;
+ }
+
+ return false;
+}
+
// Returns the length of the list. This function does not accept a NULL list.
size_t list_length(const list_t *list) {
assert(list != NULL);
diff --git a/osi/src/reactor.c b/osi/src/reactor.c
index 4fcb996e4..eb88f75ce 100644
--- a/osi/src/reactor.c
+++ b/osi/src/reactor.c
@@ -20,9 +20,10 @@
#include <assert.h>
#include <errno.h>
+#include <pthread.h>
#include <stdlib.h>
+#include <sys/epoll.h>
#include <sys/eventfd.h>
-#include <sys/select.h>
#include <utils/Log.h>
#include "list.h"
@@ -33,29 +34,64 @@
#endif
struct reactor_t {
+ int epoll_fd;
int event_fd;
- list_t *objects;
+ pthread_mutex_t list_lock; // protects invalidation_list.
+ list_t *invalidation_list; // reactor objects that have been unregistered.
+ pthread_t run_thread; // the pthread on which reactor_run is executing.
+ bool is_running; // indicates whether |run_thread| is valid.
+ bool object_removed;
};
-static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv);
+struct reactor_object_t {
+ int fd; // the file descriptor to monitor for events.
+ void *context; // a context that's passed back to the *_ready functions.
+ reactor_t *reactor; // the reactor instance this object is registered with.
+ pthread_mutex_t lock; // protects the lifetime of this object and all variables.
-static const eventfd_t EVENT_REACTOR_CHANGE_SET = 1;
-static const eventfd_t EVENT_REACTOR_STOP = 0x8000000000000000LL;
+ void (*read_ready)(void *context); // function to call when the file descriptor becomes readable.
+ void (*write_ready)(void *context); // function to call when the file descriptor becomes writeable.
+};
+
+static reactor_status_t run_reactor(reactor_t *reactor, int iterations);
+
+static const size_t MAX_EVENTS = 64;
+static const eventfd_t EVENT_REACTOR_STOP = 1;
reactor_t *reactor_new(void) {
reactor_t *ret = (reactor_t *)calloc(1, sizeof(reactor_t));
if (!ret)
return NULL;
+ ret->epoll_fd = INVALID_FD;
+ ret->event_fd = INVALID_FD;
+
+ ret->epoll_fd = epoll_create(MAX_EVENTS);
+ if (ret->epoll_fd == INVALID_FD) {
+ ALOGE("%s unable to create epoll instance: %s", __func__, strerror(errno));
+ goto error;
+ }
+
ret->event_fd = eventfd(0, 0);
if (ret->event_fd == INVALID_FD) {
ALOGE("%s unable to create eventfd: %s", __func__, strerror(errno));
goto error;
}
- ret->objects = list_new(NULL);
- if (!ret->objects)
+ pthread_mutex_init(&ret->list_lock, NULL);
+ ret->invalidation_list = list_new(NULL);
+ if (!ret->invalidation_list) {
+ ALOGE("%s unable to allocate object invalidation list.", __func__);
+ goto error;
+ }
+
+ struct epoll_event event;
+ event.events = EPOLLIN;
+ event.data.ptr = NULL;
+ if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
+ ALOGE("%s unable to register eventfd with epoll set: %s", __func__, strerror(errno));
goto error;
+ }
return ret;
@@ -68,122 +104,186 @@ void reactor_free(reactor_t *reactor) {
if (!reactor)
return;
- list_free(reactor->objects);
+ list_free(reactor->invalidation_list);
close(reactor->event_fd);
+ close(reactor->epoll_fd);
free(reactor);
}
reactor_status_t reactor_start(reactor_t *reactor) {
assert(reactor != NULL);
- return run_reactor(reactor, 0, NULL);
+ return run_reactor(reactor, 0);
}
reactor_status_t reactor_run_once(reactor_t *reactor) {
assert(reactor != NULL);
- return run_reactor(reactor, 1, NULL);
+ return run_reactor(reactor, 1);
}
-reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms) {
+void reactor_stop(reactor_t *reactor) {
assert(reactor != NULL);
- struct timeval tv;
- tv.tv_sec = timeout_ms / 1000;
- tv.tv_usec = (timeout_ms % 1000) * 1000;
- return run_reactor(reactor, 1, &tv);
+ eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
}
-void reactor_stop(reactor_t *reactor) {
+reactor_object_t *reactor_register(reactor_t *reactor,
+ int fd, void *context,
+ void (*read_ready)(void *context),
+ void (*write_ready)(void *context)) {
assert(reactor != NULL);
+ assert(fd != INVALID_FD);
- eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
+ reactor_object_t *object = (reactor_object_t *)calloc(1, sizeof(reactor_object_t));
+ if (!object) {
+ ALOGE("%s unable to allocate reactor object: %s", __func__, strerror(errno));
+ return NULL;
+ }
+
+ object->reactor = reactor;
+ object->fd = fd;
+ object->context = context;
+ object->read_ready = read_ready;
+ object->write_ready = write_ready;
+ pthread_mutex_init(&object->lock, NULL);
+
+ struct epoll_event event;
+ event.events = 0;
+ if (read_ready)
+ event.events |= (EPOLLIN | EPOLLRDHUP);
+ if (write_ready)
+ event.events |= EPOLLOUT;
+ event.data.ptr = object;
+
+ if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
+ ALOGE("%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno));
+ pthread_mutex_destroy(&object->lock);
+ free(object);
+ return NULL;
+ }
+
+ return object;
}
-void reactor_register(reactor_t *reactor, reactor_object_t *obj) {
- assert(reactor != NULL);
- assert(obj != NULL);
+bool reactor_change_registration(reactor_object_t *object,
+ void (*read_ready)(void *context),
+ void (*write_ready)(void *context)) {
+ assert(object != NULL);
+
+ struct epoll_event event;
+ event.events = 0;
+ if (read_ready)
+ event.events |= (EPOLLIN | EPOLLRDHUP);
+ if (write_ready)
+ event.events |= EPOLLOUT;
+ event.data.ptr = object;
+
+ if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) {
+ ALOGE("%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno));
+ return false;
+ }
+
+ pthread_mutex_lock(&object->lock);
+ object->read_ready = read_ready;
+ object->write_ready = write_ready;
+ pthread_mutex_unlock(&object->lock);
- // IMPORTANT
- // You might be wondering why on earth this is a |list_prepend|.
- // That is a good question...
- //
- // thread_t depends on this behavior.
- // The first reactor object it registers is its work queue, and prepending
- // means it will always be at the end of any given reactor iteration.
- // This is important to ensure we don't execute off dangling reactor objects
- // or do other bad things.
- list_prepend(reactor->objects, obj);
- eventfd_write(reactor->event_fd, EVENT_REACTOR_CHANGE_SET);
+ return true;
}
-void reactor_unregister(reactor_t *reactor, reactor_object_t *obj) {
- assert(reactor != NULL);
+void reactor_unregister(reactor_object_t *obj) {
assert(obj != NULL);
- list_remove(reactor->objects, obj);
- eventfd_write(reactor->event_fd, EVENT_REACTOR_CHANGE_SET);
+ reactor_t *reactor = obj->reactor;
+
+ if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1)
+ ALOGE("%s unable to unregister fd %d from epoll set: %s", __func__, obj->fd, strerror(errno));
+
+ if (reactor->is_running && pthread_equal(pthread_self(), reactor->run_thread)) {
+ reactor->object_removed = true;
+ return;
+ }
+
+ pthread_mutex_lock(&reactor->list_lock);
+ list_append(reactor->invalidation_list, obj);
+ pthread_mutex_unlock(&reactor->list_lock);
+
+ // Taking the object lock here makes sure a callback for |obj| isn't
+ // currently executing. The reactor thread must then either be before
+ // the callbacks or after. If after, we know that the object won't be
+ // referenced because it has been taken out of the epoll set. If before,
+ // it won't be referenced because the reactor thread will check the
+ // invalidation_list and find it in there. So by taking this lock, we
+ // are waiting until the reactor thread drops all references to |obj|.
+ // One the wait completes, we can unlock and destroy |obj| safely.
+ pthread_mutex_lock(&obj->lock);
+ pthread_mutex_unlock(&obj->lock);
+ pthread_mutex_destroy(&obj->lock);
+ free(obj);
}
-// Runs the reactor loop for a maximum of |iterations| with the given timeout, |tv|.
+// Runs the reactor loop for a maximum of |iterations|.
// 0 |iterations| means loop forever.
-// NULL |tv| means no timeout (block until an event occurs).
// |reactor| may not be NULL.
-static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv) {
+static reactor_status_t run_reactor(reactor_t *reactor, int iterations) {
assert(reactor != NULL);
+ reactor->run_thread = pthread_self();
+ reactor->is_running = true;
+
+ struct epoll_event events[MAX_EVENTS];
for (int i = 0; iterations == 0 || i < iterations; ++i) {
- fd_set read_set;
- fd_set write_set;
- FD_ZERO(&read_set);
- FD_ZERO(&write_set);
- FD_SET(reactor->event_fd, &read_set);
-
- int max_fd = reactor->event_fd;
- for (const list_node_t *iter = list_begin(reactor->objects); iter != list_end(reactor->objects); iter = list_next(iter)) {
- reactor_object_t *object = (reactor_object_t *)list_node(iter);
- int fd = object->fd;
- reactor_interest_t interest = object->interest;
- if (interest & REACTOR_INTEREST_READ)
- FD_SET(fd, &read_set);
- if (interest & REACTOR_INTEREST_WRITE)
- FD_SET(fd, &write_set);
- if (fd > max_fd)
- max_fd = fd;
- }
+ pthread_mutex_lock(&reactor->list_lock);
+ list_clear(reactor->invalidation_list);
+ pthread_mutex_unlock(&reactor->list_lock);
int ret;
do {
- ret = select(max_fd + 1, &read_set, &write_set, NULL, tv);
+ ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1);
} while (ret == -1 && errno == EINTR);
if (ret == -1) {
- ALOGE("%s error in select: %s", __func__, strerror(errno));
+ ALOGE("%s error in epoll_wait: %s", __func__, strerror(errno));
+ reactor->is_running = false;
return REACTOR_STATUS_ERROR;
}
- if (ret == 0)
- return REACTOR_STATUS_TIMEOUT;
+ for (int j = 0; j < ret; ++j) {
+ // The event file descriptor is the only one that registers with
+ // a NULL data pointer. We use the NULL to identify it and break
+ // out of the reactor loop.
+ if (events[j].data.ptr == NULL) {
+ eventfd_t value;
+ eventfd_read(reactor->event_fd, &value);
+ reactor->is_running = false;
+ return REACTOR_STATUS_STOP;
+ }
+
+ reactor_object_t *object = (reactor_object_t *)events[j].data.ptr;
- if (FD_ISSET(reactor->event_fd, &read_set)) {
- eventfd_t value;
- eventfd_read(reactor->event_fd, &value);
- if (value < EVENT_REACTOR_STOP)
+ pthread_mutex_lock(&reactor->list_lock);
+ if (list_contains(reactor->invalidation_list, object)) {
+ pthread_mutex_unlock(&reactor->list_lock);
continue;
- return REACTOR_STATUS_STOP;
- }
+ }
+
+ // Downgrade the list lock to an object lock.
+ pthread_mutex_lock(&object->lock);
+ pthread_mutex_unlock(&reactor->list_lock);
- for (const list_node_t *iter = list_begin(reactor->objects); ret > 0 && iter != list_end(reactor->objects); iter = list_next(iter)) {
- reactor_object_t *object = (reactor_object_t *)list_node(iter);
- int fd = object->fd;
- if (FD_ISSET(fd, &read_set)) {
+ reactor->object_removed = false;
+ if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready)
object->read_ready(object->context);
- --ret;
- }
- if (FD_ISSET(fd, &write_set)) {
+ if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready)
object->write_ready(object->context);
- --ret;
+ pthread_mutex_unlock(&object->lock);
+
+ if (reactor->object_removed) {
+ pthread_mutex_destroy(&object->lock);
+ free(object);
}
}
}
+ reactor->is_running = false;
return REACTOR_STATUS_DONE;
}
diff --git a/osi/src/socket.c b/osi/src/socket.c
index 0155ed8fc..827941ca3 100644
--- a/osi/src/socket.c
+++ b/osi/src/socket.c
@@ -27,16 +27,16 @@
#include <unistd.h>
#include <utils/Log.h>
+#include "osi.h"
#include "reactor.h"
#include "socket.h"
-#include "thread.h"
struct socket_t {
- thread_t *thread;
- reactor_object_t socket_object;
+ int fd;
+ reactor_object_t *reactor_object;
socket_cb read_ready;
socket_cb write_ready;
- void *context;
+ void *context; // Not owned, do not free.
};
static void internal_read_ready(void *context);
@@ -49,14 +49,14 @@ socket_t *socket_new(void) {
goto error;
}
- ret->socket_object.fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (ret->socket_object.fd == -1) {
+ ret->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (ret->fd == INVALID_FD) {
ALOGE("%s unable to create socket: %s", __func__, strerror(errno));
goto error;
}
int enable = 1;
- if (setsockopt(ret->socket_object.fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) == -1) {
+ if (setsockopt(ret->fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) == -1) {
ALOGE("%s unable to set SO_REUSEADDR: %s", __func__, strerror(errno));
goto error;
}
@@ -65,7 +65,7 @@ socket_t *socket_new(void) {
error:;
if (ret)
- close(ret->socket_object.fd);
+ close(ret->fd);
free(ret);
return NULL;
}
@@ -75,7 +75,7 @@ void socket_free(socket_t *socket) {
return;
socket_unregister(socket);
- close(socket->socket_object.fd);
+ close(socket->fd);
free(socket);
}
@@ -86,12 +86,12 @@ bool socket_listen(const socket_t *socket, port_t port) {
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = 0;
addr.sin_port = htons(port);
- if (bind(socket->socket_object.fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
+ if (bind(socket->fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
ALOGE("%s unable to bind socket to port %u: %s", __func__, port, strerror(errno));
return false;
}
- if (listen(socket->socket_object.fd, 10) == -1) {
+ if (listen(socket->fd, 10) == -1) {
ALOGE("%s unable to listen on port %u: %s", __func__, port, strerror(errno));
return false;
}
@@ -102,8 +102,8 @@ bool socket_listen(const socket_t *socket, port_t port) {
socket_t *socket_accept(const socket_t *socket) {
assert(socket != NULL);
- int fd = accept(socket->socket_object.fd, NULL, NULL);
- if (fd == -1) {
+ int fd = accept(socket->fd, NULL, NULL);
+ if (fd == INVALID_FD) {
ALOGE("%s unable to accept socket: %s", __func__, strerror(errno));
return NULL;
}
@@ -115,7 +115,7 @@ socket_t *socket_accept(const socket_t *socket) {
return NULL;
}
- ret->socket_object.fd = fd;
+ ret->fd = fd;
return ret;
}
@@ -123,47 +123,39 @@ ssize_t socket_read(const socket_t *socket, void *buf, size_t count) {
assert(socket != NULL);
assert(buf != NULL);
- return recv(socket->socket_object.fd, buf, count, MSG_DONTWAIT);
+ return recv(socket->fd, buf, count, MSG_DONTWAIT);
}
ssize_t socket_write(const socket_t *socket, const void *buf, size_t count) {
assert(socket != NULL);
assert(buf != NULL);
- return send(socket->socket_object.fd, buf, count, MSG_DONTWAIT);
+ return send(socket->fd, buf, count, MSG_DONTWAIT);
}
-void socket_register(socket_t *socket, thread_t *thread, socket_cb read_cb, socket_cb write_cb, void *context) {
+void socket_register(socket_t *socket, reactor_t *reactor, void *context, socket_cb read_cb, socket_cb write_cb) {
assert(socket != NULL);
- assert(thread != NULL);
assert(read_cb || write_cb);
// Make sure the socket isn't currently registered.
socket_unregister(socket);
- socket->thread = thread;
socket->read_ready = read_cb;
socket->write_ready = write_cb;
socket->context = context;
- socket->socket_object.read_ready = internal_read_ready;
- socket->socket_object.write_ready = internal_write_ready;
- socket->socket_object.context = socket;
- if (read_cb && write_cb)
- socket->socket_object.interest = REACTOR_INTEREST_READ_WRITE;
- else if (read_cb)
- socket->socket_object.interest = REACTOR_INTEREST_READ;
- else if (write_cb)
- socket->socket_object.interest = REACTOR_INTEREST_WRITE;
-
- thread_register(thread, &socket->socket_object);
+ void (*read_fn)(void *) = (read_cb != NULL) ? internal_read_ready : NULL;
+ void (*write_fn)(void *) = (write_cb != NULL) ? internal_write_ready : NULL;
+
+ socket->reactor_object = reactor_register(reactor, socket->fd, socket, read_fn, write_fn);
}
void socket_unregister(socket_t *socket) {
assert(socket != NULL);
- if (socket->thread)
- thread_unregister(socket->thread, &socket->socket_object);
+ if (socket->reactor_object)
+ reactor_unregister(socket->reactor_object);
+ socket->reactor_object = NULL;
}
static void internal_read_ready(void *context) {
diff --git a/osi/src/thread.c b/osi/src/thread.c
index daa642305..4caefbdd7 100644
--- a/osi/src/thread.c
+++ b/osi/src/thread.c
@@ -50,21 +50,8 @@ typedef struct {
void *context;
} work_item_t;
-typedef struct {
- thread_t *thread;
- reactor_object_t *reactor_object;
-} reactor_register_arg_t;
-
-typedef struct {
- thread_t *thread;
- reactor_object_t *reactor_object;
- semaphore_t *unregistered_sem;
-} reactor_unregister_arg_t;
-
static void *run_thread(void *start_arg);
static void work_queue_read_cb(void *context);
-static void register_with_reactor_cb(void *context);
-static void unregister_with_reactor_cb(void *context);
static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 128;
@@ -167,37 +154,6 @@ const char *thread_name(const thread_t *thread) {
return thread->name;
}
-void thread_register(thread_t *thread, reactor_object_t *reactor_object) {
- assert(thread != NULL);
- assert(reactor_object != NULL);
-
- reactor_register_arg_t *arg = (reactor_register_arg_t *)malloc(sizeof(reactor_register_arg_t));
- arg->thread = thread;
- arg->reactor_object = reactor_object;
-
- thread_post(thread, register_with_reactor_cb, arg);
-}
-
-void thread_unregister(thread_t *thread, reactor_object_t *reactor_object) {
- assert(thread != NULL);
- assert(reactor_object != NULL);
-
- reactor_unregister_arg_t arg;
-
- arg.thread = thread;
- arg.reactor_object = reactor_object;
- arg.unregistered_sem = semaphore_new(0);
-
- if (!arg.unregistered_sem) {
- ALOGE("%s unable to create unregistered semaphore.", __func__);
- return;
- }
-
- thread_post(thread, unregister_with_reactor_cb, &arg);
- semaphore_wait(arg.unregistered_sem);
- semaphore_free(arg.unregistered_sem);
-}
-
static void *run_thread(void *start_arg) {
assert(start_arg != NULL);
@@ -216,14 +172,12 @@ static void *run_thread(void *start_arg) {
semaphore_post(start->start_sem);
- reactor_object_t work_queue_object;
- work_queue_object.context = thread->work_queue;
- work_queue_object.fd = fixed_queue_get_dequeue_fd(thread->work_queue);
- work_queue_object.interest = REACTOR_INTEREST_READ;
- work_queue_object.read_ready = work_queue_read_cb;
+ int fd = fixed_queue_get_dequeue_fd(thread->work_queue);
+ void *context = thread->work_queue;
- reactor_register(thread->reactor, &work_queue_object);
+ reactor_object_t *work_queue_object = reactor_register(thread->reactor, fd, context, work_queue_read_cb, NULL);
reactor_start(thread->reactor);
+ reactor_unregister(work_queue_object);
// Make sure we dispatch all queued work items before exiting the thread.
// This allows a caller to safely tear down by enqueuing a teardown
@@ -251,27 +205,3 @@ static void work_queue_read_cb(void *context) {
item->func(item->context);
free(item);
}
-
-static void register_with_reactor_cb(void *context) {
- assert(context != NULL);
-
- reactor_register_arg_t *arg = (reactor_register_arg_t *)context;
- reactor_register(
- thread_get_reactor(arg->thread),
- arg->reactor_object
- );
-
- free(arg);
-}
-
-static void unregister_with_reactor_cb(void *context) {
- assert(context != NULL);
-
- reactor_unregister_arg_t *arg = (reactor_unregister_arg_t *)context;
- reactor_unregister(
- thread_get_reactor(arg->thread),
- arg->reactor_object
- );
-
- semaphore_post(arg->unregistered_sem);
-}
diff --git a/osi/test/reactor_test.cpp b/osi/test/reactor_test.cpp
index f830ec86b..7959b98cd 100644
--- a/osi/test/reactor_test.cpp
+++ b/osi/test/reactor_test.cpp
@@ -1,5 +1,6 @@
#include <gtest/gtest.h>
#include <pthread.h>
+#include <sys/eventfd.h>
#include <sys/time.h>
#include <unistd.h>
@@ -29,12 +30,6 @@ static void join_reactor_thread() {
pthread_join(thread, NULL);
}
-static uint64_t get_timestamp(void) {
- struct timeval tv;
- gettimeofday(&tv, NULL);
- return tv.tv_sec * 1000 + tv.tv_usec / 1000;
-}
-
TEST(ReactorTest, reactor_new) {
reactor_t *reactor = reactor_new();
EXPECT_TRUE(reactor != NULL);
@@ -75,13 +70,46 @@ TEST(ReactorTest, reactor_start_wait_stop) {
reactor_free(reactor);
}
-TEST(ReactorTest, reactor_run_once_timeout) {
+typedef struct {
+ reactor_t *reactor;
+ reactor_object_t *object;
+} unregister_arg_t;
+
+static void unregister_cb(void *context) {
+ unregister_arg_t *arg = (unregister_arg_t *)context;
+ reactor_unregister(arg->object);
+ reactor_stop(arg->reactor);
+}
+
+TEST(ReactorTest, reactor_unregister_from_callback) {
+ reactor_t *reactor = reactor_new();
+
+ int fd = eventfd(0, 0);
+ unregister_arg_t arg;
+ arg.reactor = reactor;
+ arg.object = reactor_register(reactor, fd, &arg, unregister_cb, NULL);
+ spawn_reactor_thread(reactor);
+ eventfd_write(fd, 1);
+
+ join_reactor_thread();
+
+ close(fd);
+ reactor_free(reactor);
+}
+
+TEST(ReactorTest, reactor_unregister_from_separate_thread) {
reactor_t *reactor = reactor_new();
- uint64_t start = get_timestamp();
- reactor_status_t status = reactor_run_once_timeout(reactor, 50);
- EXPECT_GE(get_timestamp() - start, static_cast<uint64_t>(50));
- EXPECT_EQ(status, REACTOR_STATUS_TIMEOUT);
+ int fd = eventfd(0, 0);
+
+ reactor_object_t *object = reactor_register(reactor, fd, NULL, NULL, NULL);
+ spawn_reactor_thread(reactor);
+ usleep(50 * 1000);
+ reactor_unregister(object);
+
+ reactor_stop(reactor);
+ join_reactor_thread();
+ close(fd);
reactor_free(reactor);
}
diff --git a/osi/test/thread_test.cpp b/osi/test/thread_test.cpp
index ea8e06c7e..32f8c287f 100644
--- a/osi/test/thread_test.cpp
+++ b/osi/test/thread_test.cpp
@@ -10,11 +10,6 @@ extern "C" {
#include "osi.h"
}
-typedef struct {
- semaphore_t *wait;
- semaphore_t *read;
-} read_wait_pair_t;
-
TEST(ThreadTest, test_new_simple) {
thread_t *thread = thread_new("test_thread");
ASSERT_TRUE(thread != NULL);
@@ -44,73 +39,6 @@ TEST(ThreadTest, test_very_long_name) {
thread_free(thread);
}
-static void signal_semaphore_when_called(UNUSED_ATTR void *context) {
- read_wait_pair_t *semaphores = (read_wait_pair_t *)context;
- semaphore_wait(semaphores->read);
- semaphore_post(semaphores->wait);
-}
-
-TEST(ThreadTest, test_register) {
- read_wait_pair_t semaphores;
- semaphores.wait = semaphore_new(0);
- semaphores.read = semaphore_new(0);
-
- thread_t *thread = thread_new("test_thread");
-
- reactor_object_t obj;
- obj.context = &semaphores;
- obj.fd = semaphore_get_fd(semaphores.read);
- obj.interest = REACTOR_INTEREST_READ;
- obj.read_ready = signal_semaphore_when_called;
-
- thread_register(thread, &obj);
-
- // See if the reactor picks it up
- semaphore_post(semaphores.read);
- semaphore_wait(semaphores.wait);
- thread_free(thread);
-}
-
-TEST(ThreadTest, test_unregister) {
- read_wait_pair_t semaphores;
- semaphores.wait = semaphore_new(0);
- semaphores.read = semaphore_new(0);
-
- thread_t *thread = thread_new("test_thread");
-
- reactor_object_t obj;
- obj.context = &semaphores;
- obj.fd = semaphore_get_fd(semaphores.read);
- obj.interest = REACTOR_INTEREST_READ;
- obj.read_ready = signal_semaphore_when_called;
-
- thread_register(thread, &obj);
-
- // See if the reactor picks it up
- semaphore_post(semaphores.read);
- semaphore_wait(semaphores.wait);
-
- thread_unregister(thread, &obj);
-
- semaphore_post(semaphores.read);
-
- // Select with a timeout and make sure it times out (should not get data after unregister returns)
- struct timeval timeout;
- timeout.tv_sec = 1;
- timeout.tv_usec = 0;
-
- int wait_fd = semaphore_get_fd(semaphores.wait);
- fd_set read_fds;
-
- FD_ZERO(&read_fds);
- FD_SET(wait_fd, &read_fds);
-
- select(wait_fd + 1, &read_fds, NULL, NULL, &timeout);
-
- ASSERT_FALSE(FD_ISSET(wait_fd, &read_fds));
- thread_free(thread);
-}
-
static void thread_is_self_fn(void *context) {
thread_t *thread = (thread_t *)context;
EXPECT_TRUE(thread_is_self(thread));