diff options
| author | The Android Open Source Project <initial-contribution@android.com> | 2009-03-03 19:32:55 -0800 |
|---|---|---|
| committer | The Android Open Source Project <initial-contribution@android.com> | 2009-03-03 19:32:55 -0800 |
| commit | dd7bc3319deb2b77c5d07a51b7d6cd7e11b5beb0 (patch) | |
| tree | 2ba8d1a0846d69b18f623515e8d9b5d9fe38b590 /libcutils/mq.c | |
| parent | e54eebbf1a908d65ee8cf80bab62821c05666d70 (diff) | |
| download | system_core-dd7bc3319deb2b77c5d07a51b7d6cd7e11b5beb0.tar.gz system_core-dd7bc3319deb2b77c5d07a51b7d6cd7e11b5beb0.tar.bz2 system_core-dd7bc3319deb2b77c5d07a51b7d6cd7e11b5beb0.zip | |
auto import from //depot/cupcake/@135843
Diffstat (limited to 'libcutils/mq.c')
| -rw-r--r-- | libcutils/mq.c | 1357 |
1 files changed, 1357 insertions, 0 deletions
diff --git a/libcutils/mq.c b/libcutils/mq.c new file mode 100644 index 00000000..3b65f1f1 --- /dev/null +++ b/libcutils/mq.c @@ -0,0 +1,1357 @@ +/* + * Copyright (C) 2007 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "mq" + +#include <assert.h> +#include <errno.h> +#include <fcntl.h> +#include <pthread.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/un.h> +#include <sys/uio.h> + +#include <cutils/array.h> +#include <cutils/hashmap.h> +#include <cutils/selector.h> + +#include "loghack.h" +#include "buffer.h" + +/** Number of dead peers to remember. */ +#define PEER_HISTORY (16) + +typedef struct sockaddr SocketAddress; +typedef struct sockaddr_un UnixAddress; + +/** + * Process/user/group ID. We don't use ucred directly because it's only + * available on Linux. + */ +typedef struct { + pid_t pid; + uid_t uid; + gid_t gid; +} Credentials; + +/** Listens for bytes coming from remote peers. */ +typedef void BytesListener(Credentials credentials, char* bytes, size_t size); + +/** Listens for the deaths of remote peers. */ +typedef void DeathListener(pid_t pid); + +/** Types of packets. */ +typedef enum { + /** Request for a connection to another peer. */ + CONNECTION_REQUEST, + + /** A connection to another peer. */ + CONNECTION, + + /** Reports a failed connection attempt. */ + CONNECTION_ERROR, + + /** A generic packet of bytes. */ + BYTES, +} PacketType; + +typedef enum { + /** Reading a packet header. */ + READING_HEADER, + + /** Waiting for a connection from the master. */ + ACCEPTING_CONNECTION, + + /** Reading bytes. */ + READING_BYTES, +} InputState; + +/** A packet header. */ +// TODO: Use custom headers for master->peer, peer->master, peer->peer. +typedef struct { + PacketType type; + union { + /** Packet size. Used for BYTES. */ + size_t size; + + /** Credentials. Used for CONNECTION and CONNECTION_REQUEST. */ + Credentials credentials; + }; +} Header; + +/** A packet which will be sent to a peer. */ +typedef struct OutgoingPacket OutgoingPacket; +struct OutgoingPacket { + /** Packet header. */ + Header header; + + union { + /** Connection to peer. Used with CONNECTION. */ + int socket; + + /** Buffer of bytes. Used with BYTES. */ + Buffer* bytes; + }; + + /** Frees all resources associated with this packet. */ + void (*free)(OutgoingPacket* packet); + + /** Optional context. */ + void* context; + + /** Next packet in the queue. */ + OutgoingPacket* nextPacket; +}; + +/** Represents a remote peer. */ +typedef struct PeerProxy PeerProxy; + +/** Local peer state. You typically have one peer per process. */ +typedef struct { + /** This peer's PID. */ + pid_t pid; + + /** + * Map from pid to peer proxy. The peer has a peer proxy for each remote + * peer it's connected to. + * + * Acquire mutex before use. + */ + Hashmap* peerProxies; + + /** Manages I/O. */ + Selector* selector; + + /** Used to synchronize operations with the selector thread. */ + pthread_mutex_t mutex; + + /** Is this peer the master? */ + bool master; + + /** Peer proxy for the master. */ + PeerProxy* masterProxy; + + /** Listens for packets from remote peers. */ + BytesListener* onBytes; + + /** Listens for deaths of remote peers. */ + DeathListener* onDeath; + + /** Keeps track of recently dead peers. Requires mutex. */ + pid_t deadPeers[PEER_HISTORY]; + size_t deadPeerCursor; +} Peer; + +struct PeerProxy { + /** Credentials of the remote process. */ + Credentials credentials; + + /** Keeps track of data coming in from the remote peer. */ + InputState inputState; + Buffer* inputBuffer; + PeerProxy* connecting; + + /** File descriptor for this peer. */ + SelectableFd* fd; + + /** + * Queue of packets to be written out to the remote peer. + * + * Requires mutex. + */ + // TODO: Limit queue length. + OutgoingPacket* currentPacket; + OutgoingPacket* lastPacket; + + /** Used to write outgoing header. */ + Buffer outgoingHeader; + + /** True if this is the master's proxy. */ + bool master; + + /** Reference back to the local peer. */ + Peer* peer; + + /** + * Used in master only. Maps this peer proxy to other peer proxies to + * which the peer has been connected to. Maps pid to PeerProxy. Helps + * keep track of which connections we've sent to whom. + */ + Hashmap* connections; +}; + +/** Server socket path. */ +static const char* MASTER_PATH = "/master.peer"; + +/** Credentials of the master peer. */ +static const Credentials MASTER_CREDENTIALS = {0, 0, 0}; + +/** Creates a peer proxy and adds it to the peer proxy map. */ +static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials); + +/** Sets the non-blocking flag on a descriptor. */ +static void setNonBlocking(int fd) { + int flags; + if ((flags = fcntl(fd, F_GETFL, 0)) < 0) { + LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno)); + } + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { + LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno)); + } +} + +/** Closes a fd and logs a warning if the close fails. */ +static void closeWithWarning(int fd) { + int result = close(fd); + if (result == -1) { + LOGW("close() error: %s", strerror(errno)); + } +} + +/** Hashes pid_t keys. */ +static int pidHash(void* key) { + pid_t* pid = (pid_t*) key; + return (int) (*pid); +} + +/** Compares pid_t keys. */ +static bool pidEquals(void* keyA, void* keyB) { + pid_t* a = (pid_t*) keyA; + pid_t* b = (pid_t*) keyB; + return *a == *b; +} + +/** Gets the master address. Not thread safe. */ +static UnixAddress* getMasterAddress() { + static UnixAddress masterAddress; + static bool initialized = false; + if (initialized == false) { + masterAddress.sun_family = AF_LOCAL; + strcpy(masterAddress.sun_path, MASTER_PATH); + initialized = true; + } + return &masterAddress; +} + +/** Gets exclusive access to the peer for this thread. */ +static void peerLock(Peer* peer) { + pthread_mutex_lock(&peer->mutex); +} + +/** Releases exclusive access to the peer. */ +static void peerUnlock(Peer* peer) { + pthread_mutex_unlock(&peer->mutex); +} + +/** Frees a simple, i.e. header-only, outgoing packet. */ +static void outgoingPacketFree(OutgoingPacket* packet) { + LOGD("Freeing outgoing packet."); + free(packet); +} + +/** + * Prepare to read a new packet from the peer. + */ +static void peerProxyExpectHeader(PeerProxy* peerProxy) { + peerProxy->inputState = READING_HEADER; + bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header)); +} + +/** Sets up the buffer for the outgoing header. */ +static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) { + peerProxy->outgoingHeader.data + = (char*) &(peerProxy->currentPacket->header); + peerProxy->outgoingHeader.size = sizeof(Header); + bufferPrepareForWrite(&peerProxy->outgoingHeader); +} + +/** Adds a packet to the end of the queue. Callers must have the mutex. */ +static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy, + OutgoingPacket* newPacket) { + newPacket->nextPacket = NULL; // Just in case. + if (peerProxy->currentPacket == NULL) { + // The queue is empty. + peerProxy->currentPacket = newPacket; + peerProxy->lastPacket = newPacket; + + peerProxyPrepareOutgoingHeader(peerProxy); + } else { + peerProxy->lastPacket->nextPacket = newPacket; + } +} + +/** Takes the peer lock and enqueues the given packet. */ +static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy, + OutgoingPacket* newPacket) { + Peer* peer = peerProxy->peer; + peerLock(peer); + peerProxyEnqueueOutgoingPacket(peerProxy, newPacket); + peerUnlock(peer); +} + +/** + * Frees current packet and moves to the next one. Returns true if there is + * a next packet or false if the queue is empty. + */ +static bool peerProxyNextPacket(PeerProxy* peerProxy) { + Peer* peer = peerProxy->peer; + peerLock(peer); + + OutgoingPacket* current = peerProxy->currentPacket; + + if (current == NULL) { + // The queue is already empty. + peerUnlock(peer); + return false; + } + + OutgoingPacket* next = current->nextPacket; + peerProxy->currentPacket = next; + current->nextPacket = NULL; + current->free(current); + if (next == NULL) { + // The queue is empty. + peerProxy->lastPacket = NULL; + peerUnlock(peer); + return false; + } else { + peerUnlock(peer); + peerProxyPrepareOutgoingHeader(peerProxy); + + // TODO: Start writing next packet? It would reduce the number of + // system calls, but we could also starve other peers. + return true; + } +} + +/** + * Checks whether a peer died recently. + */ +static bool peerIsDead(Peer* peer, pid_t pid) { + size_t i; + for (i = 0; i < PEER_HISTORY; i++) { + pid_t deadPeer = peer->deadPeers[i]; + if (deadPeer == 0) { + return false; + } + if (deadPeer == pid) { + return true; + } + } + return false; +} + +/** + * Cleans up connection information. + */ +static bool peerProxyRemoveConnection(void* key, void* value, void* context) { + PeerProxy* deadPeer = (PeerProxy*) context; + PeerProxy* otherPeer = (PeerProxy*) value; + hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid)); + return true; +} + +/** + * Called when the peer dies. + */ +static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) { + if (errnoIsSet) { + LOGI("Peer %d died. errno: %s", peerProxy->credentials.pid, + strerror(errno)); + } else { + LOGI("Peer %d died.", peerProxy->credentials.pid); + } + + // If we lost the master, we're up a creek. We can't let this happen. + if (peerProxy->master) { + LOG_ALWAYS_FATAL("Lost connection to master."); + } + + Peer* localPeer = peerProxy->peer; + pid_t pid = peerProxy->credentials.pid; + + peerLock(localPeer); + + // Remember for awhile that the peer died. + localPeer->deadPeers[localPeer->deadPeerCursor] + = peerProxy->credentials.pid; + localPeer->deadPeerCursor++; + if (localPeer->deadPeerCursor == PEER_HISTORY) { + localPeer->deadPeerCursor = 0; + } + + // Remove from peer map. + hashmapRemove(localPeer->peerProxies, &pid); + + // External threads can no longer get to this peer proxy, so we don't + // need the lock anymore. + peerUnlock(localPeer); + + // Remove the fd from the selector. + if (peerProxy->fd != NULL) { + peerProxy->fd->remove = true; + } + + // Clear outgoing packet queue. + while (peerProxyNextPacket(peerProxy)) {} + + bufferFree(peerProxy->inputBuffer); + + // This only applies to the master. + if (peerProxy->connections != NULL) { + // We can't leave these other maps pointing to freed memory. + hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection, + peerProxy); + hashmapFree(peerProxy->connections); + } + + // Invoke death listener. + localPeer->onDeath(pid); + + // Free the peer proxy itself. + free(peerProxy); +} + +static void peerProxyHandleError(PeerProxy* peerProxy, char* functionName) { + if (errno == EINTR) { + // Log interruptions but otherwise ignore them. + LOGW("%s() interrupted.", functionName); + } else if (errno == EAGAIN) { + LOGD("EWOULDBLOCK"); + // Ignore. + } else { + LOGW("Error returned by %s().", functionName); + peerProxyKill(peerProxy, true); + } +} + +/** + * Buffers output sent to a peer. May be called multiple times until the entire + * buffer is filled. Returns true when the buffer is empty. + */ +static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) { + ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd); + if (size < 0) { + peerProxyHandleError(peerProxy, "write"); + return false; + } else { + return bufferWriteComplete(outgoing); + } +} + +/** Writes packet bytes to peer. */ +static void peerProxyWriteBytes(PeerProxy* peerProxy) { + Buffer* buffer = peerProxy->currentPacket->bytes; + if (peerProxyWriteFromBuffer(peerProxy, buffer)) { + LOGD("Bytes written."); + peerProxyNextPacket(peerProxy); + } +} + +/** Sends a socket to the peer. */ +static void peerProxyWriteConnection(PeerProxy* peerProxy) { + int socket = peerProxy->currentPacket->socket; + + // Why does sending and receiving fds have to be such a PITA? + struct msghdr msg; + struct iovec iov[1]; + + union { + struct cmsghdr cm; + char control[CMSG_SPACE(sizeof(int))]; + } control_un; + + struct cmsghdr *cmptr; + + msg.msg_control = control_un.control; + msg.msg_controllen = sizeof(control_un.control); + cmptr = CMSG_FIRSTHDR(&msg); + cmptr->cmsg_len = CMSG_LEN(sizeof(int)); + cmptr->cmsg_level = SOL_SOCKET; + cmptr->cmsg_type = SCM_RIGHTS; + + // Store the socket in the message. + *((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + iov[0].iov_base = ""; + iov[0].iov_len = 1; + msg.msg_iov = iov; + msg.msg_iovlen = 1; + + ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0); + + if (result < 0) { + peerProxyHandleError(peerProxy, "sendmsg"); + } else { + // Success. Queue up the next packet. + peerProxyNextPacket(peerProxy); + + } +} + +/** + * Writes some outgoing data. + */ +static void peerProxyWrite(SelectableFd* fd) { + // TODO: Try to write header and body with one system call. + + PeerProxy* peerProxy = (PeerProxy*) fd->data; + OutgoingPacket* current = peerProxy->currentPacket; + + if (current == NULL) { + // We have nothing left to write. + return; + } + + // Write the header. + Buffer* outgoingHeader = &peerProxy->outgoingHeader; + bool headerWritten = bufferWriteComplete(outgoingHeader); + if (!headerWritten) { + LOGD("Writing header..."); + headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader); + if (headerWritten) { + LOGD("Header written."); + } + } + + // Write body. + if (headerWritten) { + PacketType type = current->header.type; + switch (type) { + case CONNECTION: + peerProxyWriteConnection(peerProxy); + break; + case BYTES: + peerProxyWriteBytes(peerProxy); + break; + case CONNECTION_REQUEST: + case CONNECTION_ERROR: + // These packets consist solely of a header. + peerProxyNextPacket(peerProxy); + break; + default: + LOG_ALWAYS_FATAL("Unknown packet type: %d", type); + } + } +} + +/** + * Sets up a peer proxy's fd before we try to select() it. + */ +static void peerProxyBeforeSelect(SelectableFd* fd) { + LOGD("Before select..."); + + PeerProxy* peerProxy = (PeerProxy*) fd->data; + + peerLock(peerProxy->peer); + bool hasPackets = peerProxy->currentPacket != NULL; + peerUnlock(peerProxy->peer); + + if (hasPackets) { + LOGD("Packets found. Setting onWritable()."); + + fd->onWritable = &peerProxyWrite; + } else { + // We have nothing to write. + fd->onWritable = NULL; + } +} + +/** Prepare to read bytes from the peer. */ +static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) { + LOGD("Expecting %d bytes.", header->size); + + peerProxy->inputState = READING_BYTES; + if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) { + LOGW("Couldn't allocate memory for incoming data. Size: %u", + (unsigned int) header->size); + + // TODO: Ignore the packet and log a warning? + peerProxyKill(peerProxy, false); + } +} + +/** + * Gets a peer proxy for the given ID. Creates a peer proxy if necessary. + * Sends a connection request to the master if desired. + * + * Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died + * or ENOMEM if memory couldn't be allocated. + */ +static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid, + bool requestConnection) { + if (pid == peer->pid) { + errno = EINVAL; + return NULL; + } + + if (peerIsDead(peer, pid)) { + errno = EHOSTDOWN; + return NULL; + } + + PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid); + if (peerProxy != NULL) { + return peerProxy; + } + + // If this is the master peer, we already know about all peers. + if (peer->master) { + errno = EHOSTDOWN; + return NULL; + } + + // Try to create a peer proxy. + Credentials credentials; + credentials.pid = pid; + + // Fake gid and uid until we have the real thing. The real creds are + // filled in by masterProxyExpectConnection(). These fake creds will + // never be exposed to the user. + credentials.uid = 0; + credentials.gid = 0; + + // Make sure we can allocate the connection request packet. + OutgoingPacket* packet = NULL; + if (requestConnection) { + packet = calloc(1, sizeof(OutgoingPacket)); + if (packet == NULL) { + errno = ENOMEM; + return NULL; + } + + packet->header.type = CONNECTION_REQUEST; + packet->header.credentials = credentials; + packet->free = &outgoingPacketFree; + } + + peerProxy = peerProxyCreate(peer, credentials); + if (peerProxy == NULL) { + free(packet); + errno = ENOMEM; + return NULL; + } else { + // Send a connection request to the master. + if (requestConnection) { + PeerProxy* masterProxy = peer->masterProxy; + peerProxyEnqueueOutgoingPacket(masterProxy, packet); + } + + return peerProxy; + } +} + +/** + * Switches the master peer proxy into a state where it's waiting for a + * connection from the master. + */ +static void masterProxyExpectConnection(PeerProxy* masterProxy, + Header* header) { + // TODO: Restructure things so we don't need this check. + // Verify that this really is the master. + if (!masterProxy->master) { + LOGW("Non-master process %d tried to send us a connection.", + masterProxy->credentials.pid); + // Kill off the evil peer. + peerProxyKill(masterProxy, false); + return; + } + + masterProxy->inputState = ACCEPTING_CONNECTION; + Peer* localPeer = masterProxy->peer; + + // Create a peer proxy so we have somewhere to stash the creds. + // See if we already have a proxy set up. + pid_t pid = header->credentials.pid; + peerLock(localPeer); + PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false); + if (peerProxy == NULL) { + LOGW("Peer proxy creation failed: %s", strerror(errno)); + } else { + // Fill in full credentials. + peerProxy->credentials = header->credentials; + } + peerUnlock(localPeer); + + // Keep track of which peer proxy we're accepting a connection for. + masterProxy->connecting = peerProxy; +} + +/** + * Reads input from a peer process. + */ +static void peerProxyRead(SelectableFd* fd); + +/** Sets up fd callbacks. */ +static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) { + peerProxy->fd = fd; + fd->data = peerProxy; + fd->onReadable = &peerProxyRead; + fd->beforeSelect = &peerProxyBeforeSelect; + + // Make the socket non-blocking. + setNonBlocking(fd->fd); +} + +/** + * Accepts a connection sent by the master proxy. + */ +static void masterProxyAcceptConnection(PeerProxy* masterProxy) { + struct msghdr msg; + struct iovec iov[1]; + ssize_t size; + char ignored; + int incomingFd; + + // TODO: Reuse code which writes the connection. Who the heck designed + // this API anyway? + union { + struct cmsghdr cm; + char control[CMSG_SPACE(sizeof(int))]; + } control_un; + struct cmsghdr *cmptr; + msg.msg_control = control_un.control; + msg.msg_controllen = sizeof(control_un.control); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + + // We sent 1 byte of data so we can detect EOF. + iov[0].iov_base = &ignored; + iov[0].iov_len = 1; + msg.msg_iov = iov; + msg.msg_iovlen = 1; + + size = recvmsg(masterProxy->fd->fd, &msg, 0); + if (size < 0) { + if (errno == EINTR) { + // Log interruptions but otherwise ignore them. + LOGW("recvmsg() interrupted."); + return; + } else if (errno == EAGAIN) { + // Keep waiting for the connection. + return; + } else { + LOG_ALWAYS_FATAL("Error reading connection from master: %s", + strerror(errno)); + } + } else if (size == 0) { + // EOF. + LOG_ALWAYS_FATAL("Received EOF from master."); + } + + // Extract fd from message. + if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL + && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) { + if (cmptr->cmsg_level != SOL_SOCKET) { + LOG_ALWAYS_FATAL("Expected SOL_SOCKET."); + } + if (cmptr->cmsg_type != SCM_RIGHTS) { + LOG_ALWAYS_FATAL("Expected SCM_RIGHTS."); + } + incomingFd = *((int*) CMSG_DATA(cmptr)); + } else { + LOG_ALWAYS_FATAL("Expected fd."); + } + + // The peer proxy this connection is for. + PeerProxy* peerProxy = masterProxy->connecting; + if (peerProxy == NULL) { + LOGW("Received connection for unknown peer."); + closeWithWarning(incomingFd); + } else { + Peer* peer = masterProxy->peer; + + SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd); + if (selectableFd == NULL) { + LOGW("Error adding fd to selector for %d.", + peerProxy->credentials.pid); + closeWithWarning(incomingFd); + peerProxyKill(peerProxy, false); + } + + peerProxySetFd(peerProxy, selectableFd); + } + + peerProxyExpectHeader(masterProxy); +} + +/** + * Frees an outgoing packet containing a connection. + */ +static void outgoingPacketFreeSocket(OutgoingPacket* packet) { + closeWithWarning(packet->socket); + outgoingPacketFree(packet); +} + +/** + * Connects two known peers. + */ +static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) { + int sockets[2]; + int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets); + if (result == -1) { + LOGW("socketpair() error: %s", strerror(errno)); + // TODO: Send CONNECTION_FAILED packets to peers. + return; + } + + OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket)); + OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket)); + if (packetA == NULL || packetB == NULL) { + free(packetA); + free(packetB); + LOGW("malloc() error. Failed to tell process %d that process %d is" + " dead.", peerA->credentials.pid, peerB->credentials.pid); + return; + } + + packetA->header.type = CONNECTION; + packetB->header.type = CONNECTION; + + packetA->header.credentials = peerB->credentials; + packetB->header.credentials = peerA->credentials; + + packetA->socket = sockets[0]; + packetB->socket = sockets[1]; + + packetA->free = &outgoingPacketFreeSocket; + packetB->free = &outgoingPacketFreeSocket; + + peerLock(peerA->peer); + peerProxyEnqueueOutgoingPacket(peerA, packetA); + peerProxyEnqueueOutgoingPacket(peerB, packetB); + peerUnlock(peerA->peer); +} + +/** + * Informs a peer that the peer they're trying to connect to couldn't be + * found. + */ +static void masterReportConnectionError(PeerProxy* peerProxy, + Credentials credentials) { + OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); + if (packet == NULL) { + LOGW("malloc() error. Failed to tell process %d that process %d is" + " dead.", peerProxy->credentials.pid, credentials.pid); + return; + } + + packet->header.type = CONNECTION_ERROR; + packet->header.credentials = credentials; + packet->free = &outgoingPacketFree; + + peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet); +} + +/** + * Handles a request to be connected to another peer. + */ +static void masterHandleConnectionRequest(PeerProxy* peerProxy, + Header* header) { + Peer* master = peerProxy->peer; + pid_t targetPid = header->credentials.pid; + if (!hashmapContainsKey(peerProxy->connections, &targetPid)) { + // We haven't connected these peers yet. + PeerProxy* targetPeer + = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid); + if (targetPeer == NULL) { + // Unknown process. + masterReportConnectionError(peerProxy, header->credentials); + } else { + masterConnectPeers(peerProxy, targetPeer); + } + } + + // This packet is complete. Get ready for the next one. + peerProxyExpectHeader(peerProxy); +} + +/** + * The master told us this peer is dead. + */ +static void masterProxyHandleConnectionError(PeerProxy* masterProxy, + Header* header) { + Peer* peer = masterProxy->peer; + + // Look up the peer proxy. + pid_t pid = header->credentials.pid; + PeerProxy* peerProxy = NULL; + peerLock(peer); + peerProxy = hashmapGet(peer->peerProxies, &pid); + peerUnlock(peer); + + if (peerProxy != NULL) { + LOGI("Couldn't connect to %d.", pid); + peerProxyKill(peerProxy, false); + } else { + LOGW("Peer proxy for %d not found. This shouldn't happen.", pid); + } + + peerProxyExpectHeader(masterProxy); +} + +/** + * Handles a packet header. + */ +static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) { + switch (header->type) { + case CONNECTION_REQUEST: + masterHandleConnectionRequest(peerProxy, header); + break; + case CONNECTION: + masterProxyExpectConnection(peerProxy, header); + break; + case CONNECTION_ERROR: + masterProxyHandleConnectionError(peerProxy, header); + break; + case BYTES: + peerProxyExpectBytes(peerProxy, header); + break; + default: + LOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid, + header->type); + peerProxyKill(peerProxy, false); + } +} + +/** + * Buffers input sent by peer. May be called multiple times until the entire + * buffer is filled. Returns true when the buffer is full. + */ +static bool peerProxyBufferInput(PeerProxy* peerProxy) { + Buffer* in = peerProxy->inputBuffer; + ssize_t size = bufferRead(in, peerProxy->fd->fd); + if (size < 0) { + peerProxyHandleError(peerProxy, "read"); + return false; + } else if (size == 0) { + // EOF. + LOGI("EOF"); + peerProxyKill(peerProxy, false); + return false; + } else if (bufferReadComplete(in)) { + // We're done! + return true; + } else { + // Continue reading. + return false; + } +} + +/** + * Reads input from a peer process. + */ +static void peerProxyRead(SelectableFd* fd) { + LOGD("Reading..."); + PeerProxy* peerProxy = (PeerProxy*) fd->data; + int state = peerProxy->inputState; + Buffer* in = peerProxy->inputBuffer; + switch (state) { + case READING_HEADER: + if (peerProxyBufferInput(peerProxy)) { + LOGD("Header read."); + // We've read the complete header. + Header* header = (Header*) in->data; + peerProxyHandleHeader(peerProxy, header); + } + break; + case READING_BYTES: + LOGD("Reading bytes..."); + if (peerProxyBufferInput(peerProxy)) { + LOGD("Bytes read."); + // We have the complete packet. Notify bytes listener. + peerProxy->peer->onBytes(peerProxy->credentials, + in->data, in->size); + + // Get ready for the next packet. + peerProxyExpectHeader(peerProxy); + } + break; + case ACCEPTING_CONNECTION: + masterProxyAcceptConnection(peerProxy); + break; + default: + LOG_ALWAYS_FATAL("Unknown state: %d", state); + } +} + +static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) { + PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy)); + if (peerProxy == NULL) { + return NULL; + } + + peerProxy->inputBuffer = bufferCreate(sizeof(Header)); + if (peerProxy->inputBuffer == NULL) { + free(peerProxy); + return NULL; + } + + peerProxy->peer = peer; + peerProxy->credentials = credentials; + + // Initial state == expecting a header. + peerProxyExpectHeader(peerProxy); + + // Add this proxy to the map. Make sure the key points to the stable memory + // inside of the peer proxy itself. + pid_t* pid = &(peerProxy->credentials.pid); + hashmapPut(peer->peerProxies, pid, peerProxy); + return peerProxy; +} + +/** Accepts a connection to the master peer. */ +static void masterAcceptConnection(SelectableFd* listenerFd) { + // Accept connection. + int socket = accept(listenerFd->fd, NULL, NULL); + if (socket == -1) { + LOGW("accept() error: %s", strerror(errno)); + return; + } + + LOGD("Accepted connection as fd %d.", socket); + + // Get credentials. + Credentials credentials; + struct ucred ucredentials; + socklen_t credentialsSize = sizeof(struct ucred); + int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED, + &ucredentials, &credentialsSize); + // We might want to verify credentialsSize. + if (result == -1) { + LOGW("getsockopt() error: %s", strerror(errno)); + closeWithWarning(socket); + return; + } + + // Copy values into our own structure so we know we have the types right. + credentials.pid = ucredentials.pid; + credentials.uid = ucredentials.uid; + credentials.gid = ucredentials.gid; + + LOGI("Accepted connection from process %d.", credentials.pid); + + Peer* masterPeer = (Peer*) listenerFd->data; + + peerLock(masterPeer); + + // Make sure we don't already have a connection from that process. + PeerProxy* peerProxy + = hashmapGet(masterPeer->peerProxies, &credentials.pid); + if (peerProxy != NULL) { + peerUnlock(masterPeer); + LOGW("Alread connected to process %d.", credentials.pid); + closeWithWarning(socket); + return; + } + + // Add connection to the selector. + SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket); + if (socketFd == NULL) { + peerUnlock(masterPeer); + LOGW("malloc() failed."); + closeWithWarning(socket); + return; + } + + // Create a peer proxy. + peerProxy = peerProxyCreate(masterPeer, credentials); + peerUnlock(masterPeer); + if (peerProxy == NULL) { + LOGW("malloc() failed."); + socketFd->remove = true; + closeWithWarning(socket); + } + peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals); + peerProxySetFd(peerProxy, socketFd); +} + +/** + * Creates the local peer. + */ +static Peer* peerCreate() { + Peer* peer = calloc(1, sizeof(Peer)); + if (peer == NULL) { + LOG_ALWAYS_FATAL("malloc() error."); + } + peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals); + peer->selector = selectorCreate(); + + pthread_mutexattr_t attributes; + if (pthread_mutexattr_init(&attributes) != 0) { + LOG_ALWAYS_FATAL("pthread_mutexattr_init() error."); + } + if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) { + LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error."); + } + if (pthread_mutex_init(&peer->mutex, &attributes) != 0) { + LOG_ALWAYS_FATAL("pthread_mutex_init() error."); + } + + peer->pid = getpid(); + return peer; +} + +/** The local peer. */ +static Peer* localPeer; + +/** Frees a packet of bytes. */ +static void outgoingPacketFreeBytes(OutgoingPacket* packet) { + LOGD("Freeing outgoing packet."); + bufferFree(packet->bytes); + free(packet); +} + +/** + * Sends a packet of bytes to a remote peer. Returns 0 on success. + * + * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be + * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno + * to EINVAL if pid is the same as the local pid. + */ +int peerSendBytes(pid_t pid, const char* bytes, size_t size) { + Peer* peer = localPeer; + assert(peer != NULL); + + OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); + if (packet == NULL) { + errno = ENOMEM; + return -1; + } + + Buffer* copy = bufferCreate(size); + if (copy == NULL) { + free(packet); + errno = ENOMEM; + return -1; + } + + // Copy data. + memcpy(copy->data, bytes, size); + copy->size = size; + + packet->bytes = copy; + packet->header.type = BYTES; + packet->header.size = size; + packet->free = outgoingPacketFreeBytes; + bufferPrepareForWrite(packet->bytes); + + peerLock(peer); + + PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true); + if (peerProxy == NULL) { + // The peer is already dead or we couldn't alloc memory. Either way, + // errno is set. + peerUnlock(peer); + packet->free(packet); + return -1; + } else { + peerProxyEnqueueOutgoingPacket(peerProxy, packet); + peerUnlock(peer); + selectorWakeUp(peer->selector); + return 0; + } +} + +/** Keeps track of how to free shared bytes. */ +typedef struct { + void (*free)(void* context); + void* context; +} SharedBytesFreer; + +/** Frees shared bytes. */ +static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) { + SharedBytesFreer* sharedBytesFreer + = (SharedBytesFreer*) packet->context; + sharedBytesFreer->free(sharedBytesFreer->context); + free(sharedBytesFreer); + free(packet); +} + +/** + * Sends a packet of bytes to a remote peer without copying the bytes. Calls + * free() with context after the bytes have been sent. + * + * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be + * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno + * to EINVAL if pid is the same as the local pid. + */ +int peerSendSharedBytes(pid_t pid, char* bytes, size_t size, + void (*free)(void* context), void* context) { + Peer* peer = localPeer; + assert(peer != NULL); + + OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); + if (packet == NULL) { + errno = ENOMEM; + return -1; + } + + Buffer* wrapper = bufferWrap(bytes, size, size); + if (wrapper == NULL) { + free(packet); + errno = ENOMEM; + return -1; + } + + SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer)); + if (sharedBytesFreer == NULL) { + free(packet); + free(wrapper); + errno = ENOMEM; + return -1; + } + sharedBytesFreer->free = free; + sharedBytesFreer->context = context; + + packet->bytes = wrapper; + packet->context = sharedBytesFreer; + packet->header.type = BYTES; + packet->header.size = size; + packet->free = &outgoingPacketFreeSharedBytes; + bufferPrepareForWrite(packet->bytes); + + peerLock(peer); + + PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true); + if (peerProxy == NULL) { + // The peer is already dead or we couldn't alloc memory. Either way, + // errno is set. + peerUnlock(peer); + packet->free(packet); + return -1; + } else { + peerProxyEnqueueOutgoingPacket(peerProxy, packet); + peerUnlock(peer); + selectorWakeUp(peer->selector); + return 0; + } +} + +/** + * Starts the master peer. The master peer differs from other peers in that + * it is responsible for connecting the other peers. You can only have one + * master peer. + * + * Goes into an I/O loop and does not return. + */ +void masterPeerInitialize(BytesListener* bytesListener, + DeathListener* deathListener) { + // Create and bind socket. + int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0); + if (listenerSocket == -1) { + LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno)); + } + unlink(MASTER_PATH); + int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(), + sizeof(UnixAddress)); + if (result == -1) { + LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno)); + } + + LOGD("Listener socket: %d", listenerSocket); + + // Queue up to 16 connections. + result = listen(listenerSocket, 16); + if (result != 0) { + LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno)); + } + + // Make socket non-blocking. + setNonBlocking(listenerSocket); + + // Create the peer for this process. Fail if we already have one. + if (localPeer != NULL) { + LOG_ALWAYS_FATAL("Peer is already initialized."); + } + localPeer = peerCreate(); + if (localPeer == NULL) { + LOG_ALWAYS_FATAL("malloc() failed."); + } + localPeer->master = true; + localPeer->onBytes = bytesListener; + localPeer->onDeath = deathListener; + + // Make listener socket selectable. + SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket); + if (listenerFd == NULL) { + LOG_ALWAYS_FATAL("malloc() error."); + } + listenerFd->data = localPeer; + listenerFd->onReadable = &masterAcceptConnection; +} + +/** + * Starts a local peer. + * + * Goes into an I/O loop and does not return. + */ +void peerInitialize(BytesListener* bytesListener, + DeathListener* deathListener) { + // Connect to master peer. + int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0); + if (masterSocket == -1) { + LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno)); + } + int result = connect(masterSocket, (SocketAddress*) getMasterAddress(), + sizeof(UnixAddress)); + if (result != 0) { + LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno)); + } + + // Create the peer for this process. Fail if we already have one. + if (localPeer != NULL) { + LOG_ALWAYS_FATAL("Peer is already initialized."); + } + localPeer = peerCreate(); + if (localPeer == NULL) { + LOG_ALWAYS_FATAL("malloc() failed."); + } + localPeer->onBytes = bytesListener; + localPeer->onDeath = deathListener; + + // Make connection selectable. + SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket); + if (masterFd == NULL) { + LOG_ALWAYS_FATAL("malloc() error."); + } + + // Create a peer proxy for the master peer. + PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS); + if (masterProxy == NULL) { + LOG_ALWAYS_FATAL("malloc() error."); + } + peerProxySetFd(masterProxy, masterFd); + masterProxy->master = true; + localPeer->masterProxy = masterProxy; +} + +/** Starts the master peer I/O loop. Doesn't return. */ +void peerLoop() { + assert(localPeer != NULL); + + // Start selector. + selectorLoop(localPeer->selector); +} + |
