Diffstat (limited to 'src/threadpool-pthreads.c')
-rw-r--r--  src/threadpool-pthreads.c | 883
1 file changed, 688 insertions(+), 195 deletions(-)
diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c
index 6c6a6d4..0a9c06d 100644
--- a/src/threadpool-pthreads.c
+++ b/src/threadpool-pthreads.c
@@ -1,5 +1,5 @@
 /* Standard C headers */
-#include <stdatomic.h>
+#include <assert.h>
 #include <stdbool.h>
 #include <stdint.h>
 #include <stdlib.h>
@@ -9,10 +9,27 @@
 #include <pthread.h>
 #include <unistd.h>
 
-/* Futex-specific headers */
+#ifndef PTHREADPOOL_USE_CPUINFO
+	#define PTHREADPOOL_USE_CPUINFO 0
+#endif
+
 #ifndef PTHREADPOOL_USE_FUTEX
 	#if defined(__linux__)
 		#define PTHREADPOOL_USE_FUTEX 1
+	#elif defined(__EMSCRIPTEN__)
+		#define PTHREADPOOL_USE_FUTEX 1
+	#else
+		#define PTHREADPOOL_USE_FUTEX 0
+	#endif
+#endif
+
+#if PTHREADPOOL_USE_CPUINFO
+	#include <cpuinfo.h>
+#endif
+
+/* Futex-specific headers */
+#if PTHREADPOOL_USE_FUTEX
+	#if defined(__linux__)
 		#include <sys/syscall.h>
 		#include <linux/futex.h>
@@ -23,14 +40,22 @@
 		#ifndef FUTEX_PRIVATE_FLAG
 			#define FUTEX_PRIVATE_FLAG 128
 		#endif
-	#elif defined(__native_client__)
-		#define PTHREADPOOL_USE_FUTEX 1
-		#include <irt.h>
+	#elif defined(__EMSCRIPTEN__)
+		/* math.h for INFINITY constant */
+		#include <math.h>
+
+		#include <emscripten/threading.h>
 	#else
-		#define PTHREADPOOL_USE_FUTEX 0
+		#error "Platform-specific implementation of futex_wait and futex_wake_all required"
 	#endif
 #endif
 
+#ifdef _WIN32
+	#define NOMINMAX
+	#include <malloc.h>
+	#include <sysinfoapi.h>
+#endif
+
 /* Dependencies */
 #include <fxdiv.h>
@@ -39,6 +64,7 @@
 
 /* Internal headers */
 #include "threadpool-utils.h"
+#include "threadpool-atomics.h"
 
 /* Number of iterations in spin-wait loop before going into futex/mutex wait */
 #define PTHREADPOOL_SPIN_WAIT_ITERATIONS 1000000
@@ -83,27 +109,20 @@ static inline size_t min(size_t a, size_t b) {
 
 #if PTHREADPOOL_USE_FUTEX
 	#if defined(__linux__)
-		static int futex_wait(_Atomic uint32_t* address, uint32_t value) {
+		static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
 			return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL);
 		}
 
-		static int futex_wake_all(_Atomic uint32_t* address) {
+		static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
 			return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX);
 		}
-	#elif defined(__native_client__)
-		static struct nacl_irt_futex nacl_irt_futex = { 0 };
-		static pthread_once_t nacl_init_guard = PTHREAD_ONCE_INIT;
-		static void nacl_init(void) {
-			nacl_interface_query(NACL_IRT_FUTEX_v0_1, &nacl_irt_futex, sizeof(nacl_irt_futex));
+	#elif defined(__EMSCRIPTEN__)
+		static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
+			return emscripten_futex_wait((volatile void*) address, value, INFINITY);
 		}
 
-		static int futex_wait(_Atomic uint32_t* address, uint32_t value) {
-			return nacl_irt_futex.futex_wait_abs((_Atomic int*) address, (int) value, NULL);
-		}
-
-		static int futex_wake_all(_Atomic uint32_t* address) {
-			int count;
-			return nacl_irt_futex.futex_wake((_Atomic int*) address, INT_MAX, &count);
+		static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
+			return emscripten_futex_wake((volatile void*) address, INT_MAX);
 		}
 	#else
 		#error "Platform-specific implementation of futex_wait and futex_wake_all required"
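Taken together, futex_wait and futex_wake_all give the pool a one-word blocking event. A minimal sketch of the usage pattern — assuming a Linux build where pthreadpool_atomic_uint32_t maps to C11's _Atomic uint32_t (as in threadpool-atomics.h); the names busy/signal_idle/wait_until_idle are hypothetical, not part of the patch:

    #include <stdatomic.h>
    #include <stdint.h>

    static _Atomic uint32_t busy = 1;  /* 1 while workers are active */

    static void signal_idle(void) {
        atomic_store_explicit(&busy, 0, memory_order_release);
        futex_wake_all(&busy);             /* wake every parked waiter */
    }

    static void wait_until_idle(void) {
        /* futex_wait can return spuriously (e.g. EINTR), so always re-check */
        while (atomic_load_explicit(&busy, memory_order_acquire) != 0) {
            futex_wait(&busy, 1);          /* sleeps only while busy == 1 */
        }
    }

This is exactly the shape of the has_active_threads protocol implemented by checkin_worker_thread and wait_worker_threads further down.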
@@ -114,7 +133,7 @@
 
 enum threadpool_command {
 	threadpool_command_init,
-	threadpool_command_compute_1d,
+	threadpool_command_parallelize,
 	threadpool_command_shutdown,
 };
@@ -123,19 +142,19 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info {
 	 * Index of the first element in the work range.
 	 * Before processing a new element the owning worker thread increments this value.
 	 */
-	atomic_size_t range_start;
+	pthreadpool_atomic_size_t range_start;
 	/**
 	 * Index of the element after the last element of the work range.
 	 * Before processing a new element the stealing worker thread decrements this value.
 	 */
-	atomic_size_t range_end;
+	pthreadpool_atomic_size_t range_end;
 	/**
 	 * The number of elements in the work range.
 	 * Due to race conditions range_length <= range_end - range_start.
 	 * The owning worker thread must decrement this value before incrementing @a range_start.
 	 * The stealing worker thread must decrement this value before decrementing @a range_end.
 	 */
-	atomic_size_t range_length;
+	pthreadpool_atomic_size_t range_length;
 	/**
 	 * Thread number in the 0..threads_count-1 range.
 	 */
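The three counters above implement a minimal work-stealing scheme: the owner drains its range from the front, thieves drain it from the back, and range_length is the single gate that prevents double consumption. A sketch of the two consumption paths, extracted from thread_parallelize_1d below and assuming the patch's atomic_decrement() helper (defined further down); the function names here are illustrative only:

    /* Owner path: range_length is the only shared gate, so the
     * local cursor needs no atomics. */
    void owner_drain(struct thread_info* thread, pthreadpool_task_1d_t task, void* argument) {
        size_t cursor = pthreadpool_load_relaxed_size_t(&thread->range_start);
        while (atomic_decrement(&thread->range_length)) {
            task(argument, cursor++);
        }
    }

    /* Thief path: decrementing range_length before touching range_end
     * guarantees owner and thief never take the same item, even though
     * range_end briefly disagrees with range_length. */
    void thief_drain(struct thread_info* victim, pthreadpool_task_1d_t task, void* argument) {
        while (atomic_decrement(&victim->range_length)) {
            const size_t item = pthreadpool_fetch_sub_relaxed_size_t(&victim->range_end, 1) - 1;
            task(argument, item);
        }
    }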
@@ -153,11 +172,22 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info {
 PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0,
 	"thread_info structure must occupy an integer number of cache lines (64 bytes)");
 
+struct pthreadpool_1d_with_uarch_params {
+	/**
+	 * Copy of the default uarch index argument passed to a microarchitecture-aware parallelization function.
+	 */
+	uint32_t default_uarch_index;
+	/**
+	 * Copy of the max uarch index argument passed to a microarchitecture-aware parallelization function.
+	 */
+	uint32_t max_uarch_index;
+};
+
 struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
 	/**
 	 * The number of threads that are processing an operation.
 	 */
-	atomic_size_t active_threads;
+	pthreadpool_atomic_size_t active_threads;
 #if PTHREADPOOL_USE_FUTEX
 	/**
 	 * Indicates if there are active threads.
@@ -165,24 +195,35 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
 	 * - has_active_threads == 0 if active_threads == 0
 	 * - has_active_threads == 1 if active_threads != 0
 	 */
-	_Atomic uint32_t has_active_threads;
+	pthreadpool_atomic_uint32_t has_active_threads;
 #endif
 	/**
 	 * The last command submitted to the thread pool.
 	 */
-	_Atomic uint32_t command;
+	pthreadpool_atomic_uint32_t command;
+	/**
+	 * The entry point function to call for each thread in the thread pool for parallelization tasks.
+	 */
+	pthreadpool_atomic_void_p thread_function;
 	/**
 	 * The function to call for each item.
 	 */
-	void *_Atomic task;
+	pthreadpool_atomic_void_p task;
 	/**
 	 * The first argument to the item processing function.
 	 */
-	void *_Atomic argument;
+	pthreadpool_atomic_void_p argument;
+	/**
+	 * Additional parallelization parameters.
+	 * These parameters are specific to each thread_function.
+	 */
+	union {
+		struct pthreadpool_1d_with_uarch_params parallelize_1d_with_uarch;
+	} params;
 	/**
-	 * Copy of the flags passed to parallelization function.
+	 * Copy of the flags passed to a parallelization function.
 	 */
-	_Atomic uint32_t flags;
+	pthreadpool_atomic_uint32_t flags;
 	/**
 	 * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads.
 	 */
@@ -205,8 +246,14 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
 	 */
 	pthread_cond_t command_condvar;
 #endif
+#if PTHREADPOOL_USE_CPUINFO
+	/**
+	 * Indicates whether the cpuinfo library initialized successfully. Never changes after pthreadpool_create.
+	 */
+	bool cpuinfo_is_initialized;
+#endif
 	/**
-	 * The number of threads in the thread pool. Never changes after initialization.
+	 * The number of threads in the thread pool. Never changes after pthreadpool_create.
 	 */
 	size_t threads_count;
 	/**
@@ -219,13 +266,13 @@ PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZ
 
 static void checkin_worker_thread(struct pthreadpool* threadpool) {
 #if PTHREADPOOL_USE_FUTEX
-	if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) {
-		atomic_store_explicit(&threadpool->has_active_threads, 0, memory_order_release);
+	if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) {
+		pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 0);
 		futex_wake_all(&threadpool->has_active_threads);
 	}
 #else
 	pthread_mutex_lock(&threadpool->completion_mutex);
-	if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) {
+	if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) {
 		pthread_cond_signal(&threadpool->completion_condvar);
 	}
 	pthread_mutex_unlock(&threadpool->completion_mutex);
@@ -235,12 +282,12 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) {
 static void wait_worker_threads(struct pthreadpool* threadpool) {
 	/* Initial check */
 #if PTHREADPOOL_USE_FUTEX
-	uint32_t has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed);
+	uint32_t has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads);
 	if (has_active_threads == 0) {
 		return;
 	}
 #else
-	size_t active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed);
+	size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
 	if (active_threads == 0) {
 		return;
 	}
@@ -249,15 +296,15 @@ static void wait_worker_threads(struct pthreadpool* threadpool) {
 	/* Spin-wait */
 	for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
 		/* This fence serves as a sleep instruction */
-		atomic_thread_fence(memory_order_acquire);
+		pthreadpool_fence_acquire();
 
 #if PTHREADPOOL_USE_FUTEX
-		has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed);
+		has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads);
 		if (has_active_threads == 0) {
 			return;
 		}
 #else
-		active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed);
+		active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
 		if (active_threads == 0) {
 			return;
 		}
@@ -266,31 +313,41 @@ static void wait_worker_threads(struct pthreadpool* threadpool) {
 
 	/* Fall-back to mutex/futex wait */
 #if PTHREADPOOL_USE_FUTEX
-	while ((has_active_threads = atomic_load(&threadpool->has_active_threads)) != 0) {
+	while ((has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads)) != 0) {
 		futex_wait(&threadpool->has_active_threads, 1);
 	}
 #else
 	pthread_mutex_lock(&threadpool->completion_mutex);
-	while (atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed) != 0) {
+	while (pthreadpool_load_relaxed_size_t(&threadpool->active_threads) != 0) {
 		pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex);
 	};
 	pthread_mutex_unlock(&threadpool->completion_mutex);
 #endif
 }
 
-inline static bool atomic_decrement(atomic_size_t* value) {
-	size_t actual_value = atomic_load_explicit(value, memory_order_relaxed);
-	if (actual_value == 0) {
-		return false;
-	}
-	while (!atomic_compare_exchange_weak_explicit(
-		value, &actual_value, actual_value - 1, memory_order_relaxed, memory_order_relaxed))
-	{
+inline static bool atomic_decrement(pthreadpool_atomic_size_t* value) {
+	#if defined(__clang__) && (defined(__arm__) || defined(__aarch64__))
+		size_t actual_value;
+		do {
+			actual_value = __builtin_arm_ldrex((const volatile size_t*) value);
+			if (actual_value == 0) {
+				__builtin_arm_clrex();
+				return false;
+			}
+		} while (__builtin_arm_strex(actual_value - 1, (volatile size_t*) value) != 0);
+		return true;
	#else
+		size_t actual_value = pthreadpool_load_relaxed_size_t(value);
 		if (actual_value == 0) {
 			return false;
 		}
-	}
-	return true;
+		while (!pthreadpool_compare_exchange_weak_relaxed_size_t(value, &actual_value, actual_value - 1)) {
+			if (actual_value == 0) {
+				return false;
+			}
+		}
+		return true;
	#endif
 }
 
 inline static size_t modulo_decrement(uint32_t i, uint32_t n) {
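The contract of atomic_decrement is "atomically: if zero, fail; otherwise subtract one and succeed". On ARM, the Clang LDREX/STREX builtins express this directly (a competing write is detected by STREX itself, and __builtin_arm_clrex releases the exclusive monitor on the early-out path); elsewhere a compare-exchange loop is used, re-checking for zero because a failed weak CAS reloads actual_value. A hypothetical stress test of that contract — not part of the patch, using plain C11 atomics in place of the pthreadpool wrappers:

    #include <assert.h>
    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    static atomic_size_t counter = 1000;
    static atomic_size_t wins = 0;

    static bool decrement_if_nonzero(atomic_size_t* value) {
        size_t expected = atomic_load_explicit(value, memory_order_relaxed);
        while (expected != 0) {
            if (atomic_compare_exchange_weak_explicit(value, &expected, expected - 1,
                    memory_order_relaxed, memory_order_relaxed)) {
                return true;
            }
            /* expected was reloaded by the failed CAS; the loop re-checks for zero */
        }
        return false;
    }

    static void* drain(void* arg) {
        (void) arg;
        while (decrement_if_nonzero(&counter)) {
            atomic_fetch_add_explicit(&wins, 1, memory_order_relaxed);
        }
        return NULL;
    }

    int main(void) {
        pthread_t a, b;
        pthread_create(&a, NULL, drain, NULL);
        pthread_create(&b, NULL, drain, NULL);
        pthread_join(a, NULL);
        pthread_join(b, NULL);
        assert(atomic_load(&wins) == 1000);  /* exactly one winner per item */
        return 0;
    }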
@@ -302,11 +359,13 @@
 	return i - 1;
 }
 
+typedef void (*thread_function_t)(struct pthreadpool* threadpool, struct thread_info* thread);
+
 static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) {
-	const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) atomic_load_explicit(&threadpool->task, memory_order_relaxed);
-	void *const argument = atomic_load_explicit(&threadpool->argument, memory_order_relaxed);
+	const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
+	void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);
 
 	/* Process thread's own range of items */
-	size_t range_start = atomic_load_explicit(&thread->range_start, memory_order_relaxed);
+	size_t range_start = pthreadpool_load_relaxed_size_t(&thread->range_start);
 	while (atomic_decrement(&thread->range_length)) {
 		task(argument, range_start++);
 	}
@@ -320,53 +379,94 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_
 	{
 		struct thread_info* other_thread = &threadpool->threads[tid];
 		while (atomic_decrement(&other_thread->range_length)) {
-			const size_t item_id = atomic_fetch_sub_explicit(&other_thread->range_end, 1, memory_order_relaxed) - 1;
+			const size_t item_id = pthreadpool_fetch_sub_relaxed_size_t(&other_thread->range_end, 1) - 1;
 			task(argument, item_id);
 		}
 	}
-	atomic_thread_fence(memory_order_release);
+
+	/* Make changes by this thread visible to other threads */
+	pthreadpool_fence_release();
+}
+
+static void thread_parallelize_1d_with_uarch(struct pthreadpool* threadpool, struct thread_info* thread) {
+	const pthreadpool_task_1d_with_id_t task = (pthreadpool_task_1d_with_id_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
+	void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);
+
+	const uint32_t default_uarch_index = threadpool->params.parallelize_1d_with_uarch.default_uarch_index;
+	uint32_t uarch_index = default_uarch_index;
+	#if PTHREADPOOL_USE_CPUINFO
+		if (threadpool && threadpool->cpuinfo_is_initialized) {
+			uarch_index = cpuinfo_get_current_uarch_index();
+			if (uarch_index > threadpool->params.parallelize_1d_with_uarch.max_uarch_index) {
+				uarch_index = default_uarch_index;
+			}
+		}
	#endif
+
+	/* Process thread's own range of items */
+	size_t range_start = pthreadpool_load_relaxed_size_t(&thread->range_start);
+	while (atomic_decrement(&thread->range_length)) {
+		task(argument, uarch_index, range_start++);
+	}
+
+	/* There still may be other threads with work */
+	const size_t thread_number = thread->thread_number;
+	const size_t threads_count = threadpool->threads_count;
+	for (size_t tid = modulo_decrement(thread_number, threads_count);
+		tid != thread_number;
+		tid = modulo_decrement(tid, threads_count))
+	{
+		struct thread_info* other_thread = &threadpool->threads[tid];
+		while (atomic_decrement(&other_thread->range_length)) {
+			const size_t item_id = pthreadpool_fetch_sub_relaxed_size_t(&other_thread->range_end, 1) - 1;
+			task(argument, uarch_index, item_id);
+		}
 	}
+
+	/* Make changes by this thread visible to other threads */
+	pthreadpool_fence_release();
 }
 
 static uint32_t wait_for_new_command(
 	struct pthreadpool* threadpool,
-	uint32_t last_command)
+	uint32_t last_command,
+	uint32_t last_flags)
 {
-	uint32_t command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
+	uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
 	if (command != last_command) {
-		atomic_thread_fence(memory_order_acquire);
 		return command;
 	}
 
-	/* Spin-wait loop */
-	for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
-		/* This fence serves as a sleep instruction */
-		atomic_thread_fence(memory_order_acquire);
+	if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
+		/* Spin-wait loop */
+		for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
+			/* This fence serves as a sleep instruction */
+			pthreadpool_fence_acquire();
 
-		command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
-		if (command != last_command) {
-			atomic_thread_fence(memory_order_acquire);
-			return command;
+			command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+			if (command != last_command) {
+				return command;
+			}
 		}
 	}
 
-	/* Spin-wait timed out, fall back to mutex/futex wait */
+	/* Spin-wait disabled or timed out, fall back to mutex/futex wait */
	#if PTHREADPOOL_USE_FUTEX
 		do {
 			futex_wait(&threadpool->command, last_command);
-			command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
+			command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
 		} while (command == last_command);
	#else
 		/* Lock the command mutex */
 		pthread_mutex_lock(&threadpool->command_mutex);
 		/* Read the command */
-		while ((command = atomic_load_explicit(&threadpool->command, memory_order_relaxed)) == last_command) {
+		while ((command = pthreadpool_load_relaxed_uint32_t(&threadpool->command)) == last_command) {
 			/* Wait for new command */
 			pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex);
 		}
 		/* Read a new command */
 		pthread_mutex_unlock(&threadpool->command_mutex);
	#endif
-	atomic_thread_fence(memory_order_acquire);
 	return command;
 }
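The new last_flags argument threads the previous run's flags into the next wait: when a dispatch carries PTHREADPOOL_FLAG_YIELD_WORKERS, workers skip the million-iteration spin phase and park directly on the futex (or condition variable) while waiting for the command that comes after it. A hypothetical caller (pool, render_tile and frame are made-up names) that releases the cores between infrequent dispatches:

    /* Sketch: the flag changes how workers wait afterwards,
     * not how this call itself executes. */
    pthreadpool_parallelize_1d(pool, render_tile, &frame, tile_count,
        PTHREADPOOL_FLAG_YIELD_WORKERS);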
@@ -375,24 +475,30 @@ static void* thread_main(void* arg) {
 	struct pthreadpool* threadpool = ((struct pthreadpool*) (thread - thread->thread_number)) - 1;
 	uint32_t last_command = threadpool_command_init;
 	struct fpu_state saved_fpu_state = { 0 };
+	uint32_t flags = 0;
 
 	/* Check in */
 	checkin_worker_thread(threadpool);
 
 	/* Monitor new commands and act accordingly */
 	for (;;) {
-		uint32_t command = wait_for_new_command(threadpool, last_command);
-		const uint32_t flags = atomic_load_explicit(&threadpool->flags, memory_order_relaxed);
+		uint32_t command = wait_for_new_command(threadpool, last_command, flags);
+		pthreadpool_fence_acquire();
+
+		flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
 
 		/* Process command */
 		switch (command & THREADPOOL_COMMAND_MASK) {
-			case threadpool_command_compute_1d:
+			case threadpool_command_parallelize:
 			{
+				const thread_function_t thread_function =
+					(thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
 				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
 					saved_fpu_state = get_fpu_state();
 					disable_fpu_denormals();
 				}
-				thread_parallelize_1d(threadpool, thread);
+
+				thread_function(threadpool, thread);
 				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
 					set_fpu_state(saved_fpu_state);
 				}
@@ -424,6 +530,11 @@ static struct pthreadpool* pthreadpool_allocate(size_t threads_count) {
 	if (threadpool == NULL) {
 		return NULL;
 	}
+#elif defined(_WIN32)
+	threadpool = _aligned_malloc(threadpool_size, PTHREADPOOL_CACHELINE_SIZE);
+	if (threadpool == NULL) {
+		return NULL;
+	}
 #else
 	if (posix_memalign((void**) &threadpool, PTHREADPOOL_CACHELINE_SIZE, threadpool_size) != 0) {
 		return NULL;
@@ -434,13 +545,25 @@ static struct pthreadpool* pthreadpool_allocate(size_t threads_count) {
 }
 
 struct pthreadpool* pthreadpool_create(size_t threads_count) {
-#if defined(__native_client__)
-	pthread_once(&nacl_init_guard, nacl_init);
-#endif
-
 	if (threads_count == 0) {
-		threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN);
+		#if defined(_SC_NPROCESSORS_ONLN)
+			threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN);
+			#if defined(__EMSCRIPTEN_PTHREADS__)
+				/* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE option */
+				if (threads_count >= 8) {
+					threads_count = 8;
+				}
			#endif
+		#elif defined(_WIN32)
+			SYSTEM_INFO system_info;
+			ZeroMemory(&system_info, sizeof(system_info));
+			GetSystemInfo(&system_info);
+			threads_count = (size_t) system_info.dwNumberOfProcessors;
+		#else
+			#error "Unsupported platform"
		#endif
 	}
+
 	struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
 	if (threadpool == NULL) {
 		return NULL;
@@ -449,6 +572,9 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
 	for (size_t tid = 0; tid < threads_count; tid++) {
 		threadpool->threads[tid].thread_number = tid;
 	}
+	#if PTHREADPOOL_USE_CPUINFO
+		threadpool->cpuinfo_is_initialized = cpuinfo_initialize();
	#endif
 
 	/* Thread pool with a single thread computes everything on the caller thread. */
 	if (threads_count > 1) {
@@ -461,10 +587,9 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
 #endif
 
 #if PTHREADPOOL_USE_FUTEX
-		atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed);
+		pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
 #endif
-		atomic_store_explicit(
-			&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release);
+		pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
 
 		/* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
 		for (size_t tid = 1; tid < threads_count; tid++) {
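For reference, a hypothetical caller exercising both sizing paths of pthreadpool_create shown above:

    #include <pthreadpool.h>
    #include <stddef.h>

    void example(void) {
        pthreadpool_t pool_auto = pthreadpool_create(0);  /* detect processors: sysconf on POSIX, GetSystemInfo on Windows, capped at 8 under Emscripten */
        pthreadpool_t pool_four = pthreadpool_create(4);  /* explicit count; the calling thread itself serves as worker #0 */
        size_t n = pthreadpool_get_threads_count(pool_four);  /* n == 4 */
        pthreadpool_destroy(pool_four);
        pthreadpool_destroy(pool_auto);
        (void) n;
    }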
@@ -485,6 +610,114 @@ size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) {
 	}
 }
 
+static void pthreadpool_parallelize(
+	struct pthreadpool* threadpool,
+	thread_function_t thread_function,
+	const void* params,
+	size_t params_size,
+	void* task,
+	void* context,
+	size_t linear_range,
+	uint32_t flags)
+{
+	assert(threadpool != NULL);
+	assert(thread_function != NULL);
+	assert(task != NULL);
+	assert(linear_range > 1);
+
+	/* Protect the global threadpool structures */
+	pthread_mutex_lock(&threadpool->execution_mutex);
+
+	#if !PTHREADPOOL_USE_FUTEX
+		/* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */
+		pthread_mutex_lock(&threadpool->command_mutex);
	#endif
+
+	/* Setup global arguments */
+	pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
+	pthreadpool_store_relaxed_void_p(&threadpool->task, task);
+	pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
+	pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
+
+	/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
+	const size_t threads_count = threadpool->threads_count;
+	pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
+	#if PTHREADPOOL_USE_FUTEX
+		pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
	#endif
+
+	if (params_size != 0) {
+		memcpy(&threadpool->params, params, params_size);
+		pthreadpool_fence_release();
+	}
+
+	/* Spread the work between threads */
+	size_t range_start = 0;
+	for (size_t tid = 0; tid < threads_count; tid++) {
+		struct thread_info* thread = &threadpool->threads[tid];
+		const size_t range_end = multiply_divide(linear_range, tid + 1, threads_count);
+		pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
+		pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
+		pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start);
+
+		/* The next subrange starts where the previous ended */
+		range_start = range_end;
+	}
+
+	/*
+	 * Update the threadpool command.
+	 * Importantly, do it after initializing command parameters (range, task, argument, flags).
+	 * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
+	 * to ensure the unmasked command is different than the last command, because worker threads
+	 * monitor for change in the unmasked command.
+	 */
+	const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+	const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;
+
+	/*
+	 * Store the command with release semantics to guarantee that if a worker thread observes
+	 * the new command value, it also observes the updated command parameters.
+	 *
+	 * Note: release semantics is necessary even with a condition variable, because the workers might
+	 * be waiting in a spin-loop rather than on the condition variable.
+	 */
+	pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
+	#if PTHREADPOOL_USE_FUTEX
+		/* Wake up the threads */
+		futex_wake_all(&threadpool->command);
	#else
+		/* Unlock the command variables before waking up the threads for better performance */
+		pthread_mutex_unlock(&threadpool->command_mutex);
+
+		/* Wake up the threads */
+		pthread_cond_broadcast(&threadpool->command_condvar);
	#endif
+
+	/* Save and modify FPU denormals control, if needed */
+	struct fpu_state saved_fpu_state = { 0 };
+	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+		saved_fpu_state = get_fpu_state();
+		disable_fpu_denormals();
+	}
+
+	/* Do computations as worker #0 */
+	thread_function(threadpool, &threadpool->threads[0]);
+
+	/* Restore FPU denormals control, if needed */
+	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+		set_fpu_state(saved_fpu_state);
+	}
+
+	/* Wait until the threads finish computation */
+	wait_worker_threads(threadpool);
+
+	/* Make changes by other threads visible to this thread */
+	pthreadpool_fence_acquire();
+
+	/* Unprotect the global threadpool structures */
+	pthread_mutex_unlock(&threadpool->execution_mutex);
+}
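The bit-flipping trick is worth a worked example. A self-contained demonstration (the mask value 0x1F is assumed here purely for illustration; the real constant is THREADPOOL_COMMAND_MASK, defined elsewhere in the file):

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    #define COMMAND_MASK        UINT32_C(0x1F)  /* assumed value */
    #define COMMAND_PARALLELIZE UINT32_C(1)

    int main(void) {
        uint32_t command = 0;
        for (int i = 0; i < 3; i++) {
            command = ~(command | COMMAND_MASK) | COMMAND_PARALLELIZE;
            printf("dispatch %d: command = 0x%08" PRIX32 "\n", i, command);
        }
        /* Prints 0xFFFFFFE1, 0x00000001, 0xFFFFFFE1: the bits above the mask
         * toggle on every dispatch, so workers polling for command != last_command
         * always see a change, even when the low command code repeats. */
        return 0;
    }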
+
 void pthreadpool_parallelize_1d(
 	struct pthreadpool* threadpool,
 	pthreadpool_task_1d_t task,
@@ -492,7 +725,7 @@ void pthreadpool_parallelize_1d(
 	size_t range,
 	uint32_t flags)
 {
-	if (threadpool == NULL || threadpool->threads_count <= 1) {
+	if (threadpool == NULL || threadpool->threads_count <= 1 || range <= 1) {
 		/* No thread pool used: execute task sequentially on the calling thread */
 		struct fpu_state saved_fpu_state = { 0 };
 		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
@@ -506,92 +739,53 @@ void pthreadpool_parallelize_1d(
 			set_fpu_state(saved_fpu_state);
 		}
 	} else {
-		/* Protect the global threadpool structures */
-		pthread_mutex_lock(&threadpool->execution_mutex);
-
-		#if !PTHREADPOOL_USE_FUTEX
-			/* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */
-			pthread_mutex_lock(&threadpool->command_mutex);
-		#endif
-
-		/* Setup global arguments */
-		atomic_store_explicit(&threadpool->task, task, memory_order_relaxed);
-		atomic_store_explicit(&threadpool->argument, argument, memory_order_relaxed);
-		atomic_store_explicit(&threadpool->flags, flags, memory_order_relaxed);
-
-		/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
-		atomic_store_explicit(
-			&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed);
-		#if PTHREADPOOL_USE_FUTEX
-			atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed);
-		#endif
-
-		/* Spread the work between threads */
-		for (size_t tid = 0; tid < threadpool->threads_count; tid++) {
-			struct thread_info* thread = &threadpool->threads[tid];
-			const size_t range_start = multiply_divide(range, tid, threadpool->threads_count);
-			const size_t range_end = multiply_divide(range, tid + 1, threadpool->threads_count);
-			atomic_store_explicit(&thread->range_start, range_start, memory_order_relaxed);
-			atomic_store_explicit(&thread->range_end, range_end, memory_order_relaxed);
-			atomic_store_explicit(&thread->range_length, range_end - range_start, memory_order_relaxed);
-		}
-
-		#if PTHREADPOOL_USE_FUTEX
-			/*
-			 * Make new command parameters globally visible. Having this fence before updating the command is imporatnt: it
-			 * guarantees that if a worker thread observes new command value, it also observes the updated command parameters.
-			 */
-			atomic_thread_fence(memory_order_release);
-		#endif
-
-		/*
-		 * Update the threadpool command.
-		 * Imporantly, do it after initializing command parameters (range, task, argument)
-		 * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
-		 * to ensure the unmasked command is different then the last command, because worker threads
-		 * monitor for change in the unmasked command.
-		 */
-		const uint32_t old_command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
-		const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d;
-
-		#if PTHREADPOOL_USE_FUTEX
-			atomic_store_explicit(&threadpool->command, new_command, memory_order_release);
-
-			/* Wake up the threads */
-			futex_wake_all(&threadpool->command);
-		#else
-			atomic_store_explicit(&threadpool->command, new_command, memory_order_relaxed);
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d, NULL, 0,
+			(void*) task, argument, range, flags);
+	}
+}
 
-			/* Unlock the command variables before waking up the threads for better performance */
-			pthread_mutex_unlock(&threadpool->command_mutex);
+void pthreadpool_parallelize_1d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_1d_with_id_t task,
+	void* argument,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range,
+	uint32_t flags)
+{
+	if (threadpool == NULL || threadpool->threads_count <= 1 || range <= 1) {
+		/* No thread pool used: execute task sequentially on the calling thread */
 
-			/* Wake up the threads */
-			pthread_cond_broadcast(&threadpool->command_condvar);
+		uint32_t uarch_index = default_uarch_index;
+		#if PTHREADPOOL_USE_CPUINFO
+			if (threadpool && threadpool->cpuinfo_is_initialized) {
+				uarch_index = cpuinfo_get_current_uarch_index();
+				if (uarch_index > max_uarch_index) {
+					uarch_index = default_uarch_index;
+				}
+			}
 		#endif
 
-		/* Save and modify FPU denormals control, if needed */
 		struct fpu_state saved_fpu_state = { 0 };
 		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
 			saved_fpu_state = get_fpu_state();
 			disable_fpu_denormals();
 		}
-
-		/* Do computations as worker #0 */
-		thread_parallelize_1d(threadpool, &threadpool->threads[0]);
-
-		/* Restore FPU denormals control, if needed */
+		for (size_t i = 0; i < range; i++) {
+			task(argument, uarch_index, i);
+		}
 		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
 			set_fpu_state(saved_fpu_state);
 		}
-
-		/* Wait until the threads finish computation */
-		wait_worker_threads(threadpool);
-
-		/* Make changes by other threads visible to this thread */
-		atomic_thread_fence(memory_order_acquire);
-
-		/* Unprotect the global threadpool structures */
-		pthread_mutex_unlock(&threadpool->execution_mutex);
+	} else {
+		const struct pthreadpool_1d_with_uarch_params params = {
+			.default_uarch_index = default_uarch_index,
+			.max_uarch_index = max_uarch_index,
+		};
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d_with_uarch, &params, sizeof(params),
			task, argument, range, flags);
 	}
 }
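A hypothetical caller of the new microarchitecture-aware entry point; the task receives the uarch index as its second argument and can pick a kernel variant accordingly (e.g. big vs. little cores on ARM big.LITTLE). my_task and run are made-up names:

    #include <pthreadpool.h>
    #include <stddef.h>
    #include <stdint.h>

    static void my_task(void* context, uint32_t uarch_index, size_t i) {
        /* e.g. dispatch to a kernel variant selected by uarch_index for item i */
        (void) context; (void) uarch_index; (void) i;
    }

    void run(pthreadpool_t pool, void* context, size_t items) {
        pthreadpool_parallelize_1d_with_uarch(
            pool, my_task, context,
            0 /* default_uarch_index: used when cpuinfo is unavailable */,
            1 /* max_uarch_index: higher indices fall back to the default */,
            items, 0 /* flags */);
    }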
@@ -617,7 +811,7 @@ void pthreadpool_parallelize_1d_tile_1d(
 	size_t tile,
 	uint32_t flags)
 {
-	if (threadpool == NULL || threadpool->threads_count <= 1) {
+	if (threadpool == NULL || threadpool->threads_count <= 1 || range <= tile) {
 		/* No thread pool used: execute task sequentially on the calling thread */
 		struct fpu_state saved_fpu_state = { 0 };
 		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
@@ -639,7 +833,9 @@ void pthreadpool_parallelize_1d_tile_1d(
 			.range = range,
 			.tile = tile
 		};
-		pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_1d_tile_1d, &context, tile_range, flags);
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d, NULL, 0,
+			(void*) compute_1d_tile_1d, &context, tile_range, flags);
 	}
 }
@@ -663,7 +859,7 @@ void pthreadpool_parallelize_2d(
 	size_t range_j,
 	uint32_t flags)
 {
-	if (threadpool == NULL || threadpool->threads_count <= 1) {
+	if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i | range_j) <= 1) {
 		/* No thread pool used: execute task sequentially on the calling thread */
 		struct fpu_state saved_fpu_state = { 0 };
 		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
@@ -685,7 +881,9 @@ void pthreadpool_parallelize_2d(
 			.argument = argument,
 			.range_j = fxdiv_init_size_t(range_j)
 		};
-		pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d, &context, range_i * range_j, flags);
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d, NULL, 0,
+			(void*) compute_2d, &context, range_i * range_j, flags);
 	}
 }
@@ -717,7 +915,7 @@ void pthreadpool_parallelize_2d_tile_1d(
 	size_t tile_j,
 	uint32_t flags)
 {
-	if (threadpool == NULL || threadpool->threads_count <= 1) {
+	if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) {
 		/* No thread pool used: execute task sequentially on the calling thread */
 		struct fpu_state saved_fpu_state = { 0 };
 		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
@@ -743,7 +941,9 @@ void pthreadpool_parallelize_2d_tile_1d(
 			.range_j = range_j,
 			.tile_j = tile_j
 		};
-		pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d_tile_1d, &context, range_i * tile_range_j, flags);
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d, NULL, 0,
+			(void*) compute_2d_tile_1d, &context, range_i * tile_range_j, flags);
 	}
 }
@@ -779,7 +979,7 @@ void pthreadpool_parallelize_2d_tile_2d(
 	size_t tile_j,
 	uint32_t flags)
 {
-	if (threadpool == NULL || threadpool->threads_count <= 1) {
+	if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) {
 		/* No thread pool used: execute task sequentially on the calling thread */
 		struct fpu_state saved_fpu_state = { 0 };
 		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
@@ -807,7 +1007,92 @@ void pthreadpool_parallelize_2d_tile_2d(
 			.tile_i = tile_i,
 			.tile_j = tile_j
 		};
-		pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d_tile_2d, &context, tile_range_i * tile_range_j, flags);
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d, NULL, 0,
+			(void*) compute_2d_tile_2d, &context, tile_range_i * tile_range_j, flags);
+	}
+}
+
+struct compute_2d_tile_2d_with_uarch_context {
+	pthreadpool_task_2d_tile_2d_with_id_t task;
+	void* argument;
+	struct fxdiv_divisor_size_t tile_range_j;
+	size_t range_i;
+	size_t range_j;
+	size_t tile_i;
+	size_t tile_j;
+};
+
+static void compute_2d_tile_2d_with_uarch(const struct compute_2d_tile_2d_with_uarch_context* context, uint32_t uarch_index, size_t linear_index) {
+	const struct fxdiv_divisor_size_t tile_range_j = context->tile_range_j;
+	const struct fxdiv_result_size_t tile_index = fxdiv_divide_size_t(linear_index, tile_range_j);
+	const size_t max_tile_i = context->tile_i;
+	const size_t max_tile_j = context->tile_j;
+	const size_t index_i = tile_index.quotient * max_tile_i;
+	const size_t index_j = tile_index.remainder * max_tile_j;
+	const size_t tile_i = min(max_tile_i, context->range_i - index_i);
+	const size_t tile_j = min(max_tile_j, context->range_j - index_j);
+	context->task(context->argument, uarch_index, index_i, index_j, tile_i, tile_j);
+}
+
+void pthreadpool_parallelize_2d_tile_2d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_2d_tile_2d_with_id_t task,
+	void* argument,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range_i,
+	size_t range_j,
+	size_t tile_i,
+	size_t tile_j,
+	uint32_t flags)
+{
+	if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) {
+		/* No thread pool used: execute task sequentially on the calling thread */
+
+		uint32_t uarch_index = default_uarch_index;
+		#if PTHREADPOOL_USE_CPUINFO
+			if (threadpool && threadpool->cpuinfo_is_initialized) {
+				uarch_index = cpuinfo_get_current_uarch_index();
+				if (uarch_index > max_uarch_index) {
+					uarch_index = default_uarch_index;
+				}
+			}
		#endif
+
+		struct fpu_state saved_fpu_state = { 0 };
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			saved_fpu_state = get_fpu_state();
+			disable_fpu_denormals();
+		}
+		for (size_t i = 0; i < range_i; i += tile_i) {
+			for (size_t j = 0; j < range_j; j += tile_j) {
+				task(argument, uarch_index, i, j, min(range_i - i, tile_i), min(range_j - j, tile_j));
+			}
+		}
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			set_fpu_state(saved_fpu_state);
+		}
+	} else {
+		/* Execute in parallel on the thread pool using linearized index */
+		const size_t tile_range_i = divide_round_up(range_i, tile_i);
+		const size_t tile_range_j = divide_round_up(range_j, tile_j);
+		const struct pthreadpool_1d_with_uarch_params params = {
+			.default_uarch_index = default_uarch_index,
+			.max_uarch_index = max_uarch_index,
+		};
+		struct compute_2d_tile_2d_with_uarch_context context = {
+			.task = task,
+			.argument = argument,
+			.tile_range_j = fxdiv_init_size_t(tile_range_j),
+			.range_i = range_i,
+			.range_j = range_j,
+			.tile_i = tile_i,
+			.tile_j = tile_j
+		};
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d_with_uarch, &params, sizeof(params),
+			(void*) compute_2d_tile_2d_with_uarch, &context, tile_range_i * tile_range_j, flags);
 	}
 }
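The decode above recovers 2-D tile coordinates from a single linearized index; fxdiv_divide_size_t computes the same quotient/remainder as ordinary division, just without a hardware divide. A standalone worked example with plain division (illustrative numbers, not from the patch):

    #include <stdio.h>
    #include <stddef.h>

    static size_t divide_round_up(size_t n, size_t q) { return (n + q - 1) / q; }
    static size_t min_size(size_t a, size_t b) { return a < b ? a : b; }

    int main(void) {
        const size_t range_i = 100, range_j = 90, tile_i = 32, tile_j = 32;
        const size_t tile_range_j = divide_round_up(range_j, tile_j);               /* 3 */
        const size_t tiles = divide_round_up(range_i, tile_i) * tile_range_j;       /* 12 */
        for (size_t linear = 0; linear < tiles; linear++) {
            const size_t index_i = (linear / tile_range_j) * tile_i;
            const size_t index_j = (linear % tile_range_j) * tile_j;
            /* edge tiles shrink: e.g. linear 8 -> i=64 j=64, size 32x26 */
            printf("tile %zu: i=%zu j=%zu (%zux%zu)\n", linear, index_i, index_j,
                min_size(tile_i, range_i - index_i), min_size(tile_j, range_j - index_j));
        }
        return 0;
    }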
@@ -848,7 +1133,7 @@ void pthreadpool_parallelize_3d_tile_2d(
 	size_t tile_k,
 	uint32_t flags)
 {
-	if (threadpool == NULL || threadpool->threads_count <= 1) {
+	if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) {
 		/* No thread pool used: execute task sequentially on the calling thread */
 		struct fpu_state saved_fpu_state = { 0 };
 		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
@@ -879,9 +1164,100 @@ void pthreadpool_parallelize_3d_tile_2d(
 			.tile_j = tile_j,
 			.tile_k = tile_k
 		};
-		pthreadpool_parallelize_1d(threadpool,
-			(pthreadpool_task_1d_t) compute_3d_tile_2d, &context,
-			range_i * tile_range_j * tile_range_k, flags);
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d, NULL, 0,
+			(void*) compute_3d_tile_2d, &context, range_i * tile_range_j * tile_range_k, flags);
+	}
+}
+
+struct compute_3d_tile_2d_with_uarch_context {
+	pthreadpool_task_3d_tile_2d_with_id_t task;
+	void* argument;
+	struct fxdiv_divisor_size_t tile_range_j;
+	struct fxdiv_divisor_size_t tile_range_k;
+	size_t range_j;
+	size_t range_k;
+	size_t tile_j;
+	size_t tile_k;
+};
+
+static void compute_3d_tile_2d_with_uarch(const struct compute_3d_tile_2d_with_uarch_context* context, uint32_t uarch_index, size_t linear_index) {
+	const struct fxdiv_divisor_size_t tile_range_k = context->tile_range_k;
+	const struct fxdiv_result_size_t tile_index_ij_k = fxdiv_divide_size_t(linear_index, tile_range_k);
+	const struct fxdiv_divisor_size_t tile_range_j = context->tile_range_j;
+	const struct fxdiv_result_size_t tile_index_i_j = fxdiv_divide_size_t(tile_index_ij_k.quotient, tile_range_j);
+	const size_t max_tile_j = context->tile_j;
+	const size_t max_tile_k = context->tile_k;
+	const size_t index_i = tile_index_i_j.quotient;
+	const size_t index_j = tile_index_i_j.remainder * max_tile_j;
+	const size_t index_k = tile_index_ij_k.remainder * max_tile_k;
+	const size_t tile_j = min(max_tile_j, context->range_j - index_j);
+	const size_t tile_k = min(max_tile_k, context->range_k - index_k);
+	context->task(context->argument, uarch_index, index_i, index_j, index_k, tile_j, tile_k);
+}
+
+void pthreadpool_parallelize_3d_tile_2d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_3d_tile_2d_with_id_t task,
+	void* argument,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range_i,
+	size_t range_j,
+	size_t range_k,
+	size_t tile_j,
+	size_t tile_k,
+	uint32_t flags)
+{
+	if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) {
+		/* No thread pool used: execute task sequentially on the calling thread */
+
+		uint32_t uarch_index = default_uarch_index;
+		#if PTHREADPOOL_USE_CPUINFO
+			if (threadpool && threadpool->cpuinfo_is_initialized) {
+				uarch_index = cpuinfo_get_current_uarch_index();
+				if (uarch_index > max_uarch_index) {
+					uarch_index = default_uarch_index;
+				}
+			}
		#endif
+
+		struct fpu_state saved_fpu_state = { 0 };
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			saved_fpu_state = get_fpu_state();
+			disable_fpu_denormals();
+		}
+		for (size_t i = 0; i < range_i; i++) {
+			for (size_t j = 0; j < range_j; j += tile_j) {
+				for (size_t k = 0; k < range_k; k += tile_k) {
+					task(argument, uarch_index, i, j, k, min(range_j - j, tile_j), min(range_k - k, tile_k));
+				}
+			}
+		}
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			set_fpu_state(saved_fpu_state);
+		}
+	} else {
+		/* Execute in parallel on the thread pool using linearized index */
+		const size_t tile_range_j = divide_round_up(range_j, tile_j);
+		const size_t tile_range_k = divide_round_up(range_k, tile_k);
+		const struct pthreadpool_1d_with_uarch_params params = {
+			.default_uarch_index = default_uarch_index,
+			.max_uarch_index = max_uarch_index,
+		};
+		struct compute_3d_tile_2d_with_uarch_context context = {
+			.task = task,
+			.argument = argument,
+			.tile_range_j = fxdiv_init_size_t(tile_range_j),
+			.tile_range_k = fxdiv_init_size_t(tile_range_k),
+			.range_j = range_j,
+			.range_k = range_k,
+			.tile_j = tile_j,
+			.tile_k = tile_k
+		};
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d_with_uarch, &params, sizeof(params),
+			(void*) compute_3d_tile_2d_with_uarch, &context, range_i * tile_range_j * tile_range_k, flags);
 	}
 }
@@ -927,7 +1303,7 @@ void pthreadpool_parallelize_4d_tile_2d(
 	size_t tile_l,
 	uint32_t flags)
 {
-	if (threadpool == NULL || threadpool->threads_count <= 1) {
+	if (threadpool == NULL || threadpool->threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) {
 		/* No thread pool used: execute task sequentially on the calling thread */
 		struct fpu_state saved_fpu_state = { 0 };
 		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
@@ -962,9 +1338,109 @@ void pthreadpool_parallelize_4d_tile_2d(
 			.tile_k = tile_k,
 			.tile_l = tile_l
 		};
-		pthreadpool_parallelize_1d(threadpool,
-			(pthreadpool_task_1d_t) compute_4d_tile_2d, &context,
-			range_i * range_j * tile_range_k * tile_range_l, flags);
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d, NULL, 0,
+			(void*) compute_4d_tile_2d, &context, range_i * range_j * tile_range_k * tile_range_l, flags);
+	}
+}
+
+struct compute_4d_tile_2d_with_uarch_context {
+	pthreadpool_task_4d_tile_2d_with_id_t task;
+	void* argument;
+	struct fxdiv_divisor_size_t tile_range_kl;
+	struct fxdiv_divisor_size_t range_j;
+	struct fxdiv_divisor_size_t tile_range_l;
+	size_t range_k;
+	size_t range_l;
+	size_t tile_k;
+	size_t tile_l;
+};
+
+static void compute_4d_tile_2d_with_uarch(const struct compute_4d_tile_2d_with_uarch_context* context, uint32_t uarch_index, size_t linear_index) {
+	const struct fxdiv_divisor_size_t tile_range_kl = context->tile_range_kl;
+	const struct fxdiv_result_size_t tile_index_ij_kl = fxdiv_divide_size_t(linear_index, tile_range_kl);
+	const struct fxdiv_divisor_size_t range_j = context->range_j;
+	const struct fxdiv_result_size_t tile_index_i_j = fxdiv_divide_size_t(tile_index_ij_kl.quotient, range_j);
+	const struct fxdiv_divisor_size_t tile_range_l = context->tile_range_l;
+	const struct fxdiv_result_size_t tile_index_k_l = fxdiv_divide_size_t(tile_index_ij_kl.remainder, tile_range_l);
+	const size_t max_tile_k = context->tile_k;
+	const size_t max_tile_l = context->tile_l;
+	const size_t index_i = tile_index_i_j.quotient;
+	const size_t index_j = tile_index_i_j.remainder;
+	const size_t index_k = tile_index_k_l.quotient * max_tile_k;
+	const size_t index_l = tile_index_k_l.remainder * max_tile_l;
+	const size_t tile_k = min(max_tile_k, context->range_k - index_k);
+	const size_t tile_l = min(max_tile_l, context->range_l - index_l);
+	context->task(context->argument, uarch_index, index_i, index_j, index_k, index_l, tile_k, tile_l);
+}
+
+void pthreadpool_parallelize_4d_tile_2d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_4d_tile_2d_with_id_t task,
+	void* argument,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range_i,
+	size_t range_j,
+	size_t range_k,
+	size_t range_l,
+	size_t tile_k,
+	size_t tile_l,
+	uint32_t flags)
+{
+	if (threadpool == NULL || threadpool->threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) {
+		/* No thread pool used: execute task sequentially on the calling thread */
+
+		uint32_t uarch_index = default_uarch_index;
+		#if PTHREADPOOL_USE_CPUINFO
+			if (threadpool && threadpool->cpuinfo_is_initialized) {
+				uarch_index = cpuinfo_get_current_uarch_index();
+				if (uarch_index > max_uarch_index) {
+					uarch_index = default_uarch_index;
+				}
+			}
		#endif
+
+		struct fpu_state saved_fpu_state = { 0 };
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			saved_fpu_state = get_fpu_state();
+			disable_fpu_denormals();
+		}
+		for (size_t i = 0; i < range_i; i++) {
+			for (size_t j = 0; j < range_j; j++) {
+				for (size_t k = 0; k < range_k; k += tile_k) {
+					for (size_t l = 0; l < range_l; l += tile_l) {
+						task(argument, uarch_index, i, j, k, l,
+							min(range_k - k, tile_k), min(range_l - l, tile_l));
+					}
+				}
+			}
+		}
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			set_fpu_state(saved_fpu_state);
+		}
+	} else {
+		/* Execute in parallel on the thread pool using linearized index */
+		const size_t tile_range_k = divide_round_up(range_k, tile_k);
+		const size_t tile_range_l = divide_round_up(range_l, tile_l);
+		const struct pthreadpool_1d_with_uarch_params params = {
+			.default_uarch_index = default_uarch_index,
+			.max_uarch_index = max_uarch_index,
+		};
+		struct compute_4d_tile_2d_with_uarch_context context = {
+			.task = task,
+			.argument = argument,
+			.tile_range_kl = fxdiv_init_size_t(tile_range_k * tile_range_l),
+			.range_j = fxdiv_init_size_t(range_j),
+			.tile_range_l = fxdiv_init_size_t(tile_range_l),
+			.range_k = range_k,
+			.range_l = range_l,
+			.tile_k = tile_k,
+			.tile_l = tile_l
+		};
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d_with_uarch, &params, sizeof(params),
+			(void*) compute_4d_tile_2d_with_uarch, &context, range_i * range_j * tile_range_k * tile_range_l, flags);
 	}
 }
@@ -1016,7 +1492,7 @@ void pthreadpool_parallelize_5d_tile_2d(
 	size_t tile_m,
 	uint32_t flags)
 {
-	if (threadpool == NULL || threadpool->threads_count <= 1) {
+	if (threadpool == NULL || threadpool->threads_count <= 1 || ((range_i | range_j | range_k) <= 1 && range_l <= tile_l && range_m <= tile_m)) {
 		/* No thread pool used: execute task sequentially on the calling thread */
 		struct fpu_state saved_fpu_state = { 0 };
 		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
@@ -1054,9 +1530,9 @@ void pthreadpool_parallelize_5d_tile_2d(
 			.tile_l = tile_l,
 			.tile_m = tile_m,
 		};
-		pthreadpool_parallelize_1d(threadpool,
-			(pthreadpool_task_1d_t) compute_5d_tile_2d, &context,
-			range_i * range_j * range_k * tile_range_l * tile_range_m, flags);
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d, NULL, 0,
+			(void*) compute_5d_tile_2d, &context, range_i * range_j * range_k * tile_range_l * tile_range_m, flags);
 	}
 }
@@ -1113,7 +1589,7 @@ void pthreadpool_parallelize_6d_tile_2d(
 	size_t tile_n,
 	uint32_t flags)
 {
-	if (threadpool == NULL || threadpool->threads_count <= 1) {
+	if (threadpool == NULL || threadpool->threads_count <= 1 || ((range_i | range_j | range_k | range_l) <= 1 && range_m <= tile_m && range_n <= tile_n)) {
 		/* No thread pool used: execute task sequentially on the calling thread */
 		struct fpu_state saved_fpu_state = { 0 };
 		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
@@ -1154,21 +1630,25 @@ void pthreadpool_parallelize_6d_tile_2d(
 			.tile_m = tile_m,
 			.tile_n = tile_n,
 		};
-		pthreadpool_parallelize_1d(threadpool,
-			(pthreadpool_task_1d_t) compute_6d_tile_2d, &context,
-			range_i * range_j * range_k * range_l * tile_range_m * tile_range_n, flags);
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d, NULL, 0,
+			(void*) compute_6d_tile_2d, &context, range_i * range_j * range_k * range_l * tile_range_m * tile_range_n, flags);
 	}
 }
 
 void pthreadpool_destroy(struct pthreadpool* threadpool) {
 	if (threadpool != NULL) {
-		if (threadpool->threads_count > 1) {
+		const size_t threads_count = threadpool->threads_count;
+		if (threads_count > 1) {
 #if PTHREADPOOL_USE_FUTEX
-			atomic_store_explicit(
-				&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed);
-			atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_release);
+			pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
+			pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
 
-			atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release);
+			/*
+			 * Store the command with release semantics to guarantee that if a worker thread observes
+			 * the new command value, it also observes the updated active_threads/has_active_threads values.
			 */
+			pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
 
 			/* Wake up worker threads */
 			futex_wake_all(&threadpool->command);
@@ -1176,12 +1656,16 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) {
 			/* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */
 			pthread_mutex_lock(&threadpool->command_mutex);
 
-			/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
-			atomic_store_explicit(
-				&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release);
+			pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
 
-			/* Update the threadpool command. */
-			atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release);
+			/*
+			 * Store the command with release semantics to guarantee that if a worker thread observes
+			 * the new command value, it also observes the updated active_threads value.
+			 *
+			 * Note: the release fence inside pthread_mutex_unlock is insufficient,
+			 * because the workers might be waiting in a spin-loop rather than on the condition variable.
+			 */
+			pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
 
 			/* Wake up worker threads */
 			pthread_cond_broadcast(&threadpool->command_condvar);
@@ -1191,7 +1675,7 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) {
 #endif
 
 		/* Wait until all threads return */
-		for (size_t thread = 1; thread < threadpool->threads_count; thread++) {
+		for (size_t thread = 1; thread < threads_count; thread++) {
 			pthread_join(threadpool->threads[thread].thread_object, NULL);
 		}
 
@@ -1204,6 +1688,15 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) {
 			pthread_cond_destroy(&threadpool->command_condvar);
 #endif
 		}
-		free(threadpool);
+		#if PTHREADPOOL_USE_CPUINFO
+			if (threadpool->cpuinfo_is_initialized) {
+				cpuinfo_deinitialize();
+			}
		#endif
+		#ifdef _WIN32
+			_aligned_free(threadpool);
+		#else
+			free(threadpool);
		#endif
 	}
 }
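An end-to-end usage sketch of the public API touched by this patch (hypothetical caller; error handling elided for brevity). pthreadpool_destroy wakes the workers with the shutdown command, joins them, and only then releases the pool memory and, if used, cpuinfo:

    #include <pthreadpool.h>
    #include <stddef.h>

    static void square(void* context, size_t i) {
        double* data = (double*) context;
        data[i] *= data[i];
    }

    int main(void) {
        double data[1024];
        for (size_t i = 0; i < 1024; i++) data[i] = (double) i;

        pthreadpool_t pool = pthreadpool_create(0);  /* one thread per processor */
        pthreadpool_parallelize_1d(pool, square, data, 1024, 0 /* flags */);
        pthreadpool_destroy(pool);
        return 0;
    }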