diff options
author | Sharvil Nanavati <sharvil@google.com> | 2014-08-13 00:40:49 -0700 |
---|---|---|
committer | Andre Eisenbach <eisenbach@google.com> | 2015-03-16 16:51:28 -0700 |
commit | fbf89085bf308a98b00da77d1538539f6dd58604 (patch) | |
tree | 82802f0ae12dec58034a2cc0683da6a841765329 | |
parent | 278abce72967eaa496b402ffc867e37a4613c187 (diff) | |
download | android_system_bt-fbf89085bf308a98b00da77d1538539f6dd58604.tar.gz android_system_bt-fbf89085bf308a98b00da77d1538539f6dd58604.tar.bz2 android_system_bt-fbf89085bf308a98b00da77d1538539f6dd58604.zip |
Switch to an epoll-based reactor implementation.
epoll is a much nicer interface that very closely matches the
reactor interface. It's also thread-safe which makes it a more
suitable choice for bluedroid. As a result of this change,
reactor_register and reactor_unregister are both thread-safe without
introducing any synchronization in user-space.
-rw-r--r-- | hci/src/hci_inject.c | 14 | ||||
-rw-r--r-- | osi/include/list.h | 1 | ||||
-rw-r--r-- | osi/include/reactor.h | 57 | ||||
-rw-r--r-- | osi/include/socket.h | 10 | ||||
-rw-r--r-- | osi/include/thread.h | 12 | ||||
-rw-r--r-- | osi/src/list.c | 12 | ||||
-rw-r--r-- | osi/src/reactor.c | 248 | ||||
-rw-r--r-- | osi/src/socket.c | 56 | ||||
-rw-r--r-- | osi/src/thread.c | 78 | ||||
-rw-r--r-- | osi/test/reactor_test.cpp | 50 | ||||
-rw-r--r-- | osi/test/thread_test.cpp | 72 |
11 files changed, 292 insertions, 318 deletions
diff --git a/hci/src/hci_inject.c b/hci/src/hci_inject.c index 78c1cc1a7..af5ed704d 100644 --- a/hci/src/hci_inject.c +++ b/hci/src/hci_inject.c @@ -62,14 +62,14 @@ bool hci_inject_open(void) { hci = bt_hc_get_interface(); - clients = list_new(client_free); - if (!clients) - goto error; - thread = thread_new("hci_inject"); if (!thread) goto error; + clients = list_new(client_free); + if (!clients) + goto error; + listen_socket = socket_new(); if (!listen_socket) goto error; @@ -77,7 +77,7 @@ bool hci_inject_open(void) { if (!socket_listen(listen_socket, LISTEN_PORT)) goto error; - socket_register(listen_socket, thread, accept_ready, NULL, NULL); + socket_register(listen_socket, thread_get_reactor(thread), NULL, accept_ready, NULL); return true; error:; @@ -87,8 +87,8 @@ error:; void hci_inject_close(void) { socket_free(listen_socket); - thread_free(thread); list_free(clients); + thread_free(thread); listen_socket = NULL; thread = NULL; @@ -132,7 +132,7 @@ static void accept_ready(socket_t *socket, UNUSED_ATTR void *context) { return; } - socket_register(socket, thread, read_ready, NULL, client); + socket_register(socket, thread_get_reactor(thread), client, read_ready, NULL); } static void read_ready(UNUSED_ATTR socket_t *socket, void *context) { diff --git a/osi/include/list.h b/osi/include/list.h index 56cb7396d..b23532cf0 100644 --- a/osi/include/list.h +++ b/osi/include/list.h @@ -18,6 +18,7 @@ void list_free(list_t *list); // Accessors. bool list_is_empty(const list_t *list); +bool list_contains(const list_t *list, const void *data); size_t list_length(const list_t *list); void *list_front(const list_t *list); void *list_back(const list_t *list); diff --git a/osi/include/reactor.h b/osi/include/reactor.h index eeb538dc6..100c76268 100644 --- a/osi/include/reactor.h +++ b/osi/include/reactor.h @@ -26,37 +26,16 @@ // This module implements the Reactor pattern. // See http://en.wikipedia.org/wiki/Reactor_pattern for details. -struct reactor_t; typedef struct reactor_t reactor_t; - -struct reactor_object_t; typedef struct reactor_object_t reactor_object_t; -// Enumerates the types of events a reactor object is interested -// in responding to. -typedef enum { - REACTOR_INTEREST_READ = 1, - REACTOR_INTEREST_WRITE = 2, - REACTOR_INTEREST_READ_WRITE = 3, -} reactor_interest_t; - // Enumerates the reasons a reactor has stopped. typedef enum { REACTOR_STATUS_STOP, // |reactor_stop| was called. - REACTOR_STATUS_TIMEOUT, // a timeout was specified and the reactor timed out. REACTOR_STATUS_ERROR, // there was an error during the operation. REACTOR_STATUS_DONE, // the reactor completed its work (for the _run_once* variants). } reactor_status_t; -struct reactor_object_t { - void *context; // a context that's passed back to the *_ready functions. - int fd; // the file descriptor to monitor for events. - reactor_interest_t interest; // the event types to monitor the file descriptor for. - - void (*read_ready)(void *context); // function to call when the file descriptor becomes readable. - void (*write_ready)(void *context); // function to call when the file descriptor becomes writeable. -}; - // Creates a new reactor object. Returns NULL on failure. The returned object // must be freed by calling |reactor_free|. reactor_t *reactor_new(void); @@ -72,18 +51,32 @@ reactor_status_t reactor_start(reactor_t *reactor); // becomes ready. |reactor| may not be NULL. reactor_status_t reactor_run_once(reactor_t *reactor); -// Same as |reactor_run_once| with a bounded wait time in case no object becomes ready. -reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms); - // Immediately unblocks the reactor. This function is safe to call from any thread. // |reactor| may not be NULL. void reactor_stop(reactor_t *reactor); -// Registers an object with the reactor. |obj| is neither copied nor is its ownership transferred -// so the pointer must remain valid until it is unregistered with |reactor_unregister|. Neither -// |reactor| nor |obj| may be NULL. -void reactor_register(reactor_t *reactor, reactor_object_t *obj); - -// Unregisters a previously registered object with the |reactor|. Neither |reactor| nor |obj| -// may be NULL. -void reactor_unregister(reactor_t *reactor, reactor_object_t *obj); +// Registers a file descriptor with the reactor. The file descriptor, |fd|, must be valid +// when this function is called and its ownership is not transferred to the reactor. The +// |context| variable is a user-defined opaque handle that is passed back to the |read_ready| +// and |write_ready| functions. It is not copied or even dereferenced by the reactor so it +// may contain any value including NULL. The |read_ready| and |write_ready| arguments are +// optional and may be NULL. This function returns an opaque object that represents the +// file descriptor's registration with the reactor. When the caller is no longer interested +// in events on the |fd|, it must free the returned object by calling |reactor_unregister|. +reactor_object_t *reactor_register(reactor_t *reactor, + int fd, void *context, + void (*read_ready)(void *context), + void (*write_ready)(void *context)); + +// Changes the subscription mode for the file descriptor represented by |object|. If the +// caller has already registered a file descriptor with a reactor, has a valid |object|, +// and decides to change the |read_ready| and/or |write_ready| callback routines, they +// can call this routine. Returns true if the subscription was changed, false otherwise. +// |object| may not be NULL, |read_ready| and |write_ready| may be NULL. +bool reactor_change_registration(reactor_object_t *object, + void (*read_ready)(void *context), + void (*write_ready)(void *context)); + +// Unregisters a previously registered file descriptor with its reactor. |obj| may not be NULL. +// |obj| is invalid after calling this function so the caller must drop all references to it. +void reactor_unregister(reactor_object_t *obj); diff --git a/osi/include/socket.h b/osi/include/socket.h index e9ec8245c..754e46885 100644 --- a/osi/include/socket.h +++ b/osi/include/socket.h @@ -21,8 +21,6 @@ #include <stddef.h> #include <stdint.h> -#include "thread.h" - typedef struct reactor_t reactor_t; typedef struct socket_t socket_t; typedef uint16_t port_t; @@ -68,13 +66,13 @@ ssize_t socket_read(const socket_t *socket, void *buf, size_t count); // may be NULL. ssize_t socket_write(const socket_t *socket, const void *buf, size_t count); -// Registers |socket| with the |thread|. When the socket becomes readable, |read_cb| +// Registers |socket| with the |reactor|. When the socket becomes readable, |read_cb| // will be called. When the socket becomes writeable, |write_cb| will be called. The // |context| parameter is passed, untouched, to each of the callback routines. Neither -// |socket| nor |thread| may be NULL. |read_cb| or |write_cb|, but not both, may be NULL. +// |socket| nor |reactor| may be NULL. |read_cb| or |write_cb|, but not both, may be NULL. // |context| may be NULL. -void socket_register(socket_t *socket, thread_t *thread, socket_cb read_cb, socket_cb write_cb, void *context); +void socket_register(socket_t *socket, reactor_t *reactor, void *context, socket_cb read_cb, socket_cb write_cb); -// Unregisters |socket| from whichever thread it is registered with, if any. This +// Unregisters |socket| from whichever reactor it is registered with, if any. This // function is idempotent. void socket_unregister(socket_t *socket); diff --git a/osi/include/thread.h b/osi/include/thread.h index 5b9eb049e..17a1f364e 100644 --- a/osi/include/thread.h +++ b/osi/include/thread.h @@ -23,7 +23,6 @@ #define THREAD_NAME_MAX 16 typedef struct reactor_t reactor_t; -typedef struct reactor_object_t reactor_object_t; typedef struct thread_t thread_t; typedef void (*thread_fn)(void *context); @@ -57,14 +56,7 @@ void thread_stop(thread_t *thread); // |thread| may not be NULL. bool thread_is_self(const thread_t *thread); +reactor_t *thread_get_reactor(const thread_t *thread); + // Returns the name of the given |thread|. |thread| may not be NULL. const char *thread_name(const thread_t *thread); - -// Registers |reactor_object| with the reactor of the given |thread| -// in a thread safe manner. Neither |thread| nor |reactor_object| may be NULL. -void thread_register(thread_t *thread, reactor_object_t *reactor_object); - -// Unregisters |reactor_object| with the reactor of the given |thread| -// in a thread safe manner. Does not return until the unregistration is -// complete. Neither |thread| nor |reactor_object| may be NULL. -void thread_unregister(thread_t *thread, reactor_object_t *reactor_object); diff --git a/osi/src/list.c b/osi/src/list.c index 691f03962..c65720d6f 100644 --- a/osi/src/list.c +++ b/osi/src/list.c @@ -47,6 +47,18 @@ bool list_is_empty(const list_t *list) { return (list->length == 0); } +bool list_contains(const list_t *list, const void *data) { + assert(list != NULL); + assert(data != NULL); + + for (const list_node_t *node = list_begin(list); node != list_end(list); node = list_next(node)) { + if (list_node(node) == data) + return true; + } + + return false; +} + // Returns the length of the list. This function does not accept a NULL list. size_t list_length(const list_t *list) { assert(list != NULL); diff --git a/osi/src/reactor.c b/osi/src/reactor.c index 4fcb996e4..eb88f75ce 100644 --- a/osi/src/reactor.c +++ b/osi/src/reactor.c @@ -20,9 +20,10 @@ #include <assert.h> #include <errno.h> +#include <pthread.h> #include <stdlib.h> +#include <sys/epoll.h> #include <sys/eventfd.h> -#include <sys/select.h> #include <utils/Log.h> #include "list.h" @@ -33,29 +34,64 @@ #endif struct reactor_t { + int epoll_fd; int event_fd; - list_t *objects; + pthread_mutex_t list_lock; // protects invalidation_list. + list_t *invalidation_list; // reactor objects that have been unregistered. + pthread_t run_thread; // the pthread on which reactor_run is executing. + bool is_running; // indicates whether |run_thread| is valid. + bool object_removed; }; -static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv); +struct reactor_object_t { + int fd; // the file descriptor to monitor for events. + void *context; // a context that's passed back to the *_ready functions. + reactor_t *reactor; // the reactor instance this object is registered with. + pthread_mutex_t lock; // protects the lifetime of this object and all variables. -static const eventfd_t EVENT_REACTOR_CHANGE_SET = 1; -static const eventfd_t EVENT_REACTOR_STOP = 0x8000000000000000LL; + void (*read_ready)(void *context); // function to call when the file descriptor becomes readable. + void (*write_ready)(void *context); // function to call when the file descriptor becomes writeable. +}; + +static reactor_status_t run_reactor(reactor_t *reactor, int iterations); + +static const size_t MAX_EVENTS = 64; +static const eventfd_t EVENT_REACTOR_STOP = 1; reactor_t *reactor_new(void) { reactor_t *ret = (reactor_t *)calloc(1, sizeof(reactor_t)); if (!ret) return NULL; + ret->epoll_fd = INVALID_FD; + ret->event_fd = INVALID_FD; + + ret->epoll_fd = epoll_create(MAX_EVENTS); + if (ret->epoll_fd == INVALID_FD) { + ALOGE("%s unable to create epoll instance: %s", __func__, strerror(errno)); + goto error; + } + ret->event_fd = eventfd(0, 0); if (ret->event_fd == INVALID_FD) { ALOGE("%s unable to create eventfd: %s", __func__, strerror(errno)); goto error; } - ret->objects = list_new(NULL); - if (!ret->objects) + pthread_mutex_init(&ret->list_lock, NULL); + ret->invalidation_list = list_new(NULL); + if (!ret->invalidation_list) { + ALOGE("%s unable to allocate object invalidation list.", __func__); + goto error; + } + + struct epoll_event event; + event.events = EPOLLIN; + event.data.ptr = NULL; + if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) { + ALOGE("%s unable to register eventfd with epoll set: %s", __func__, strerror(errno)); goto error; + } return ret; @@ -68,122 +104,186 @@ void reactor_free(reactor_t *reactor) { if (!reactor) return; - list_free(reactor->objects); + list_free(reactor->invalidation_list); close(reactor->event_fd); + close(reactor->epoll_fd); free(reactor); } reactor_status_t reactor_start(reactor_t *reactor) { assert(reactor != NULL); - return run_reactor(reactor, 0, NULL); + return run_reactor(reactor, 0); } reactor_status_t reactor_run_once(reactor_t *reactor) { assert(reactor != NULL); - return run_reactor(reactor, 1, NULL); + return run_reactor(reactor, 1); } -reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms) { +void reactor_stop(reactor_t *reactor) { assert(reactor != NULL); - struct timeval tv; - tv.tv_sec = timeout_ms / 1000; - tv.tv_usec = (timeout_ms % 1000) * 1000; - return run_reactor(reactor, 1, &tv); + eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP); } -void reactor_stop(reactor_t *reactor) { +reactor_object_t *reactor_register(reactor_t *reactor, + int fd, void *context, + void (*read_ready)(void *context), + void (*write_ready)(void *context)) { assert(reactor != NULL); + assert(fd != INVALID_FD); - eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP); + reactor_object_t *object = (reactor_object_t *)calloc(1, sizeof(reactor_object_t)); + if (!object) { + ALOGE("%s unable to allocate reactor object: %s", __func__, strerror(errno)); + return NULL; + } + + object->reactor = reactor; + object->fd = fd; + object->context = context; + object->read_ready = read_ready; + object->write_ready = write_ready; + pthread_mutex_init(&object->lock, NULL); + + struct epoll_event event; + event.events = 0; + if (read_ready) + event.events |= (EPOLLIN | EPOLLRDHUP); + if (write_ready) + event.events |= EPOLLOUT; + event.data.ptr = object; + + if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) { + ALOGE("%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno)); + pthread_mutex_destroy(&object->lock); + free(object); + return NULL; + } + + return object; } -void reactor_register(reactor_t *reactor, reactor_object_t *obj) { - assert(reactor != NULL); - assert(obj != NULL); +bool reactor_change_registration(reactor_object_t *object, + void (*read_ready)(void *context), + void (*write_ready)(void *context)) { + assert(object != NULL); + + struct epoll_event event; + event.events = 0; + if (read_ready) + event.events |= (EPOLLIN | EPOLLRDHUP); + if (write_ready) + event.events |= EPOLLOUT; + event.data.ptr = object; + + if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) { + ALOGE("%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno)); + return false; + } + + pthread_mutex_lock(&object->lock); + object->read_ready = read_ready; + object->write_ready = write_ready; + pthread_mutex_unlock(&object->lock); - // IMPORTANT - // You might be wondering why on earth this is a |list_prepend|. - // That is a good question... - // - // thread_t depends on this behavior. - // The first reactor object it registers is its work queue, and prepending - // means it will always be at the end of any given reactor iteration. - // This is important to ensure we don't execute off dangling reactor objects - // or do other bad things. - list_prepend(reactor->objects, obj); - eventfd_write(reactor->event_fd, EVENT_REACTOR_CHANGE_SET); + return true; } -void reactor_unregister(reactor_t *reactor, reactor_object_t *obj) { - assert(reactor != NULL); +void reactor_unregister(reactor_object_t *obj) { assert(obj != NULL); - list_remove(reactor->objects, obj); - eventfd_write(reactor->event_fd, EVENT_REACTOR_CHANGE_SET); + reactor_t *reactor = obj->reactor; + + if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1) + ALOGE("%s unable to unregister fd %d from epoll set: %s", __func__, obj->fd, strerror(errno)); + + if (reactor->is_running && pthread_equal(pthread_self(), reactor->run_thread)) { + reactor->object_removed = true; + return; + } + + pthread_mutex_lock(&reactor->list_lock); + list_append(reactor->invalidation_list, obj); + pthread_mutex_unlock(&reactor->list_lock); + + // Taking the object lock here makes sure a callback for |obj| isn't + // currently executing. The reactor thread must then either be before + // the callbacks or after. If after, we know that the object won't be + // referenced because it has been taken out of the epoll set. If before, + // it won't be referenced because the reactor thread will check the + // invalidation_list and find it in there. So by taking this lock, we + // are waiting until the reactor thread drops all references to |obj|. + // One the wait completes, we can unlock and destroy |obj| safely. + pthread_mutex_lock(&obj->lock); + pthread_mutex_unlock(&obj->lock); + pthread_mutex_destroy(&obj->lock); + free(obj); } -// Runs the reactor loop for a maximum of |iterations| with the given timeout, |tv|. +// Runs the reactor loop for a maximum of |iterations|. // 0 |iterations| means loop forever. -// NULL |tv| means no timeout (block until an event occurs). // |reactor| may not be NULL. -static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv) { +static reactor_status_t run_reactor(reactor_t *reactor, int iterations) { assert(reactor != NULL); + reactor->run_thread = pthread_self(); + reactor->is_running = true; + + struct epoll_event events[MAX_EVENTS]; for (int i = 0; iterations == 0 || i < iterations; ++i) { - fd_set read_set; - fd_set write_set; - FD_ZERO(&read_set); - FD_ZERO(&write_set); - FD_SET(reactor->event_fd, &read_set); - - int max_fd = reactor->event_fd; - for (const list_node_t *iter = list_begin(reactor->objects); iter != list_end(reactor->objects); iter = list_next(iter)) { - reactor_object_t *object = (reactor_object_t *)list_node(iter); - int fd = object->fd; - reactor_interest_t interest = object->interest; - if (interest & REACTOR_INTEREST_READ) - FD_SET(fd, &read_set); - if (interest & REACTOR_INTEREST_WRITE) - FD_SET(fd, &write_set); - if (fd > max_fd) - max_fd = fd; - } + pthread_mutex_lock(&reactor->list_lock); + list_clear(reactor->invalidation_list); + pthread_mutex_unlock(&reactor->list_lock); int ret; do { - ret = select(max_fd + 1, &read_set, &write_set, NULL, tv); + ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1); } while (ret == -1 && errno == EINTR); if (ret == -1) { - ALOGE("%s error in select: %s", __func__, strerror(errno)); + ALOGE("%s error in epoll_wait: %s", __func__, strerror(errno)); + reactor->is_running = false; return REACTOR_STATUS_ERROR; } - if (ret == 0) - return REACTOR_STATUS_TIMEOUT; + for (int j = 0; j < ret; ++j) { + // The event file descriptor is the only one that registers with + // a NULL data pointer. We use the NULL to identify it and break + // out of the reactor loop. + if (events[j].data.ptr == NULL) { + eventfd_t value; + eventfd_read(reactor->event_fd, &value); + reactor->is_running = false; + return REACTOR_STATUS_STOP; + } + + reactor_object_t *object = (reactor_object_t *)events[j].data.ptr; - if (FD_ISSET(reactor->event_fd, &read_set)) { - eventfd_t value; - eventfd_read(reactor->event_fd, &value); - if (value < EVENT_REACTOR_STOP) + pthread_mutex_lock(&reactor->list_lock); + if (list_contains(reactor->invalidation_list, object)) { + pthread_mutex_unlock(&reactor->list_lock); continue; - return REACTOR_STATUS_STOP; - } + } + + // Downgrade the list lock to an object lock. + pthread_mutex_lock(&object->lock); + pthread_mutex_unlock(&reactor->list_lock); - for (const list_node_t *iter = list_begin(reactor->objects); ret > 0 && iter != list_end(reactor->objects); iter = list_next(iter)) { - reactor_object_t *object = (reactor_object_t *)list_node(iter); - int fd = object->fd; - if (FD_ISSET(fd, &read_set)) { + reactor->object_removed = false; + if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready) object->read_ready(object->context); - --ret; - } - if (FD_ISSET(fd, &write_set)) { + if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready) object->write_ready(object->context); - --ret; + pthread_mutex_unlock(&object->lock); + + if (reactor->object_removed) { + pthread_mutex_destroy(&object->lock); + free(object); } } } + reactor->is_running = false; return REACTOR_STATUS_DONE; } diff --git a/osi/src/socket.c b/osi/src/socket.c index 0155ed8fc..827941ca3 100644 --- a/osi/src/socket.c +++ b/osi/src/socket.c @@ -27,16 +27,16 @@ #include <unistd.h> #include <utils/Log.h> +#include "osi.h" #include "reactor.h" #include "socket.h" -#include "thread.h" struct socket_t { - thread_t *thread; - reactor_object_t socket_object; + int fd; + reactor_object_t *reactor_object; socket_cb read_ready; socket_cb write_ready; - void *context; + void *context; // Not owned, do not free. }; static void internal_read_ready(void *context); @@ -49,14 +49,14 @@ socket_t *socket_new(void) { goto error; } - ret->socket_object.fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (ret->socket_object.fd == -1) { + ret->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (ret->fd == INVALID_FD) { ALOGE("%s unable to create socket: %s", __func__, strerror(errno)); goto error; } int enable = 1; - if (setsockopt(ret->socket_object.fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) == -1) { + if (setsockopt(ret->fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) == -1) { ALOGE("%s unable to set SO_REUSEADDR: %s", __func__, strerror(errno)); goto error; } @@ -65,7 +65,7 @@ socket_t *socket_new(void) { error:; if (ret) - close(ret->socket_object.fd); + close(ret->fd); free(ret); return NULL; } @@ -75,7 +75,7 @@ void socket_free(socket_t *socket) { return; socket_unregister(socket); - close(socket->socket_object.fd); + close(socket->fd); free(socket); } @@ -86,12 +86,12 @@ bool socket_listen(const socket_t *socket, port_t port) { addr.sin_family = AF_INET; addr.sin_addr.s_addr = 0; addr.sin_port = htons(port); - if (bind(socket->socket_object.fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + if (bind(socket->fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { ALOGE("%s unable to bind socket to port %u: %s", __func__, port, strerror(errno)); return false; } - if (listen(socket->socket_object.fd, 10) == -1) { + if (listen(socket->fd, 10) == -1) { ALOGE("%s unable to listen on port %u: %s", __func__, port, strerror(errno)); return false; } @@ -102,8 +102,8 @@ bool socket_listen(const socket_t *socket, port_t port) { socket_t *socket_accept(const socket_t *socket) { assert(socket != NULL); - int fd = accept(socket->socket_object.fd, NULL, NULL); - if (fd == -1) { + int fd = accept(socket->fd, NULL, NULL); + if (fd == INVALID_FD) { ALOGE("%s unable to accept socket: %s", __func__, strerror(errno)); return NULL; } @@ -115,7 +115,7 @@ socket_t *socket_accept(const socket_t *socket) { return NULL; } - ret->socket_object.fd = fd; + ret->fd = fd; return ret; } @@ -123,47 +123,39 @@ ssize_t socket_read(const socket_t *socket, void *buf, size_t count) { assert(socket != NULL); assert(buf != NULL); - return recv(socket->socket_object.fd, buf, count, MSG_DONTWAIT); + return recv(socket->fd, buf, count, MSG_DONTWAIT); } ssize_t socket_write(const socket_t *socket, const void *buf, size_t count) { assert(socket != NULL); assert(buf != NULL); - return send(socket->socket_object.fd, buf, count, MSG_DONTWAIT); + return send(socket->fd, buf, count, MSG_DONTWAIT); } -void socket_register(socket_t *socket, thread_t *thread, socket_cb read_cb, socket_cb write_cb, void *context) { +void socket_register(socket_t *socket, reactor_t *reactor, void *context, socket_cb read_cb, socket_cb write_cb) { assert(socket != NULL); - assert(thread != NULL); assert(read_cb || write_cb); // Make sure the socket isn't currently registered. socket_unregister(socket); - socket->thread = thread; socket->read_ready = read_cb; socket->write_ready = write_cb; socket->context = context; - socket->socket_object.read_ready = internal_read_ready; - socket->socket_object.write_ready = internal_write_ready; - socket->socket_object.context = socket; - if (read_cb && write_cb) - socket->socket_object.interest = REACTOR_INTEREST_READ_WRITE; - else if (read_cb) - socket->socket_object.interest = REACTOR_INTEREST_READ; - else if (write_cb) - socket->socket_object.interest = REACTOR_INTEREST_WRITE; - - thread_register(thread, &socket->socket_object); + void (*read_fn)(void *) = (read_cb != NULL) ? internal_read_ready : NULL; + void (*write_fn)(void *) = (write_cb != NULL) ? internal_write_ready : NULL; + + socket->reactor_object = reactor_register(reactor, socket->fd, socket, read_fn, write_fn); } void socket_unregister(socket_t *socket) { assert(socket != NULL); - if (socket->thread) - thread_unregister(socket->thread, &socket->socket_object); + if (socket->reactor_object) + reactor_unregister(socket->reactor_object); + socket->reactor_object = NULL; } static void internal_read_ready(void *context) { diff --git a/osi/src/thread.c b/osi/src/thread.c index daa642305..4caefbdd7 100644 --- a/osi/src/thread.c +++ b/osi/src/thread.c @@ -50,21 +50,8 @@ typedef struct { void *context; } work_item_t; -typedef struct { - thread_t *thread; - reactor_object_t *reactor_object; -} reactor_register_arg_t; - -typedef struct { - thread_t *thread; - reactor_object_t *reactor_object; - semaphore_t *unregistered_sem; -} reactor_unregister_arg_t; - static void *run_thread(void *start_arg); static void work_queue_read_cb(void *context); -static void register_with_reactor_cb(void *context); -static void unregister_with_reactor_cb(void *context); static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 128; @@ -167,37 +154,6 @@ const char *thread_name(const thread_t *thread) { return thread->name; } -void thread_register(thread_t *thread, reactor_object_t *reactor_object) { - assert(thread != NULL); - assert(reactor_object != NULL); - - reactor_register_arg_t *arg = (reactor_register_arg_t *)malloc(sizeof(reactor_register_arg_t)); - arg->thread = thread; - arg->reactor_object = reactor_object; - - thread_post(thread, register_with_reactor_cb, arg); -} - -void thread_unregister(thread_t *thread, reactor_object_t *reactor_object) { - assert(thread != NULL); - assert(reactor_object != NULL); - - reactor_unregister_arg_t arg; - - arg.thread = thread; - arg.reactor_object = reactor_object; - arg.unregistered_sem = semaphore_new(0); - - if (!arg.unregistered_sem) { - ALOGE("%s unable to create unregistered semaphore.", __func__); - return; - } - - thread_post(thread, unregister_with_reactor_cb, &arg); - semaphore_wait(arg.unregistered_sem); - semaphore_free(arg.unregistered_sem); -} - static void *run_thread(void *start_arg) { assert(start_arg != NULL); @@ -216,14 +172,12 @@ static void *run_thread(void *start_arg) { semaphore_post(start->start_sem); - reactor_object_t work_queue_object; - work_queue_object.context = thread->work_queue; - work_queue_object.fd = fixed_queue_get_dequeue_fd(thread->work_queue); - work_queue_object.interest = REACTOR_INTEREST_READ; - work_queue_object.read_ready = work_queue_read_cb; + int fd = fixed_queue_get_dequeue_fd(thread->work_queue); + void *context = thread->work_queue; - reactor_register(thread->reactor, &work_queue_object); + reactor_object_t *work_queue_object = reactor_register(thread->reactor, fd, context, work_queue_read_cb, NULL); reactor_start(thread->reactor); + reactor_unregister(work_queue_object); // Make sure we dispatch all queued work items before exiting the thread. // This allows a caller to safely tear down by enqueuing a teardown @@ -251,27 +205,3 @@ static void work_queue_read_cb(void *context) { item->func(item->context); free(item); } - -static void register_with_reactor_cb(void *context) { - assert(context != NULL); - - reactor_register_arg_t *arg = (reactor_register_arg_t *)context; - reactor_register( - thread_get_reactor(arg->thread), - arg->reactor_object - ); - - free(arg); -} - -static void unregister_with_reactor_cb(void *context) { - assert(context != NULL); - - reactor_unregister_arg_t *arg = (reactor_unregister_arg_t *)context; - reactor_unregister( - thread_get_reactor(arg->thread), - arg->reactor_object - ); - - semaphore_post(arg->unregistered_sem); -} diff --git a/osi/test/reactor_test.cpp b/osi/test/reactor_test.cpp index f830ec86b..7959b98cd 100644 --- a/osi/test/reactor_test.cpp +++ b/osi/test/reactor_test.cpp @@ -1,5 +1,6 @@ #include <gtest/gtest.h> #include <pthread.h> +#include <sys/eventfd.h> #include <sys/time.h> #include <unistd.h> @@ -29,12 +30,6 @@ static void join_reactor_thread() { pthread_join(thread, NULL); } -static uint64_t get_timestamp(void) { - struct timeval tv; - gettimeofday(&tv, NULL); - return tv.tv_sec * 1000 + tv.tv_usec / 1000; -} - TEST(ReactorTest, reactor_new) { reactor_t *reactor = reactor_new(); EXPECT_TRUE(reactor != NULL); @@ -75,13 +70,46 @@ TEST(ReactorTest, reactor_start_wait_stop) { reactor_free(reactor); } -TEST(ReactorTest, reactor_run_once_timeout) { +typedef struct { + reactor_t *reactor; + reactor_object_t *object; +} unregister_arg_t; + +static void unregister_cb(void *context) { + unregister_arg_t *arg = (unregister_arg_t *)context; + reactor_unregister(arg->object); + reactor_stop(arg->reactor); +} + +TEST(ReactorTest, reactor_unregister_from_callback) { + reactor_t *reactor = reactor_new(); + + int fd = eventfd(0, 0); + unregister_arg_t arg; + arg.reactor = reactor; + arg.object = reactor_register(reactor, fd, &arg, unregister_cb, NULL); + spawn_reactor_thread(reactor); + eventfd_write(fd, 1); + + join_reactor_thread(); + + close(fd); + reactor_free(reactor); +} + +TEST(ReactorTest, reactor_unregister_from_separate_thread) { reactor_t *reactor = reactor_new(); - uint64_t start = get_timestamp(); - reactor_status_t status = reactor_run_once_timeout(reactor, 50); - EXPECT_GE(get_timestamp() - start, static_cast<uint64_t>(50)); - EXPECT_EQ(status, REACTOR_STATUS_TIMEOUT); + int fd = eventfd(0, 0); + + reactor_object_t *object = reactor_register(reactor, fd, NULL, NULL, NULL); + spawn_reactor_thread(reactor); + usleep(50 * 1000); + reactor_unregister(object); + + reactor_stop(reactor); + join_reactor_thread(); + close(fd); reactor_free(reactor); } diff --git a/osi/test/thread_test.cpp b/osi/test/thread_test.cpp index ea8e06c7e..32f8c287f 100644 --- a/osi/test/thread_test.cpp +++ b/osi/test/thread_test.cpp @@ -10,11 +10,6 @@ extern "C" { #include "osi.h" } -typedef struct { - semaphore_t *wait; - semaphore_t *read; -} read_wait_pair_t; - TEST(ThreadTest, test_new_simple) { thread_t *thread = thread_new("test_thread"); ASSERT_TRUE(thread != NULL); @@ -44,73 +39,6 @@ TEST(ThreadTest, test_very_long_name) { thread_free(thread); } -static void signal_semaphore_when_called(UNUSED_ATTR void *context) { - read_wait_pair_t *semaphores = (read_wait_pair_t *)context; - semaphore_wait(semaphores->read); - semaphore_post(semaphores->wait); -} - -TEST(ThreadTest, test_register) { - read_wait_pair_t semaphores; - semaphores.wait = semaphore_new(0); - semaphores.read = semaphore_new(0); - - thread_t *thread = thread_new("test_thread"); - - reactor_object_t obj; - obj.context = &semaphores; - obj.fd = semaphore_get_fd(semaphores.read); - obj.interest = REACTOR_INTEREST_READ; - obj.read_ready = signal_semaphore_when_called; - - thread_register(thread, &obj); - - // See if the reactor picks it up - semaphore_post(semaphores.read); - semaphore_wait(semaphores.wait); - thread_free(thread); -} - -TEST(ThreadTest, test_unregister) { - read_wait_pair_t semaphores; - semaphores.wait = semaphore_new(0); - semaphores.read = semaphore_new(0); - - thread_t *thread = thread_new("test_thread"); - - reactor_object_t obj; - obj.context = &semaphores; - obj.fd = semaphore_get_fd(semaphores.read); - obj.interest = REACTOR_INTEREST_READ; - obj.read_ready = signal_semaphore_when_called; - - thread_register(thread, &obj); - - // See if the reactor picks it up - semaphore_post(semaphores.read); - semaphore_wait(semaphores.wait); - - thread_unregister(thread, &obj); - - semaphore_post(semaphores.read); - - // Select with a timeout and make sure it times out (should not get data after unregister returns) - struct timeval timeout; - timeout.tv_sec = 1; - timeout.tv_usec = 0; - - int wait_fd = semaphore_get_fd(semaphores.wait); - fd_set read_fds; - - FD_ZERO(&read_fds); - FD_SET(wait_fd, &read_fds); - - select(wait_fd + 1, &read_fds, NULL, NULL, &timeout); - - ASSERT_FALSE(FD_ISSET(wait_fd, &read_fds)); - thread_free(thread); -} - static void thread_is_self_fn(void *context) { thread_t *thread = (thread_t *)context; EXPECT_TRUE(thread_is_self(thread)); |