author    Marat Dukhan <maratek@google.com>    2019-10-19 00:14:10 -0700
committer Marat Dukhan <maratek@google.com>    2019-10-19 00:14:10 -0700
commit    71aacd31d8f0851a158a30df6129416d6f2eca17 (patch)
tree      95d74f046d6201ac9968318aed6022381655af7a
parent    158098ac36d257bedd7d9c02d7276eb8b2077881 (diff)
Switch to C11 atomics for synchronization
-rw-r--r--  CMakeLists.txt             |  18
-rw-r--r--  src/threadpool-pthreads.c  | 357
2 files changed, 191 insertions(+), 184 deletions(-)
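At a glance, the patch replaces GCC __sync builtins and volatile-qualified shared fields with C11 <stdatomic.h> types and explicitly ordered operations. A minimal sketch of the correspondence, with illustrative names and memory orders that are not taken from the patch itself:

    #include <stdatomic.h>
    #include <stddef.h>

    atomic_size_t shared_counter;   /* was: volatile size_t shared_counter; */

    void publish_and_read(void) {
        /* was: shared_counter = 1; __sync_synchronize(); */
        atomic_store_explicit(&shared_counter, 1, memory_order_release);

        /* was: __sync_synchronize(); size_t v = shared_counter; */
        size_t v = atomic_load_explicit(&shared_counter, memory_order_acquire);
        (void) v;
    }

The practical difference is that every full __sync_synchronize() barrier becomes a directed acquire or release constraint, and several barriers disappear entirely where relaxed ordering suffices.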
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0622cb9..2cdc2cb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,4 +1,4 @@
-CMAKE_MINIMUM_REQUIRED(VERSION 2.8.12 FATAL_ERROR)
+CMAKE_MINIMUM_REQUIRED(VERSION 3.5 FATAL_ERROR)
INCLUDE(GNUInstallDirs)
@@ -16,16 +16,10 @@ IF(PTHREADPOOL_BUILD_TESTS)
ENABLE_TESTING()
ENDIF()
-MACRO(PTHREADPOOL_TARGET_ENABLE_C99 target)
- IF(${CMAKE_VERSION} VERSION_LESS "3.1")
- IF(NOT MSVC)
- TARGET_COMPILE_OPTIONS(${target} PRIVATE -std=c99)
- ENDIF()
- ELSE()
- SET_TARGET_PROPERTIES(${target} PROPERTIES
- C_STANDARD 99
- C_EXTENSIONS NO)
- ENDIF()
+MACRO(PTHREADPOOL_TARGET_ENABLE_C11 target)
+ SET_TARGET_PROPERTIES(${target} PROPERTIES
+ C_STANDARD 11
+ C_EXTENSIONS NO)
ENDMACRO()
# ---[ Download deps
@@ -91,7 +85,7 @@ ELSE()
MESSAGE(FATAL_ERROR "Unsupported library type ${PTHREADPOOL_LIBRARY_TYPE}")
ENDIF()
-PTHREADPOOL_TARGET_ENABLE_C99(pthreadpool)
+PTHREADPOOL_TARGET_ENABLE_C11(pthreadpool)
TARGET_LINK_LIBRARIES(pthreadpool PUBLIC pthreadpool_interface)
TARGET_INCLUDE_DIRECTORIES(pthreadpool PRIVATE src)
IF(NOT CMAKE_SYSTEM_NAME STREQUAL "Emscripten")
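Bumping the minimum CMake version to 3.5 lets the macro set C_STANDARD 11 unconditionally. C11 itself still permits an implementation to omit atomics, so a conservative compile-time guard in the C sources would look like the following sketch; this guard is an assumption for illustration, not part of the patch:

    /* C11 implementations that lack atomics advertise __STDC_NO_ATOMICS__. */
    #if defined(__STDC_NO_ATOMICS__)
        #error "C11 <stdatomic.h> support is required"
    #endif
    #include <stdatomic.h>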
diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c
index 91a2786..6c6a6d4 100644
--- a/src/threadpool-pthreads.c
+++ b/src/threadpool-pthreads.c
@@ -1,6 +1,7 @@
/* Standard C headers */
-#include <stdint.h>
+#include <stdatomic.h>
#include <stdbool.h>
+#include <stdint.h>
#include <stdlib.h>
#include <string.h>
@@ -8,24 +9,26 @@
#include <pthread.h>
#include <unistd.h>
-/* Platform-specific headers */
-#if defined(__linux__)
- #define PTHREADPOOL_USE_FUTEX 1
- #include <sys/syscall.h>
- #include <linux/futex.h>
+/* Futex-specific headers */
+#ifndef PTHREADPOOL_USE_FUTEX
+ #if defined(__linux__)
+ #define PTHREADPOOL_USE_FUTEX 1
+ #include <sys/syscall.h>
+ #include <linux/futex.h>
- /* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */
- #ifndef SYS_futex
- #define SYS_futex __NR_futex
- #endif
- #ifndef FUTEX_PRIVATE_FLAG
- #define FUTEX_PRIVATE_FLAG 128
+ /* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */
+ #ifndef SYS_futex
+ #define SYS_futex __NR_futex
+ #endif
+ #ifndef FUTEX_PRIVATE_FLAG
+ #define FUTEX_PRIVATE_FLAG 128
+ #endif
+ #elif defined(__native_client__)
+ #define PTHREADPOOL_USE_FUTEX 1
+ #include <irt.h>
+ #else
+ #define PTHREADPOOL_USE_FUTEX 0
#endif
-#elif defined(__native_client__)
- #define PTHREADPOOL_USE_FUTEX 1
- #include <irt.h>
-#else
- #define PTHREADPOOL_USE_FUTEX 0
#endif
/* Dependencies */
@@ -80,14 +83,12 @@ static inline size_t min(size_t a, size_t b) {
#if PTHREADPOOL_USE_FUTEX
#if defined(__linux__)
- static int futex_wait(volatile uint32_t* address, uint32_t value) {
- return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value,
- NULL, NULL, 0);
+ static int futex_wait(_Atomic uint32_t* address, uint32_t value) {
+ return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL);
}
- static int futex_wake_all(volatile uint32_t* address) {
- return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX,
- NULL, NULL, 0);
+ static int futex_wake_all(_Atomic uint32_t* address) {
+ return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX);
}
#elif defined(__native_client__)
static struct nacl_irt_futex nacl_irt_futex = { 0 };
@@ -96,13 +97,13 @@ static inline size_t min(size_t a, size_t b) {
nacl_interface_query(NACL_IRT_FUTEX_v0_1, &nacl_irt_futex, sizeof(nacl_irt_futex));
}
- static int futex_wait(volatile uint32_t* address, uint32_t value) {
- return nacl_irt_futex.futex_wait_abs((volatile int*) address, (int) value, NULL);
+ static int futex_wait(_Atomic uint32_t* address, uint32_t value) {
+ return nacl_irt_futex.futex_wait_abs((_Atomic int*) address, (int) value, NULL);
}
- static int futex_wake_all(volatile uint32_t* address) {
+ static int futex_wake_all(_Atomic uint32_t* address) {
int count;
- return nacl_irt_futex.futex_wake((volatile int*) address, INT_MAX, &count);
+ return nacl_irt_futex.futex_wake((_Atomic int*) address, INT_MAX, &count);
}
#else
#error "Platform-specific implementation of futex_wait and futex_wake_all required"
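The wrappers now take _Atomic uint32_t* so they can be handed the addresses of the C11 atomic fields directly. For FUTEX_WAIT and FUTEX_WAKE the kernel ignores the trailing syscall arguments, which is why the shortened calls above behave like the canonical six-argument form. A self-contained sketch of the intended usage pattern on Linux (assumed helper names, not patch code):

    #include <limits.h>
    #include <stdatomic.h>
    #include <stdint.h>
    #include <sys/syscall.h>
    #include <unistd.h>
    #include <linux/futex.h>

    /* Block until *flag becomes 0. FUTEX_WAIT only sleeps if *flag still
       equals the expected value at the time of the call, so the writer
       cannot slip a wakeup past us; spurious wakeups simply loop. */
    static void wait_until_zero(_Atomic uint32_t* flag) {
        uint32_t value;
        while ((value = atomic_load(flag)) != 0) {
            syscall(SYS_futex, flag, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL, NULL, 0);
        }
    }

    /* Publish 0 and wake every waiter. */
    static void set_zero_and_wake(_Atomic uint32_t* flag) {
        atomic_store_explicit(flag, 0, memory_order_release);
        syscall(SYS_futex, flag, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX, NULL, NULL, 0);
    }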
@@ -122,19 +123,19 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info {
* Index of the first element in the work range.
* Before processing a new element the owning worker thread increments this value.
*/
- volatile size_t range_start;
+ atomic_size_t range_start;
/**
* Index of the element after the last element of the work range.
* Before processing a new element the stealing worker thread decrements this value.
*/
- volatile size_t range_end;
+ atomic_size_t range_end;
/**
* The number of elements in the work range.
* Due to race conditions range_length <= range_end - range_start.
* The owning worker thread must decrement this value before incrementing @a range_start.
* The stealing worker thread must decrement this value before decrementing @a range_end.
*/
- volatile size_t range_length;
+ atomic_size_t range_length;
/**
* Thread number in the 0..threads_count-1 range.
*/
@@ -156,7 +157,7 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
/**
* The number of threads that are processing an operation.
*/
- volatile size_t active_threads;
+ atomic_size_t active_threads;
#if PTHREADPOOL_USE_FUTEX
/**
* Indicates if there are active threads.
@@ -164,24 +165,24 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
* - has_active_threads == 0 if active_threads == 0
* - has_active_threads == 1 if active_threads != 0
*/
- volatile uint32_t has_active_threads;
+ _Atomic uint32_t has_active_threads;
#endif
/**
* The last command submitted to the thread pool.
*/
- volatile uint32_t command;
+ _Atomic uint32_t command;
/**
* The function to call for each item.
*/
- volatile void* task;
+ void *_Atomic task;
/**
* The first argument to the item processing function.
*/
- void *volatile argument;
+ void *_Atomic argument;
/**
* Copy of the flags passed to parallelization function.
*/
- uint32_t flags;
+ _Atomic uint32_t flags;
/**
* Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads.
*/
@@ -218,14 +219,13 @@ PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZ
static void checkin_worker_thread(struct pthreadpool* threadpool) {
#if PTHREADPOOL_USE_FUTEX
- if (__sync_sub_and_fetch(&threadpool->active_threads, 1) == 0) {
- threadpool->has_active_threads = 0;
- __sync_synchronize();
+ if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) {
+ atomic_store_explicit(&threadpool->has_active_threads, 0, memory_order_release);
futex_wake_all(&threadpool->has_active_threads);
}
#else
pthread_mutex_lock(&threadpool->completion_mutex);
- if (--threadpool->active_threads == 0) {
+ if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) {
pthread_cond_signal(&threadpool->completion_condvar);
}
pthread_mutex_unlock(&threadpool->completion_mutex);
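The comparison flips from == 0 to == 1 because the two primitives report different values: __sync_sub_and_fetch returns the counter after the subtraction, while C11 atomic_fetch_sub returns the value before it. A minimal sketch of the equivalence:

    #include <assert.h>
    #include <stdatomic.h>

    int main(void) {
        atomic_size_t active_threads = 1;
        /* fetch_sub returns the PREVIOUS value; observing 1 means this call
           decremented the counter to 0, i.e. this is the last worker to
           check in -- the condition __sync_sub_and_fetch(...) == 0 used to
           express. */
        if (atomic_fetch_sub_explicit(&active_threads, 1, memory_order_relaxed) == 1) {
            assert(atomic_load(&active_threads) == 0);
        }
        return 0;
    }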
@@ -234,66 +234,79 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) {
static void wait_worker_threads(struct pthreadpool* threadpool) {
/* Initial check */
- const uint32_t active_threads = threadpool->active_threads;
- if (active_threads == 0) {
- __sync_synchronize();
- return;
- }
-
- /* Spin-wait */
- for (uint32_t i = 0; i < PTHREADPOOL_SPIN_WAIT_ITERATIONS; i++) {
- __sync_synchronize();
- const uint32_t active_threads = threadpool->active_threads;
+ #if PTHREADPOOL_USE_FUTEX
+ uint32_t has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed);
+ if (has_active_threads == 0) {
+ return;
+ }
+ #else
+ size_t active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed);
if (active_threads == 0) {
- __sync_synchronize();
return;
}
+ #endif
+
+ /* Spin-wait */
+ for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
+			/* This fence doubles as a brief pause in the spin-wait loop */
+ atomic_thread_fence(memory_order_acquire);
+
+ #if PTHREADPOOL_USE_FUTEX
+ has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed);
+ if (has_active_threads == 0) {
+ return;
+ }
+ #else
+ active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed);
+ if (active_threads == 0) {
+ return;
+ }
+ #endif
}
/* Fall-back to mutex/futex wait */
#if PTHREADPOOL_USE_FUTEX
- uint32_t has_active_threads;
- while ((has_active_threads = threadpool->has_active_threads) != 0) {
+ while ((has_active_threads = atomic_load(&threadpool->has_active_threads)) != 0) {
futex_wait(&threadpool->has_active_threads, 1);
}
#else
pthread_mutex_lock(&threadpool->completion_mutex);
- while (threadpool->active_threads != 0) {
+ while (atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed) != 0) {
pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex);
};
pthread_mutex_unlock(&threadpool->completion_mutex);
#endif
- __sync_synchronize();
}
-inline static bool atomic_decrement(volatile size_t* value) {
- size_t actual_value = *value;
- if (actual_value != 0) {
- size_t expected_value;
- do {
- expected_value = actual_value;
- const size_t new_value = actual_value - 1;
- actual_value = __sync_val_compare_and_swap(value, expected_value, new_value);
- } while ((actual_value != expected_value) && (actual_value != 0));
+inline static bool atomic_decrement(atomic_size_t* value) {
+ size_t actual_value = atomic_load_explicit(value, memory_order_relaxed);
+ if (actual_value == 0) {
+ return false;
}
- return actual_value != 0;
+ while (!atomic_compare_exchange_weak_explicit(
+ value, &actual_value, actual_value - 1, memory_order_relaxed, memory_order_relaxed))
+ {
+ if (actual_value == 0) {
+ return false;
+ }
+ }
+ return true;
}
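The rewritten loop leans on a defining property of atomic_compare_exchange_weak_explicit: on failure it writes the freshly observed contents of value back into actual_value, so the in-loop zero check always tests the latest counter without an extra load. The same decrement-if-nonzero pattern as a standalone sketch (illustrative names, not patch code):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    /* Atomically decrement *counter unless it is already 0.
       Returns true if a unit of work was successfully claimed. */
    static bool try_claim(atomic_size_t* counter) {
        size_t observed = atomic_load_explicit(counter, memory_order_relaxed);
        while (observed != 0) {
            /* On failure, observed is refreshed with the current value of
               *counter, so the loop condition re-checks for 0 for free. */
            if (atomic_compare_exchange_weak_explicit(
                    counter, &observed, observed - 1,
                    memory_order_relaxed, memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }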
-inline static size_t modulo_increment(uint32_t i, uint32_t n) {
- /* Increment input variable */
- i = i + 1;
+inline static size_t modulo_decrement(uint32_t i, uint32_t n) {
/* Wrap modulo n, if needed */
- if (i == n) {
- i = 0;
+ if (i == 0) {
+ i = n;
}
- return i;
+ /* Decrement input variable */
+ return i - 1;
}
static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) {
- const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) threadpool->task;
- void *const argument = threadpool->argument;
+ const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) atomic_load_explicit(&threadpool->task, memory_order_relaxed);
+ void *const argument = atomic_load_explicit(&threadpool->argument, memory_order_relaxed);
/* Process thread's own range of items */
- size_t range_start = thread->range_start;
+ size_t range_start = atomic_load_explicit(&thread->range_start, memory_order_relaxed);
while (atomic_decrement(&thread->range_length)) {
task(argument, range_start++);
}
@@ -301,63 +314,59 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_
/* There still may be other threads with work */
const size_t thread_number = thread->thread_number;
const size_t threads_count = threadpool->threads_count;
- for (size_t tid = modulo_increment(thread_number, threads_count);
+ for (size_t tid = modulo_decrement(thread_number, threads_count);
tid != thread_number;
- tid = modulo_increment(tid, threads_count))
+ tid = modulo_decrement(tid, threads_count))
{
struct thread_info* other_thread = &threadpool->threads[tid];
while (atomic_decrement(&other_thread->range_length)) {
- const size_t item_id = __sync_sub_and_fetch(&other_thread->range_end, 1);
+ const size_t item_id = atomic_fetch_sub_explicit(&other_thread->range_end, 1, memory_order_relaxed) - 1;
task(argument, item_id);
}
}
+ atomic_thread_fence(memory_order_release);
}
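Taken together, the three per-thread atomics implement a small work-stealing scheme: the owner walks forward from range_start with a private cursor, thieves walk backward by decrementing range_end, and range_length is the shared admission counter that keeps the two ends from crossing. Note that the victim traversal order also flips here, from modulo_increment to modulo_decrement, so thread t now scans victims t-1, t-2, ... modulo threads_count. A condensed sketch of both sides, reusing the atomic_decrement helper defined in this patch (other names are illustrative):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    extern bool atomic_decrement(atomic_size_t* value);  /* defined above */

    struct work_range {
        atomic_size_t start;   /* next item for the owner */
        atomic_size_t end;     /* one past the last item; thieves pull it down */
        atomic_size_t length;  /* items not yet claimed by either side */
    };

    typedef void (*item_fn)(size_t item_id);

    /* Owner: forward iteration with a private cursor. */
    static void owner_drain(struct work_range* r, item_fn process) {
        size_t cursor = atomic_load_explicit(&r->start, memory_order_relaxed);
        while (atomic_decrement(&r->length)) {
            process(cursor++);
        }
    }

    /* Thief: backward iteration; fetch_sub returns the previous end. */
    static void thief_drain(struct work_range* r, item_fn process) {
        while (atomic_decrement(&r->length)) {
            process(atomic_fetch_sub_explicit(&r->end, 1, memory_order_relaxed) - 1);
        }
    }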
-static uint32_t spin_wait_for_new_command(
+static uint32_t wait_for_new_command(
struct pthreadpool* threadpool,
uint32_t last_command)
{
- uint32_t command = threadpool->command;
+ uint32_t command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
if (command != last_command) {
- __sync_synchronize();
+ atomic_thread_fence(memory_order_acquire);
return command;
}
- for (uint32_t i = 0; i < PTHREADPOOL_SPIN_WAIT_ITERATIONS; i++) {
- __sync_synchronize();
- command = threadpool->command;
+
+ /* Spin-wait loop */
+ for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
+		/* This fence doubles as a brief pause in the spin-wait loop */
+ atomic_thread_fence(memory_order_acquire);
+
+ command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
if (command != last_command) {
- __sync_synchronize();
+ atomic_thread_fence(memory_order_acquire);
return command;
}
}
- return last_command;
-}
-static uint32_t wait_for_new_command(
- struct pthreadpool* threadpool,
- uint32_t last_command)
-{
- uint32_t command = spin_wait_for_new_command(threadpool, last_command);
- if (command == last_command) {
- /* Spin-wait timed out, fall back to mutex/futex wait */
- #if PTHREADPOOL_USE_FUTEX
- do {
- futex_wait(&threadpool->command, last_command);
- command = threadpool->command;
- } while (command == last_command);
- #else
- /* Lock the command mutex */
- pthread_mutex_lock(&threadpool->command_mutex);
- /* Read the command */
- while ((command = threadpool->command) == last_command) {
- /* Wait for new command */
- pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex);
- }
- /* Read a new command */
- pthread_mutex_unlock(&threadpool->command_mutex);
- #endif
- __sync_synchronize();
- }
+ /* Spin-wait timed out, fall back to mutex/futex wait */
+ #if PTHREADPOOL_USE_FUTEX
+ do {
+ futex_wait(&threadpool->command, last_command);
+ command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
+ } while (command == last_command);
+ #else
+ /* Lock the command mutex */
+ pthread_mutex_lock(&threadpool->command_mutex);
+ /* Read the command */
+ while ((command = atomic_load_explicit(&threadpool->command, memory_order_relaxed)) == last_command) {
+ /* Wait for new command */
+ pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex);
+ }
+ /* Read a new command */
+ pthread_mutex_unlock(&threadpool->command_mutex);
+ #endif
+ atomic_thread_fence(memory_order_acquire);
return command;
}
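The relaxed loads in this function are sound only because every return path executes an acquire fence first; that fence pairs with the release-ordered publication of threadpool->command in pthreadpool_parallelize_1d, so a worker that observes the new command is guaranteed to also observe the task, argument, flags, and ranges written before it. The pattern in isolation, with assumed names:

    #include <stdatomic.h>
    #include <stdint.h>

    static int payload;               /* plain data published via the flag */
    static _Atomic uint32_t command;  /* stands in for threadpool->command */

    void producer(void) {
        payload = 42;                                /* write parameters */
        atomic_store_explicit(&command, 1,
                              memory_order_release); /* then publish them */
    }

    void consumer(void) {
        while (atomic_load_explicit(&command, memory_order_relaxed) == 0) {
            /* spin */
        }
        atomic_thread_fence(memory_order_acquire);   /* pairs with the release store */
        /* payload is now guaranteed to read 42 */
    }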
@@ -373,12 +382,12 @@ static void* thread_main(void* arg) {
/* Monitor new commands and act accordingly */
for (;;) {
uint32_t command = wait_for_new_command(threadpool, last_command);
+ const uint32_t flags = atomic_load_explicit(&threadpool->flags, memory_order_relaxed);
/* Process command */
switch (command & THREADPOOL_COMMAND_MASK) {
case threadpool_command_compute_1d:
{
- const uint32_t flags = threadpool->flags;
if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
saved_fpu_state = get_fpu_state();
disable_fpu_denormals();
@@ -420,7 +429,7 @@ static struct pthreadpool* pthreadpool_allocate(size_t threads_count) {
return NULL;
}
#endif
- memset(threadpool, 0, sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info));
+ memset(threadpool, 0, threadpool_size);
return threadpool;
}
@@ -437,6 +446,9 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
return NULL;
}
threadpool->threads_count = threads_count;
+ for (size_t tid = 0; tid < threads_count; tid++) {
+ threadpool->threads[tid].thread_number = tid;
+ }
/* Thread pool with a single thread computes everything on the caller thread. */
if (threads_count > 1) {
@@ -449,13 +461,13 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
#endif
#if PTHREADPOOL_USE_FUTEX
- threadpool->has_active_threads = 1;
+ atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed);
#endif
- threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */;
+ atomic_store_explicit(
+ &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release);
/* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
for (size_t tid = 1; tid < threads_count; tid++) {
- threadpool->threads[tid].thread_number = tid;
pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]);
}
@@ -497,63 +509,58 @@ void pthreadpool_parallelize_1d(
/* Protect the global threadpool structures */
pthread_mutex_lock(&threadpool->execution_mutex);
- #if PTHREADPOOL_USE_FUTEX
- /* Setup global arguments */
- threadpool->task = task;
- threadpool->argument = argument;
- threadpool->flags = flags;
-
- threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */;
- threadpool->has_active_threads = 1;
-
- /* Spread the work between threads */
- for (size_t tid = 0; tid < threadpool->threads_count; tid++) {
- struct thread_info* thread = &threadpool->threads[tid];
- thread->range_start = multiply_divide(range, tid, threadpool->threads_count);
- thread->range_end = multiply_divide(range, tid + 1, threadpool->threads_count);
- thread->range_length = thread->range_end - thread->range_start;
- }
- __sync_synchronize();
-
- /*
- * Update the threadpool command.
- * Imporantly, do it after initializing command parameters (range, task, argument)
- * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
- * to ensure the unmasked command is different then the last command, because worker threads
- * monitor for change in the unmasked command.
- */
- threadpool->command = ~(threadpool->command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d;
- __sync_synchronize();
-
- futex_wake_all(&threadpool->command);
- #else
+ #if !PTHREADPOOL_USE_FUTEX
/* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */
pthread_mutex_lock(&threadpool->command_mutex);
+ #endif
- /* Setup global arguments */
- threadpool->task = task;
- threadpool->argument = argument;
- threadpool->flags = flags;
+ /* Setup global arguments */
+ atomic_store_explicit(&threadpool->task, task, memory_order_relaxed);
+ atomic_store_explicit(&threadpool->argument, argument, memory_order_relaxed);
+ atomic_store_explicit(&threadpool->flags, flags, memory_order_relaxed);
- /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
- threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */;
+ /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
+ atomic_store_explicit(
+ &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed);
+ #if PTHREADPOOL_USE_FUTEX
+ atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed);
+ #endif
- /* Spread the work between threads */
- for (size_t tid = 0; tid < threadpool->threads_count; tid++) {
- struct thread_info* thread = &threadpool->threads[tid];
- thread->range_start = multiply_divide(range, tid, threadpool->threads_count);
- thread->range_end = multiply_divide(range, tid + 1, threadpool->threads_count);
- thread->range_length = thread->range_end - thread->range_start;
- }
+ /* Spread the work between threads */
+ for (size_t tid = 0; tid < threadpool->threads_count; tid++) {
+ struct thread_info* thread = &threadpool->threads[tid];
+ const size_t range_start = multiply_divide(range, tid, threadpool->threads_count);
+ const size_t range_end = multiply_divide(range, tid + 1, threadpool->threads_count);
+ atomic_store_explicit(&thread->range_start, range_start, memory_order_relaxed);
+ atomic_store_explicit(&thread->range_end, range_end, memory_order_relaxed);
+ atomic_store_explicit(&thread->range_length, range_end - range_start, memory_order_relaxed);
+ }
+ #if PTHREADPOOL_USE_FUTEX
/*
- * Update the threadpool command.
- * Imporantly, do it after initializing command parameters (range, task, argument)
- * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
- * to ensure the unmasked command is different then the last command, because worker threads
- * monitor for change in the unmasked command.
+	 * Make the new command parameters globally visible. Having this fence before updating the command is important: it
+	 * guarantees that if a worker thread observes the new command value, it also observes the updated command parameters.
*/
- threadpool->command = ~(threadpool->command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d;
+ atomic_thread_fence(memory_order_release);
+ #endif
+
+ /*
+ * Update the threadpool command.
+	 * Importantly, do it after initializing command parameters (range, task, argument)
+	 * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
+	 * to ensure the unmasked command is different than the last command, because worker threads
+ * monitor for change in the unmasked command.
+ */
+ const uint32_t old_command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
+ const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d;
+
+ #if PTHREADPOOL_USE_FUTEX
+ atomic_store_explicit(&threadpool->command, new_command, memory_order_release);
+
+ /* Wake up the threads */
+ futex_wake_all(&threadpool->command);
+ #else
+ atomic_store_explicit(&threadpool->command, new_command, memory_order_relaxed);
/* Unlock the command variables before waking up the threads for better performance */
pthread_mutex_unlock(&threadpool->command_mutex);
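The command-flipping expression is worth a worked example. With an illustrative mask of 0x000000FF and command value 0x01 (stand-ins, not the library's actual constants):

    #include <assert.h>
    #include <stdint.h>

    #define MASK       UINT32_C(0x000000FF)  /* stands in for THREADPOOL_COMMAND_MASK */
    #define COMPUTE_1D UINT32_C(0x00000001)  /* stands in for threadpool_command_compute_1d */

    int main(void) {
        uint32_t old_command = COMPUTE_1D;                        /* 0x00000001 */
        uint32_t new_command = ~(old_command | MASK) | COMPUTE_1D;
        /* ~(0x00000001 | 0x000000FF) = 0xFFFFFF00, so new_command is
           0xFFFFFF01: the same command bits, with the bits above the mask
           inverted. */
        assert((new_command & MASK) == COMPUTE_1D);
        assert(new_command != old_command);

        /* Issuing the same command again flips the upper bits back, so
           consecutive commands always differ as full 32-bit words, which is
           exactly what the spin-waiting workers monitor. */
        uint32_t next_command = ~(new_command | MASK) | COMPUTE_1D;
        assert(next_command == old_command);
        return 0;
    }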
@@ -580,6 +587,9 @@ void pthreadpool_parallelize_1d(
/* Wait until the threads finish computation */
wait_worker_threads(threadpool);
+ /* Make changes by other threads visible to this thread */
+ atomic_thread_fence(memory_order_acquire);
+
/* Unprotect the global threadpool structures */
pthread_mutex_unlock(&threadpool->execution_mutex);
}
@@ -1154,21 +1164,24 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) {
if (threadpool != NULL) {
if (threadpool->threads_count > 1) {
#if PTHREADPOOL_USE_FUTEX
- threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */;
- threadpool->has_active_threads = 1;
- __sync_synchronize();
- threadpool->command = threadpool_command_shutdown;
- __sync_synchronize();
+ atomic_store_explicit(
+ &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed);
+ atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_release);
+
+ atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release);
+
+ /* Wake up worker threads */
futex_wake_all(&threadpool->command);
#else
/* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */
pthread_mutex_lock(&threadpool->command_mutex);
/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
- threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */;
+ atomic_store_explicit(
+ &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release);
/* Update the threadpool command. */
- threadpool->command = threadpool_command_shutdown;
+ atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release);
/* Wake up worker threads */
pthread_cond_broadcast(&threadpool->command_condvar);