diff options
author | Marat Dukhan <maratek@google.com> | 2019-10-19 00:14:10 -0700 |
---|---|---|
committer | Marat Dukhan <maratek@google.com> | 2019-10-19 00:14:10 -0700 |
commit | 71aacd31d8f0851a158a30df6129416d6f2eca17 (patch) | |
tree | 95d74f046d6201ac9968318aed6022381655af7a | |
parent | 158098ac36d257bedd7d9c02d7276eb8b2077881 (diff) | |
download | platform_external_pthreadpool-71aacd31d8f0851a158a30df6129416d6f2eca17.tar.gz platform_external_pthreadpool-71aacd31d8f0851a158a30df6129416d6f2eca17.tar.bz2 platform_external_pthreadpool-71aacd31d8f0851a158a30df6129416d6f2eca17.zip |
Switch to C11 atomics to synchronization
-rw-r--r-- | CMakeLists.txt | 18 | ||||
-rw-r--r-- | src/threadpool-pthreads.c | 357 |
2 files changed, 191 insertions, 184 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 0622cb9..2cdc2cb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8.12 FATAL_ERROR) +CMAKE_MINIMUM_REQUIRED(VERSION 3.5 FATAL_ERROR) INCLUDE(GNUInstallDirs) @@ -16,16 +16,10 @@ IF(PTHREADPOOL_BUILD_TESTS) ENABLE_TESTING() ENDIF() -MACRO(PTHREADPOOL_TARGET_ENABLE_C99 target) - IF(${CMAKE_VERSION} VERSION_LESS "3.1") - IF(NOT MSVC) - TARGET_COMPILE_OPTIONS(${target} PRIVATE -std=c99) - ENDIF() - ELSE() - SET_TARGET_PROPERTIES(${target} PROPERTIES - C_STANDARD 99 - C_EXTENSIONS NO) - ENDIF() +MACRO(PTHREADPOOL_TARGET_ENABLE_C11 target) + SET_TARGET_PROPERTIES(${target} PROPERTIES + C_STANDARD 11 + C_EXTENSIONS NO) ENDMACRO() # ---[ Download deps @@ -91,7 +85,7 @@ ELSE() MESSAGE(FATAL_ERROR "Unsupported library type ${PTHREADPOOL_LIBRARY_TYPE}") ENDIF() -PTHREADPOOL_TARGET_ENABLE_C99(pthreadpool) +PTHREADPOOL_TARGET_ENABLE_C11(pthreadpool) TARGET_LINK_LIBRARIES(pthreadpool PUBLIC pthreadpool_interface) TARGET_INCLUDE_DIRECTORIES(pthreadpool PRIVATE src) IF(NOT CMAKE_SYSTEM_NAME STREQUAL "Emscripten") diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 91a2786..6c6a6d4 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -1,6 +1,7 @@ /* Standard C headers */ -#include <stdint.h> +#include <stdatomic.h> #include <stdbool.h> +#include <stdint.h> #include <stdlib.h> #include <string.h> @@ -8,24 +9,26 @@ #include <pthread.h> #include <unistd.h> -/* Platform-specific headers */ -#if defined(__linux__) - #define PTHREADPOOL_USE_FUTEX 1 - #include <sys/syscall.h> - #include <linux/futex.h> +/* Futex-specific headers */ +#ifndef PTHREADPOOL_USE_FUTEX + #if defined(__linux__) + #define PTHREADPOOL_USE_FUTEX 1 + #include <sys/syscall.h> + #include <linux/futex.h> - /* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */ - #ifndef SYS_futex - #define SYS_futex __NR_futex - #endif - #ifndef FUTEX_PRIVATE_FLAG - #define FUTEX_PRIVATE_FLAG 128 + /* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */ + #ifndef SYS_futex + #define SYS_futex __NR_futex + #endif + #ifndef FUTEX_PRIVATE_FLAG + #define FUTEX_PRIVATE_FLAG 128 + #endif + #elif defined(__native_client__) + #define PTHREADPOOL_USE_FUTEX 1 + #include <irt.h> + #else + #define PTHREADPOOL_USE_FUTEX 0 #endif -#elif defined(__native_client__) - #define PTHREADPOOL_USE_FUTEX 1 - #include <irt.h> -#else - #define PTHREADPOOL_USE_FUTEX 0 #endif /* Dependencies */ @@ -80,14 +83,12 @@ static inline size_t min(size_t a, size_t b) { #if PTHREADPOOL_USE_FUTEX #if defined(__linux__) - static int futex_wait(volatile uint32_t* address, uint32_t value) { - return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, - NULL, NULL, 0); + static int futex_wait(_Atomic uint32_t* address, uint32_t value) { + return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL); } - static int futex_wake_all(volatile uint32_t* address) { - return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX, - NULL, NULL, 0); + static int futex_wake_all(_Atomic uint32_t* address) { + return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX); } #elif defined(__native_client__) static struct nacl_irt_futex nacl_irt_futex = { 0 }; @@ -96,13 +97,13 @@ static inline size_t min(size_t a, size_t b) { nacl_interface_query(NACL_IRT_FUTEX_v0_1, &nacl_irt_futex, sizeof(nacl_irt_futex)); } - static int futex_wait(volatile uint32_t* address, uint32_t value) { - return nacl_irt_futex.futex_wait_abs((volatile int*) address, (int) value, NULL); + static int futex_wait(_Atomic uint32_t* address, uint32_t value) { + return nacl_irt_futex.futex_wait_abs((_Atomic int*) address, (int) value, NULL); } - static int futex_wake_all(volatile uint32_t* address) { + static int futex_wake_all(_Atomic uint32_t* address) { int count; - return nacl_irt_futex.futex_wake((volatile int*) address, INT_MAX, &count); + return nacl_irt_futex.futex_wake((_Atomic int*) address, INT_MAX, &count); } #else #error "Platform-specific implementation of futex_wait and futex_wake_all required" @@ -122,19 +123,19 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info { * Index of the first element in the work range. * Before processing a new element the owning worker thread increments this value. */ - volatile size_t range_start; + atomic_size_t range_start; /** * Index of the element after the last element of the work range. * Before processing a new element the stealing worker thread decrements this value. */ - volatile size_t range_end; + atomic_size_t range_end; /** * The number of elements in the work range. * Due to race conditions range_length <= range_end - range_start. * The owning worker thread must decrement this value before incrementing @a range_start. * The stealing worker thread must decrement this value before decrementing @a range_end. */ - volatile size_t range_length; + atomic_size_t range_length; /** * Thread number in the 0..threads_count-1 range. */ @@ -156,7 +157,7 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { /** * The number of threads that are processing an operation. */ - volatile size_t active_threads; + atomic_size_t active_threads; #if PTHREADPOOL_USE_FUTEX /** * Indicates if there are active threads. @@ -164,24 +165,24 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * - has_active_threads == 0 if active_threads == 0 * - has_active_threads == 1 if active_threads != 0 */ - volatile uint32_t has_active_threads; + _Atomic uint32_t has_active_threads; #endif /** * The last command submitted to the thread pool. */ - volatile uint32_t command; + _Atomic uint32_t command; /** * The function to call for each item. */ - volatile void* task; + void *_Atomic task; /** * The first argument to the item processing function. */ - void *volatile argument; + void *_Atomic argument; /** * Copy of the flags passed to parallelization function. */ - uint32_t flags; + _Atomic uint32_t flags; /** * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads. */ @@ -218,14 +219,13 @@ PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZ static void checkin_worker_thread(struct pthreadpool* threadpool) { #if PTHREADPOOL_USE_FUTEX - if (__sync_sub_and_fetch(&threadpool->active_threads, 1) == 0) { - threadpool->has_active_threads = 0; - __sync_synchronize(); + if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) { + atomic_store_explicit(&threadpool->has_active_threads, 0, memory_order_release); futex_wake_all(&threadpool->has_active_threads); } #else pthread_mutex_lock(&threadpool->completion_mutex); - if (--threadpool->active_threads == 0) { + if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) { pthread_cond_signal(&threadpool->completion_condvar); } pthread_mutex_unlock(&threadpool->completion_mutex); @@ -234,66 +234,79 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) { static void wait_worker_threads(struct pthreadpool* threadpool) { /* Initial check */ - const uint32_t active_threads = threadpool->active_threads; - if (active_threads == 0) { - __sync_synchronize(); - return; - } - - /* Spin-wait */ - for (uint32_t i = 0; i < PTHREADPOOL_SPIN_WAIT_ITERATIONS; i++) { - __sync_synchronize(); - const uint32_t active_threads = threadpool->active_threads; + #if PTHREADPOOL_USE_FUTEX + uint32_t has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed); + if (has_active_threads == 0) { + return; + } + #else + size_t active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed); if (active_threads == 0) { - __sync_synchronize(); return; } + #endif + + /* Spin-wait */ + for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { + /* This fence serves as a sleep instruction */ + atomic_thread_fence(memory_order_acquire); + + #if PTHREADPOOL_USE_FUTEX + has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed); + if (has_active_threads == 0) { + return; + } + #else + active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed); + if (active_threads == 0) { + return; + } + #endif } /* Fall-back to mutex/futex wait */ #if PTHREADPOOL_USE_FUTEX - uint32_t has_active_threads; - while ((has_active_threads = threadpool->has_active_threads) != 0) { + while ((has_active_threads = atomic_load(&threadpool->has_active_threads)) != 0) { futex_wait(&threadpool->has_active_threads, 1); } #else pthread_mutex_lock(&threadpool->completion_mutex); - while (threadpool->active_threads != 0) { + while (atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed) != 0) { pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex); }; pthread_mutex_unlock(&threadpool->completion_mutex); #endif - __sync_synchronize(); } -inline static bool atomic_decrement(volatile size_t* value) { - size_t actual_value = *value; - if (actual_value != 0) { - size_t expected_value; - do { - expected_value = actual_value; - const size_t new_value = actual_value - 1; - actual_value = __sync_val_compare_and_swap(value, expected_value, new_value); - } while ((actual_value != expected_value) && (actual_value != 0)); +inline static bool atomic_decrement(atomic_size_t* value) { + size_t actual_value = atomic_load_explicit(value, memory_order_relaxed); + if (actual_value == 0) { + return false; } - return actual_value != 0; + while (!atomic_compare_exchange_weak_explicit( + value, &actual_value, actual_value - 1, memory_order_relaxed, memory_order_relaxed)) + { + if (actual_value == 0) { + return false; + } + } + return true; } -inline static size_t modulo_increment(uint32_t i, uint32_t n) { - /* Increment input variable */ - i = i + 1; +inline static size_t modulo_decrement(uint32_t i, uint32_t n) { /* Wrap modulo n, if needed */ - if (i == n) { - i = 0; + if (i == 0) { + i = n; } - return i; + /* Decrement input variable */ + return i - 1; } static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) { - const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) threadpool->task; - void *const argument = threadpool->argument; + const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) atomic_load_explicit(&threadpool->task, memory_order_relaxed); + void *const argument = atomic_load_explicit(&threadpool->argument, memory_order_relaxed); /* Process thread's own range of items */ - size_t range_start = thread->range_start; + size_t range_start = atomic_load_explicit(&thread->range_start, memory_order_relaxed); while (atomic_decrement(&thread->range_length)) { task(argument, range_start++); } @@ -301,63 +314,59 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_ /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_increment(thread_number, threads_count); + for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; - tid = modulo_increment(tid, threads_count)) + tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; while (atomic_decrement(&other_thread->range_length)) { - const size_t item_id = __sync_sub_and_fetch(&other_thread->range_end, 1); + const size_t item_id = atomic_fetch_sub_explicit(&other_thread->range_end, 1, memory_order_relaxed) - 1; task(argument, item_id); } } + atomic_thread_fence(memory_order_release); } -static uint32_t spin_wait_for_new_command( +static uint32_t wait_for_new_command( struct pthreadpool* threadpool, uint32_t last_command) { - uint32_t command = threadpool->command; + uint32_t command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); if (command != last_command) { - __sync_synchronize(); + atomic_thread_fence(memory_order_acquire); return command; } - for (uint32_t i = 0; i < PTHREADPOOL_SPIN_WAIT_ITERATIONS; i++) { - __sync_synchronize(); - command = threadpool->command; + + /* Spin-wait loop */ + for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { + /* This fence serves as a sleep instruction */ + atomic_thread_fence(memory_order_acquire); + + command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); if (command != last_command) { - __sync_synchronize(); + atomic_thread_fence(memory_order_acquire); return command; } } - return last_command; -} -static uint32_t wait_for_new_command( - struct pthreadpool* threadpool, - uint32_t last_command) -{ - uint32_t command = spin_wait_for_new_command(threadpool, last_command); - if (command == last_command) { - /* Spin-wait timed out, fall back to mutex/futex wait */ - #if PTHREADPOOL_USE_FUTEX - do { - futex_wait(&threadpool->command, last_command); - command = threadpool->command; - } while (command == last_command); - #else - /* Lock the command mutex */ - pthread_mutex_lock(&threadpool->command_mutex); - /* Read the command */ - while ((command = threadpool->command) == last_command) { - /* Wait for new command */ - pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); - } - /* Read a new command */ - pthread_mutex_unlock(&threadpool->command_mutex); - #endif - __sync_synchronize(); - } + /* Spin-wait timed out, fall back to mutex/futex wait */ + #if PTHREADPOOL_USE_FUTEX + do { + futex_wait(&threadpool->command, last_command); + command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + } while (command == last_command); + #else + /* Lock the command mutex */ + pthread_mutex_lock(&threadpool->command_mutex); + /* Read the command */ + while ((command = atomic_load_explicit(&threadpool->command, memory_order_relaxed)) == last_command) { + /* Wait for new command */ + pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); + } + /* Read a new command */ + pthread_mutex_unlock(&threadpool->command_mutex); + #endif + atomic_thread_fence(memory_order_acquire); return command; } @@ -373,12 +382,12 @@ static void* thread_main(void* arg) { /* Monitor new commands and act accordingly */ for (;;) { uint32_t command = wait_for_new_command(threadpool, last_command); + const uint32_t flags = atomic_load_explicit(&threadpool->flags, memory_order_relaxed); /* Process command */ switch (command & THREADPOOL_COMMAND_MASK) { case threadpool_command_compute_1d: { - const uint32_t flags = threadpool->flags; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { saved_fpu_state = get_fpu_state(); disable_fpu_denormals(); @@ -420,7 +429,7 @@ static struct pthreadpool* pthreadpool_allocate(size_t threads_count) { return NULL; } #endif - memset(threadpool, 0, sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info)); + memset(threadpool, 0, threadpool_size); return threadpool; } @@ -437,6 +446,9 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return NULL; } threadpool->threads_count = threads_count; + for (size_t tid = 0; tid < threads_count; tid++) { + threadpool->threads[tid].thread_number = tid; + } /* Thread pool with a single thread computes everything on the caller thread. */ if (threads_count > 1) { @@ -449,13 +461,13 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { #endif #if PTHREADPOOL_USE_FUTEX - threadpool->has_active_threads = 1; + atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); #endif - threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */; + atomic_store_explicit( + &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release); /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ for (size_t tid = 1; tid < threads_count; tid++) { - threadpool->threads[tid].thread_number = tid; pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]); } @@ -497,63 +509,58 @@ void pthreadpool_parallelize_1d( /* Protect the global threadpool structures */ pthread_mutex_lock(&threadpool->execution_mutex); - #if PTHREADPOOL_USE_FUTEX - /* Setup global arguments */ - threadpool->task = task; - threadpool->argument = argument; - threadpool->flags = flags; - - threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */; - threadpool->has_active_threads = 1; - - /* Spread the work between threads */ - for (size_t tid = 0; tid < threadpool->threads_count; tid++) { - struct thread_info* thread = &threadpool->threads[tid]; - thread->range_start = multiply_divide(range, tid, threadpool->threads_count); - thread->range_end = multiply_divide(range, tid + 1, threadpool->threads_count); - thread->range_length = thread->range_end - thread->range_start; - } - __sync_synchronize(); - - /* - * Update the threadpool command. - * Imporantly, do it after initializing command parameters (range, task, argument) - * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask - * to ensure the unmasked command is different then the last command, because worker threads - * monitor for change in the unmasked command. - */ - threadpool->command = ~(threadpool->command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d; - __sync_synchronize(); - - futex_wake_all(&threadpool->command); - #else + #if !PTHREADPOOL_USE_FUTEX /* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */ pthread_mutex_lock(&threadpool->command_mutex); + #endif - /* Setup global arguments */ - threadpool->task = task; - threadpool->argument = argument; - threadpool->flags = flags; + /* Setup global arguments */ + atomic_store_explicit(&threadpool->task, task, memory_order_relaxed); + atomic_store_explicit(&threadpool->argument, argument, memory_order_relaxed); + atomic_store_explicit(&threadpool->flags, flags, memory_order_relaxed); - /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ - threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */; + /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ + atomic_store_explicit( + &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed); + #if PTHREADPOOL_USE_FUTEX + atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); + #endif - /* Spread the work between threads */ - for (size_t tid = 0; tid < threadpool->threads_count; tid++) { - struct thread_info* thread = &threadpool->threads[tid]; - thread->range_start = multiply_divide(range, tid, threadpool->threads_count); - thread->range_end = multiply_divide(range, tid + 1, threadpool->threads_count); - thread->range_length = thread->range_end - thread->range_start; - } + /* Spread the work between threads */ + for (size_t tid = 0; tid < threadpool->threads_count; tid++) { + struct thread_info* thread = &threadpool->threads[tid]; + const size_t range_start = multiply_divide(range, tid, threadpool->threads_count); + const size_t range_end = multiply_divide(range, tid + 1, threadpool->threads_count); + atomic_store_explicit(&thread->range_start, range_start, memory_order_relaxed); + atomic_store_explicit(&thread->range_end, range_end, memory_order_relaxed); + atomic_store_explicit(&thread->range_length, range_end - range_start, memory_order_relaxed); + } + #if PTHREADPOOL_USE_FUTEX /* - * Update the threadpool command. - * Imporantly, do it after initializing command parameters (range, task, argument) - * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask - * to ensure the unmasked command is different then the last command, because worker threads - * monitor for change in the unmasked command. + * Make new command parameters globally visible. Having this fence before updating the command is imporatnt: it + * guarantees that if a worker thread observes new command value, it also observes the updated command parameters. */ - threadpool->command = ~(threadpool->command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d; + atomic_thread_fence(memory_order_release); + #endif + + /* + * Update the threadpool command. + * Imporantly, do it after initializing command parameters (range, task, argument) + * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask + * to ensure the unmasked command is different then the last command, because worker threads + * monitor for change in the unmasked command. + */ + const uint32_t old_command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d; + + #if PTHREADPOOL_USE_FUTEX + atomic_store_explicit(&threadpool->command, new_command, memory_order_release); + + /* Wake up the threads */ + futex_wake_all(&threadpool->command); + #else + atomic_store_explicit(&threadpool->command, new_command, memory_order_relaxed); /* Unlock the command variables before waking up the threads for better performance */ pthread_mutex_unlock(&threadpool->command_mutex); @@ -580,6 +587,9 @@ void pthreadpool_parallelize_1d( /* Wait until the threads finish computation */ wait_worker_threads(threadpool); + /* Make changes by other threads visible to this thread */ + atomic_thread_fence(memory_order_acquire); + /* Unprotect the global threadpool structures */ pthread_mutex_unlock(&threadpool->execution_mutex); } @@ -1154,21 +1164,24 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { if (threadpool != NULL) { if (threadpool->threads_count > 1) { #if PTHREADPOOL_USE_FUTEX - threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */; - threadpool->has_active_threads = 1; - __sync_synchronize(); - threadpool->command = threadpool_command_shutdown; - __sync_synchronize(); + atomic_store_explicit( + &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed); + atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_release); + + atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release); + + /* Wake up worker threads */ futex_wake_all(&threadpool->command); #else /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */ pthread_mutex_lock(&threadpool->command_mutex); /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ - threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */; + atomic_store_explicit( + &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release); /* Update the threadpool command. */ - threadpool->command = threadpool_command_shutdown; + atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release); /* Wake up worker threads */ pthread_cond_broadcast(&threadpool->command_condvar); |