/* go-select.c -- implement select. Copyright 2009 The Go Authors. All rights reserved. Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. */ #include #include #include #include #include #include #include "config.h" #include "go-assert.h" #include "channel.h" /* __go_select builds an array of these structures. */ struct select_channel { /* The channel being selected. */ struct __go_channel* channel; /* If this channel is selected, the value to return. */ size_t retval; /* If this channel is a duplicate of one which appears earlier in the array, this is the array index of the earlier channel. This is -1UL if this is not a dup. */ size_t dup_index; /* An entry to put on the send or receive queue. */ struct __go_channel_select queue_entry; /* True if selected for send. */ _Bool is_send; /* True if channel is ready--it has data to receive or space to send. */ _Bool is_ready; }; /* This mutex controls access to __go_select_cond. This mutex may not be acquired if any channel locks are held. */ static pthread_mutex_t __go_select_mutex = PTHREAD_MUTEX_INITIALIZER; /* When we have to wait for channels, we tell them to trigger this condition variable when they send or receive something. */ static pthread_cond_t __go_select_cond = PTHREAD_COND_INITIALIZER; /* Sort the channels by address. This avoids deadlock when multiple selects are running on overlapping sets of channels. */ static int channel_sort (const void *p1, const void *p2) { const struct select_channel *c1 = (const struct select_channel *) p1; const struct select_channel *c2 = (const struct select_channel *) p2; if ((uintptr_t) c1->channel < (uintptr_t) c2->channel) return -1; else if ((uintptr_t) c1->channel > (uintptr_t) c2->channel) return 1; else return 0; } /* Return whether there is an entry on QUEUE which can be used for a synchronous send or receive. */ static _Bool is_queue_ready (struct __go_channel_select *queue) { int x; if (queue == NULL) return 0; x = pthread_mutex_lock (&__go_select_data_mutex); __go_assert (x == 0); while (queue != NULL) { if (*queue->selected == NULL) break; queue = queue->next; } x = pthread_mutex_unlock (&__go_select_data_mutex); __go_assert (x == 0); return queue != NULL; } /* Return whether CHAN is ready. If IS_SEND is true check whether it has space to send, otherwise check whether it has a value to receive. */ static _Bool is_channel_ready (struct __go_channel* channel, _Bool is_send) { if (is_send) { if (channel->selected_for_send) return 0; if (channel->is_closed) return 1; if (channel->num_entries > 0) { /* An asynchronous channel is ready for sending if there is room in the buffer. */ return ((channel->next_store + 1) % channel->num_entries != channel->next_fetch); } else { if (channel->waiting_to_send) { /* Some other goroutine is waiting to send on this channel, so we can't. */ return 0; } if (channel->waiting_to_receive) { /* Some other goroutine is waiting to receive a value, so we can send one. */ return 1; } if (is_queue_ready (channel->select_receive_queue)) { /* There is a select statement waiting to synchronize with this one. */ return 1; } return 0; } } else { if (channel->selected_for_receive) return 0; if (channel->is_closed) return 1; if (channel->num_entries > 0) { /* An asynchronous channel is ready for receiving if there is a value in the buffer. */ return channel->next_fetch != channel->next_store; } else { if (channel->waiting_to_receive) { /* Some other goroutine is waiting to receive from this channel, so it is not ready for us to receive. */ return 0; } if (channel->next_store > 0) { /* There is data on the channel. */ return 1; } if (is_queue_ready (channel->select_send_queue)) { /* There is a select statement waiting to synchronize with this one. */ return 1; } return 0; } } } /* Mark a channel as selected. The channel is locked. IS_SELECTED is true if the channel was selected for us by another goroutine. We set *NEEDS_BROADCAST if we need to broadcast on the select condition variable. Return true if we got it. */ static _Bool mark_channel_selected (struct __go_channel *channel, _Bool is_send, _Bool is_selected, _Bool *needs_broadcast) { if (channel->num_entries == 0) { /* This is a synchronous channel. If there is no goroutine currently waiting, but there is another select waiting, then we need to tell that select to use this channel. That may fail--there may be no other goroutines currently waiting--as a third goroutine may already have claimed the select. */ if (!is_selected && !channel->is_closed && (is_send ? !channel->waiting_to_receive : channel->next_store == 0)) { int x; struct __go_channel_select *queue; x = pthread_mutex_lock (&__go_select_data_mutex); __go_assert (x == 0); queue = (is_send ? channel->select_receive_queue : channel->select_send_queue); __go_assert (queue != NULL); while (queue != NULL) { if (*queue->selected == NULL) { *queue->selected = channel; *queue->is_read = !is_send; break; } queue = queue->next; } x = pthread_mutex_unlock (&__go_select_data_mutex); __go_assert (x == 0); if (queue == NULL) return 0; if (is_send) channel->selected_for_receive = 1; else channel->selected_for_send = 1; /* We are going to have to tell the other select that there is something to do. */ *needs_broadcast = 1; } } if (is_send) channel->selected_for_send = 1; else channel->selected_for_receive = 1; return 1; } /* Mark a channel to indicate that a select is waiting. The channel is locked. */ static void mark_select_waiting (struct select_channel *sc, struct __go_channel **selected_pointer, _Bool *selected_for_read_pointer) { struct __go_channel *channel = sc->channel; _Bool is_send = sc->is_send; if (channel->num_entries == 0) { struct __go_channel_select **pp; pp = (is_send ? &channel->select_send_queue : &channel->select_receive_queue); /* Add an entry to the queue of selects on this channel. */ sc->queue_entry.next = *pp; sc->queue_entry.selected = selected_pointer; sc->queue_entry.is_read = selected_for_read_pointer; *pp = &sc->queue_entry; } channel->select_mutex = &__go_select_mutex; channel->select_cond = &__go_select_cond; /* We never actually clear the select_mutex and select_cond fields. In order to clear them safely, we would need to have some way of knowing when no select is waiting for the channel. Thus we introduce a bit of inefficiency for every channel that select needs to wait for. This is harmless other than the performance cost. */ } /* Remove the entry for this select waiting on this channel. The channel is locked. We check both queues, because the channel may be selected for both reading and writing. */ static void clear_select_waiting (struct select_channel *sc, struct __go_channel **selected_pointer) { struct __go_channel *channel = sc->channel; if (channel->num_entries == 0) { _Bool found; struct __go_channel_select **pp; found = 0; for (pp = &channel->select_send_queue; *pp != NULL; pp = &(*pp)->next) { if ((*pp)->selected == selected_pointer) { *pp = (*pp)->next; found = 1; break; } } for (pp = &channel->select_receive_queue; *pp != NULL; pp = &(*pp)->next) { if ((*pp)->selected == selected_pointer) { *pp = (*pp)->next; found = 1; break; } } __go_assert (found); } } /* Look through the list of channels to see which ones are ready. Lock each channels, and set the is_ready flag. Return the number of ready channels. */ static size_t lock_channels_find_ready (struct select_channel *channels, size_t count) { size_t ready_count; size_t i; ready_count = 0; for (i = 0; i < count; ++i) { struct __go_channel *channel = channels[i].channel; _Bool is_send = channels[i].is_send; size_t dup_index = channels[i].dup_index; int x; if (channel == NULL) continue; if (dup_index != (size_t) -1UL) { if (channels[dup_index].is_ready) { channels[i].is_ready = 1; ++ready_count; } continue; } x = pthread_mutex_lock (&channel->lock); __go_assert (x == 0); if (is_channel_ready (channel, is_send)) { channels[i].is_ready = 1; ++ready_count; } } return ready_count; } /* The channel we are going to select has been forced by some other goroutine. SELECTED_CHANNEL is the channel we will use, SELECTED_FOR_READ is whether the other goroutine wants to read from the channel. Note that the channel could be specified multiple times in this select, so we must mark each appropriate entry for this channel as ready. Every other channel is marked as not ready. All the channels are locked before this routine is called. This returns the number of ready channels. */ size_t force_selected_channel_ready (struct select_channel *channels, size_t count, struct __go_channel *selected_channel, _Bool selected_for_read) { size_t ready_count; size_t i; ready_count = 0; for (i = 0; i < count; ++i) { struct __go_channel *channel = channels[i].channel; _Bool is_send = channels[i].is_send; if (channel == NULL) continue; if (channel != selected_channel || (is_send ? !selected_for_read : selected_for_read)) channels[i].is_ready = 0; else { channels[i].is_ready = 1; ++ready_count; } } __go_assert (ready_count > 0); return ready_count; } /* Unlock all the channels. */ static void unlock_channels (struct select_channel *channels, size_t count) { size_t i; int x; for (i = 0; i < count; ++i) { struct __go_channel *channel = channels[i].channel; if (channel == NULL) continue; if (channels[i].dup_index != (size_t) -1UL) continue; x = pthread_mutex_unlock (&channel->lock); __go_assert (x == 0); } } /* At least one channel is ready. Randomly pick a channel to return. Unlock all the channels. IS_SELECTED is true if the channel was picked for us by some other goroutine. If SELECTED_POINTER is not NULL, remove it from the queue for all the channels. Return the retval field of the selected channel. This will return 0 if we can't use the selected channel, because it relied on synchronizing with some other select, and that select already synchronized with a different channel. */ static size_t unlock_channels_and_select (struct select_channel *channels, size_t count, size_t ready_count, _Bool is_selected, struct __go_channel **selected_pointer) { size_t selected; size_t ret; _Bool needs_broadcast; size_t i; int x; /* Pick which channel we are going to return. */ #if defined(HAVE_RANDOM) selected = (size_t) random () % ready_count; #else selected = (size_t) rand () % ready_count; #endif ret = 0; needs_broadcast = 0; /* Look at the channels in reverse order so that we don't unlock a duplicated channel until we have seen all its dups. */ for (i = 0; i < count; ++i) { size_t j = count - i - 1; struct __go_channel *channel = channels[j].channel; _Bool is_send = channels[j].is_send; if (channel == NULL) continue; if (channels[j].is_ready) { if (selected == 0) { if (mark_channel_selected (channel, is_send, is_selected, &needs_broadcast)) ret = channels[j].retval; } --selected; } if (channels[j].dup_index == (size_t) -1UL) { if (selected_pointer != NULL) clear_select_waiting (&channels[j], selected_pointer); x = pthread_mutex_unlock (&channel->lock); __go_assert (x == 0); } } /* The NEEDS_BROADCAST variable is set if we are synchronizing with some other select statement. We can't do the actual broadcast until we have unlocked all the channels. */ if (needs_broadcast) { x = pthread_mutex_lock (&__go_select_mutex); __go_assert (x == 0); x = pthread_cond_broadcast (&__go_select_cond); __go_assert (x == 0); x = pthread_mutex_unlock (&__go_select_mutex); __go_assert (x == 0); } return ret; } /* Mark all channels to show that we are waiting for them. This is called with the select mutex held, but none of the channels are locked. This returns true if some channel was found to be ready. */ static _Bool mark_all_channels_waiting (struct select_channel* channels, size_t count, struct __go_channel **selected_pointer, _Bool *selected_for_read_pointer) { _Bool ret; int x; size_t i; ret = 0; for (i = 0; i < count; ++i) { struct __go_channel *channel = channels[i].channel; _Bool is_send = channels[i].is_send; if (channel == NULL) continue; if (channels[i].dup_index != (size_t) -1UL) { size_t j; /* A channel may be selected for both read and write. */ if (channels[channels[i].dup_index].is_send != is_send) { for (j = channels[i].dup_index + 1; j < i; ++j) { if (channels[j].channel == channel && channels[j].is_send == is_send) break; } if (j < i) continue; } } x = pthread_mutex_lock (&channel->lock); __go_assert (x == 0); /* To avoid a race condition, we have to check again whether the channel is ready. It may have become ready since we did the first set of checks but before we acquired the select mutex. If we don't check here, we could sleep forever on the select condition variable. */ if (is_channel_ready (channel, is_send)) ret = 1; /* If SELECTED_POINTER is NULL, then we have already marked the channel as waiting. */ if (selected_pointer != NULL) mark_select_waiting (&channels[i], selected_pointer, selected_for_read_pointer); x = pthread_mutex_unlock (&channel->lock); __go_assert (x == 0); } return ret; } /* Implement select. This is called by the compiler-generated code with pairs of arguments: a pointer to a channel, and an int which is non-zero for send, zero for receive. */ size_t __go_select (size_t count, _Bool has_default, struct __go_channel **channel_args, _Bool *is_send_args) { struct select_channel stack_buffer[16]; struct select_channel *allocated_buffer; struct select_channel *channels; size_t i; int x; struct __go_channel *selected_channel; _Bool selected_for_read; _Bool is_queued; if (count < sizeof stack_buffer / sizeof stack_buffer[0]) { channels = &stack_buffer[0]; allocated_buffer = NULL; } else { allocated_buffer = ((struct select_channel *) malloc (count * sizeof (struct select_channel))); channels = allocated_buffer; } for (i = 0; i < count; ++i) { struct __go_channel *channel_arg = channel_args[i]; _Bool is_send = is_send_args[i]; channels[i].channel = (struct __go_channel*) channel_arg; channels[i].retval = i + 1; channels[i].dup_index = (size_t) -1UL; channels[i].queue_entry.next = NULL; channels[i].queue_entry.selected = NULL; channels[i].is_send = is_send; channels[i].is_ready = 0; } qsort (channels, count, sizeof (struct select_channel), channel_sort); for (i = 0; i < count; ++i) { size_t j; for (j = 0; j < i; ++j) { if (channels[j].channel == channels[i].channel) { channels[i].dup_index = j; break; } } } /* SELECT_CHANNEL is used to select synchronized channels. If no channels are ready, we store a pointer to this variable on the select queue for each synchronized channel. Because the variable may be set by channel operations running in other goroutines, SELECT_CHANNEL may only be accessed when all the channels are locked and/or when the select_data_mutex is locked. */ selected_channel = NULL; /* SELECTED_FOR_READ is set to true if SELECTED_CHANNEL was set by a goroutine which wants to read from the channel. The access restrictions for this are like those for SELECTED_CHANNEL. */ selected_for_read = 0; /* IS_QUEUED is true if we have queued up this select on the queues for any associated synchronous channels. We only do this if no channels are ready the first time around the loop. */ is_queued = 0; while (1) { int ready_count; _Bool is_selected; /* Lock all channels, identify which ones are ready. */ ready_count = lock_channels_find_ready (channels, count); /* All the channels are locked, so we can look at SELECTED_CHANNEL. If it is not NULL, then our choice has been forced by some other goroutine. This can only happen after the first time through the loop. */ is_selected = selected_channel != NULL; if (is_selected) ready_count = force_selected_channel_ready (channels, count, selected_channel, selected_for_read); if (ready_count > 0) { size_t ret; ret = unlock_channels_and_select (channels, count, ready_count, is_selected, (is_queued ? &selected_channel : NULL)); /* If RET is zero, it means that the channel we picked turned out not to be ready, because some other select grabbed it during our traversal. Loop around and try again. */ if (ret == 0) { is_queued = 0; /* We are no longer on any channel queues, so it is safe to touch SELECTED_CHANNEL here. It must be NULL, because otherwise that would somebody has promised to synch up with us and then failed to do so. */ __go_assert (selected_channel == NULL); continue; } if (allocated_buffer != NULL) free (allocated_buffer); return ret; } /* No channels were ready. */ unlock_channels (channels, count); if (has_default) { /* Use the default clause. */ if (allocated_buffer != NULL) free (allocated_buffer); return 0; } /* This is a blocking select. Grab the select lock, tell all the channels to notify us when something happens, and wait for something to happen. */ x = pthread_mutex_lock (&__go_select_mutex); __go_assert (x == 0); /* Check whether CHANNEL_SELECTED was set while the channels were unlocked. If it was set, then we can simply loop around again. We need to check this while the select mutex is held. It is possible that something will set CHANNEL_SELECTED while we mark the channels as waiting. If this happens, that goroutine is required to signal the select condition variable, which means acquiring the select mutex. Since we have the select mutex locked ourselves, we can not miss that signal. */ x = pthread_mutex_lock (&__go_select_data_mutex); __go_assert (x == 0); is_selected = selected_channel != NULL; x = pthread_mutex_unlock (&__go_select_data_mutex); __go_assert (x == 0); if (!is_selected) { /* Mark the channels as waiting, and check whether they have become ready. */ if (!mark_all_channels_waiting (channels, count, (is_queued ? NULL : &selected_channel), (is_queued ? NULL : &selected_for_read))) { x = pthread_cond_wait (&__go_select_cond, &__go_select_mutex); __go_assert (x == 0); } is_queued = 1; } x = pthread_mutex_unlock (&__go_select_mutex); __go_assert (x == 0); } }