From ebd50d0cfa3664d454ffdf246fcd228c3b370a11 Mon Sep 17 00:00:00 2001 From: mattn Date: Mon, 2 Mar 2020 15:21:58 +0900 Subject: Build on Windows/mingw64 (#6) Support Windows/mingw64 build --- bench/latency.cc | 3 ++- src/threadpool-pthreads.c | 29 +++++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/bench/latency.cc b/bench/latency.cc index f500cdf..e72ea49 100644 --- a/bench/latency.cc +++ b/bench/latency.cc @@ -4,9 +4,10 @@ #include +#include static void SetNumberOfThreads(benchmark::internal::Benchmark* benchmark) { - const int max_threads = sysconf(_SC_NPROCESSORS_ONLN); + const int max_threads = std::thread::hardware_concurrency(); for (int t = 1; t <= max_threads; t++) { benchmark->Arg(t); } diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 6c6a6d4..2cb834d 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -31,6 +31,12 @@ #endif #endif +#ifdef _WIN32 +# define NOMINMAX +# include +# include +#endif + /* Dependencies */ #include @@ -424,6 +430,11 @@ static struct pthreadpool* pthreadpool_allocate(size_t threads_count) { if (threadpool == NULL) { return NULL; } + #elif defined(_WIN32) + threadpool = _aligned_malloc(threadpool_size, PTHREADPOOL_CACHELINE_SIZE); + if (threadpool == NULL) { + return NULL; + } #else if (posix_memalign((void**) &threadpool, PTHREADPOOL_CACHELINE_SIZE, threadpool_size) != 0) { return NULL; @@ -439,8 +450,18 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { #endif if (threads_count == 0) { - threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); + #if defined(_SC_NPROCESSORS_ONLN) + threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); + #elif defined(_WIN32) + SYSTEM_INFO system_info; + ZeroMemory(&system_info, sizeof(system_info)); + GetSystemInfo(&system_info); + threads_count = (size_t) system_info.dwNumberOfProcessors; + #else + #error "Unsupported platform" + #endif } + struct pthreadpool* threadpool = pthreadpool_allocate(threads_count); if (threadpool == NULL) { return NULL; @@ -1204,6 +1225,10 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { pthread_cond_destroy(&threadpool->command_condvar); #endif } - free(threadpool); + #ifdef _WIN32 + _aligned_free(threadpool); + #else + free(threadpool); + #endif } } -- cgit v1.2.3 From defdd296d67a43b00b334c183edba09e992d5915 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Thu, 5 Mar 2020 13:22:16 -0800 Subject: Add high-contention test cases --- test/pthreadpool.cc | 236 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 236 insertions(+) diff --git a/test/pthreadpool.cc b/test/pthreadpool.cc index 4faf3be..b80d98d 100644 --- a/test/pthreadpool.cc +++ b/test/pthreadpool.cc @@ -274,6 +274,29 @@ TEST(Parallelize1D, MultiThreadPoolEachItemProcessedMultipleTimes) { } } +static void IncrementSame1D(std::atomic_int* num_processed_items, size_t i) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); +} + +TEST(Parallelize1D, MultiThreadPoolHighContention) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d( + threadpool.get(), + reinterpret_cast(IncrementSame1D), + static_cast(&num_processed_items), + kParallelize1DRange, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize1DRange); +} + 
static void WorkImbalance1D(std::atomic_int* num_processed_items, size_t i) { num_processed_items->fetch_add(1, std::memory_order_relaxed); if (i == 0) { @@ -545,6 +568,31 @@ TEST(Parallelize1DTile1D, MultiThreadPoolEachItemProcessedMultipleTimes) { } } +static void IncrementSame1DTile1D(std::atomic_int* num_processed_items, size_t start_i, size_t tile_i) { + for (size_t i = start_i; i < start_i + tile_i; i++) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); + } +} + +TEST(Parallelize1DTile1D, MultiThreadPoolHighContention) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + reinterpret_cast(IncrementSame1DTile1D), + static_cast(&num_processed_items), + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize1DTile1DRange); +} + static void WorkImbalance1DTile1D(std::atomic_int* num_processed_items, size_t start_i, size_t tile_i) { num_processed_items->fetch_add(tile_i, std::memory_order_relaxed); if (start_i == 0) { @@ -801,6 +849,29 @@ TEST(Parallelize2D, MultiThreadPoolEachItemProcessedMultipleTimes) { } } +static void IncrementSame2D(std::atomic_int* num_processed_items, size_t i, size_t j) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); +} + +TEST(Parallelize2D, MultiThreadPoolHighContention) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d( + threadpool.get(), + reinterpret_cast(IncrementSame2D), + static_cast(&num_processed_items), + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize2DRangeI * kParallelize2DRangeJ); +} + static void WorkImbalance2D(std::atomic_int* num_processed_items, size_t i, size_t j) { num_processed_items->fetch_add(1, std::memory_order_relaxed); if (i == 0 && j == 0) { @@ -1097,6 +1168,31 @@ TEST(Parallelize2DTile1D, MultiThreadPoolEachItemProcessedMultipleTimes) { } } +static void IncrementSame2DTile1D(std::atomic_int* num_processed_items, size_t i, size_t start_j, size_t tile_j) { + for (size_t j = start_j; j < start_j + tile_j; j++) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); + } +} + +TEST(Parallelize2DTile1D, MultiThreadPoolHighContention) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + reinterpret_cast(IncrementSame2DTile1D), + static_cast(&num_processed_items), + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize2DTile1DRangeI * kParallelize2DTile1DRangeJ); +} + static void WorkImbalance2DTile1D(std::atomic_int* num_processed_items, size_t i, size_t start_j, size_t tile_j) { num_processed_items->fetch_add(tile_j, 
std::memory_order_relaxed); if (i == 0 && start_j == 0) { @@ -1415,6 +1511,34 @@ TEST(Parallelize2DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { } } +static void IncrementSame2DTile2D(std::atomic_int* num_processed_items, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) { + for (size_t i = start_i; i < start_i + tile_i; i++) { + for (size_t j = start_j; j < start_j + tile_j; j++) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize2DTile2D, MultiThreadPoolHighContention) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + reinterpret_cast(IncrementSame2DTile2D), + static_cast(&num_processed_items), + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ); +} + static void WorkImbalance2DTile2D(std::atomic_int* num_processed_items, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) { num_processed_items->fetch_add(tile_i * tile_j, std::memory_order_relaxed); if (start_i == 0 && start_j == 0) { @@ -1747,6 +1871,34 @@ TEST(Parallelize3DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { } } +static void IncrementSame3DTile2D(std::atomic_int* num_processed_items, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) { + for (size_t j = start_j; j < start_j + tile_j; j++) { + for (size_t k = start_k; k < start_k + tile_k; k++) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize3DTile2D, MultiThreadPoolHighContention) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast(IncrementSame3DTile2D), + static_cast(&num_processed_items), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); +} + static void WorkImbalance3DTile2D(std::atomic_int* num_processed_items, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) { num_processed_items->fetch_add(tile_j * tile_k, std::memory_order_relaxed); if (i == 0 && start_j == 0 && start_k == 0) { @@ -2092,6 +2244,34 @@ TEST(Parallelize4DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { } } +static void IncrementSame4DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) { + for (size_t k = start_k; k < start_k + tile_k; k++) { + for (size_t l = start_l; l < start_l + tile_l; l++) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize4DTile2D, MultiThreadPoolHighContention) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), 
pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + reinterpret_cast(IncrementSame4DTile2D), + static_cast(&num_processed_items), + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); +} + static void WorkImbalance4DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) { num_processed_items->fetch_add(tile_k * tile_l, std::memory_order_relaxed); if (i == 0 && j == 0 && start_k == 0 && start_l == 0) { @@ -2450,6 +2630,34 @@ TEST(Parallelize5DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { } } +static void IncrementSame5DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t k, size_t start_l, size_t start_m, size_t tile_l, size_t tile_m) { + for (size_t l = start_l; l < start_l + tile_l; l++) { + for (size_t m = start_m; m < start_m + tile_m; m++) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize5DTile2D, MultiThreadPoolHighContention) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + reinterpret_cast(IncrementSame5DTile2D), + static_cast(&num_processed_items), + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize5DTile2DRangeI * kParallelize5DTile2DRangeJ * kParallelize5DTile2DRangeK * kParallelize5DTile2DRangeL * kParallelize5DTile2DRangeM); +} + static void WorkImbalance5DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t k, size_t start_l, size_t start_m, size_t tile_l, size_t tile_m) { num_processed_items->fetch_add(tile_l * tile_m, std::memory_order_relaxed); if (i == 0 && j == 0 && k == 0 && start_l == 0 && start_m == 0) { @@ -2821,6 +3029,34 @@ TEST(Parallelize6DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { } } +static void IncrementSame6DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t k, size_t l, size_t start_m, size_t start_n, size_t tile_m, size_t tile_n) { + for (size_t m = start_m; m < start_m + tile_m; m++) { + for (size_t n = start_n; n < start_n + tile_n; n++) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize6DTile2D, MultiThreadPoolHighContention) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + reinterpret_cast(IncrementSame6DTile2D), + static_cast(&num_processed_items), + kParallelize6DTile2DRangeI, 
kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize6DTile2DRangeI * kParallelize6DTile2DRangeJ * kParallelize6DTile2DRangeK * kParallelize6DTile2DRangeL * kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); +} + static void WorkImbalance6DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t k, size_t l, size_t start_m, size_t start_n, size_t tile_m, size_t tile_n) { num_processed_items->fetch_add(tile_m * tile_n, std::memory_order_relaxed); if (i == 0 && j == 0 && k == 0 && l == 0 && start_m == 0 && start_n == 0) { -- cgit v1.2.3 From 00108cf5c96fba21da3ea2836ad129dd3839eaed Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Thu, 5 Mar 2020 13:30:44 -0800 Subject: Minor cleanup --- bench/latency.cc | 2 -- src/threadpool-pthreads.c | 8 ++++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/bench/latency.cc b/bench/latency.cc index e72ea49..4fb59ee 100644 --- a/bench/latency.cc +++ b/bench/latency.cc @@ -1,7 +1,5 @@ #include -#include - #include #include diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 2cb834d..4bbf427 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -32,9 +32,9 @@ #endif #ifdef _WIN32 -# define NOMINMAX -# include -# include + #define NOMINMAX + #include + #include #endif /* Dependencies */ @@ -559,7 +559,7 @@ void pthreadpool_parallelize_1d( #if PTHREADPOOL_USE_FUTEX /* - * Make new command parameters globally visible. Having this fence before updating the command is imporatnt: it + * Make new command parameters globally visible. Having this fence before updating the command is important: it * guarantees that if a worker thread observes new command value, it also observes the updated command parameters. */ atomic_thread_fence(memory_order_release); -- cgit v1.2.3 From efa3c028d0f7b0414f0738a706f8187430425477 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Thu, 5 Mar 2020 13:34:50 -0800 Subject: Document PTHREADPOOL_FLAG_DISABLE_DENORMALS --- include/pthreadpool.h | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/include/pthreadpool.h b/include/pthreadpool.h index 2443285..d15bf3f 100644 --- a/include/pthreadpool.h +++ b/include/pthreadpool.h @@ -17,6 +17,19 @@ typedef void (*pthreadpool_task_5d_tile_2d_t)(void*, size_t, size_t, size_t, siz typedef void (*pthreadpool_task_6d_tile_2d_t)(void*, size_t, size_t, size_t, size_t, size_t, size_t, size_t, size_t); +/** + * Disable support for denormalized numbers to the maximum extent possible for the duration of the computation. + * + * Handling denormalized floating-point numbers is often implemented in microcode, and incurs significant performance + * degradation. This hint instructs the thread pool to disable support for denormalized numbers before running the + * computation by manipulating architecture-specific control registers, and restore the initial value of control + * registers after the computation is complete. The thread pool temporary disables denormalized numbers on all threads + * involved in the computation (i.e. the caller threads, and potentially worker threads). + * + * Disabling denormalized numbers may have a small negative effect on results' accuracy. 
As various architectures differ + * in capabilities to control processing of denormalized numbers, using this flag may also hurt results' reproducibility + * across different instruction set architectures. + */ #define PTHREADPOOL_FLAG_DISABLE_DENORMALS 0x00000001 #ifdef __cplusplus -- cgit v1.2.3 From ef23a4af82e34a4785c56b5fbae631efbcb77aea Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Thu, 5 Mar 2020 13:38:29 -0800 Subject: PTHREADPOOL_FLAG_YIELD_WORKERS flag to bypass spin-wait Makes it possible to signal the last operation in a sequence of computations, so pthreadpool workers don't spin in vain. --- include/pthreadpool.h | 9 +++++++++ src/threadpool-pthreads.c | 28 ++++++++++++++++------------ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/include/pthreadpool.h b/include/pthreadpool.h index d15bf3f..30471e6 100644 --- a/include/pthreadpool.h +++ b/include/pthreadpool.h @@ -32,6 +32,15 @@ typedef void (*pthreadpool_task_6d_tile_2d_t)(void*, size_t, size_t, size_t, siz */ #define PTHREADPOOL_FLAG_DISABLE_DENORMALS 0x00000001 +/** + * Yield worker threads to the system scheduler after the operation is finished. + * + * Force workers to use kernel wait (instead of active spin-wait by default) for new commands after this command is + * processed. This flag affects only the immediate next operation on this thread pool. To make the thread pool always + * use kernel wait, pass this flag to all parallelization functions. + */ +#define PTHREADPOOL_FLAG_YIELD_WORKERS 0x00000002 + #ifdef __cplusplus extern "C" { #endif diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 4bbf427..955d0b9 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -335,7 +335,8 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_ static uint32_t wait_for_new_command( struct pthreadpool* threadpool, - uint32_t last_command) + uint32_t last_command, + uint32_t last_flags) { uint32_t command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); if (command != last_command) { @@ -343,19 +344,21 @@ static uint32_t wait_for_new_command( return command; } - /* Spin-wait loop */ - for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { - /* This fence serves as a sleep instruction */ - atomic_thread_fence(memory_order_acquire); - - command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); - if (command != last_command) { + if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) { + /* Spin-wait loop */ + for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { + /* This fence serves as a sleep instruction */ atomic_thread_fence(memory_order_acquire); - return command; + + command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + if (command != last_command) { + atomic_thread_fence(memory_order_acquire); + return command; + } } } - /* Spin-wait timed out, fall back to mutex/futex wait */ + /* Spin-wait disabled or timed out, fall back to mutex/futex wait */ #if PTHREADPOOL_USE_FUTEX do { futex_wait(&threadpool->command, last_command); @@ -381,14 +384,15 @@ static void* thread_main(void* arg) { struct pthreadpool* threadpool = ((struct pthreadpool*) (thread - thread->thread_number)) - 1; uint32_t last_command = threadpool_command_init; struct fpu_state saved_fpu_state = { 0 }; + uint32_t flags = 0; /* Check in */ checkin_worker_thread(threadpool); /* Monitor new commands and act accordingly */ for (;;) { - uint32_t command = wait_for_new_command(threadpool, 
last_command); - const uint32_t flags = atomic_load_explicit(&threadpool->flags, memory_order_relaxed); + uint32_t command = wait_for_new_command(threadpool, last_command, flags); + flags = atomic_load_explicit(&threadpool->flags, memory_order_relaxed); /* Process command */ switch (command & THREADPOOL_COMMAND_MASK) { -- cgit v1.2.3 From 39e0461712bc666abd5f82f22cab46f0dcb502e7 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Thu, 5 Mar 2020 14:33:12 -0800 Subject: Remove Native Client support --- README.md | 2 +- src/threadpool-pthreads.c | 22 ---------------------- 2 files changed, 1 insertion(+), 23 deletions(-) diff --git a/README.md b/README.md index 3faafaa..b92bea0 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ It provides similar functionality to `#pragma omp parallel for`, but with additi * Run on user-specified or auto-detected number of threads. * Work-stealing scheduling for efficient work balancing. * Wait-free synchronization of work items. -* Compatible with Linux (including Android), macOS, iOS, Emscripten, Native Client environments. +* Compatible with Linux (including Android), macOS, iOS, Emscripten environments. * 100% unit tests coverage. * Throughput and latency microbenchmarks. diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 955d0b9..ddc67bd 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -23,9 +23,6 @@ #ifndef FUTEX_PRIVATE_FLAG #define FUTEX_PRIVATE_FLAG 128 #endif - #elif defined(__native_client__) - #define PTHREADPOOL_USE_FUTEX 1 - #include #else #define PTHREADPOOL_USE_FUTEX 0 #endif @@ -96,21 +93,6 @@ static inline size_t min(size_t a, size_t b) { static int futex_wake_all(_Atomic uint32_t* address) { return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX); } - #elif defined(__native_client__) - static struct nacl_irt_futex nacl_irt_futex = { 0 }; - static pthread_once_t nacl_init_guard = PTHREAD_ONCE_INIT; - static void nacl_init(void) { - nacl_interface_query(NACL_IRT_FUTEX_v0_1, &nacl_irt_futex, sizeof(nacl_irt_futex)); - } - - static int futex_wait(_Atomic uint32_t* address, uint32_t value) { - return nacl_irt_futex.futex_wait_abs((_Atomic int*) address, (int) value, NULL); - } - - static int futex_wake_all(_Atomic uint32_t* address) { - int count; - return nacl_irt_futex.futex_wake((_Atomic int*) address, INT_MAX, &count); - } #else #error "Platform-specific implementation of futex_wait and futex_wake_all required" #endif @@ -449,10 +431,6 @@ static struct pthreadpool* pthreadpool_allocate(size_t threads_count) { } struct pthreadpool* pthreadpool_create(size_t threads_count) { -#if defined(__native_client__) - pthread_once(&nacl_init_guard, nacl_init); -#endif - if (threads_count == 0) { #if defined(_SC_NPROCESSORS_ONLN) threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); -- cgit v1.2.3 From 2bde094c58c0fcde46b537b538f8bbaef7ef9b16 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Mon, 23 Mar 2020 00:59:45 -0700 Subject: Avoid spinning thread-pool when task has the only item --- src/threadpool-pthreads.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index ddc67bd..fd67f2f 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -495,7 +495,7 @@ void pthreadpool_parallelize_1d( size_t range, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count <= 1) { + if (threadpool == NULL || threadpool->threads_count <= 1 || range <= 1) { /* No thread pool 
used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -620,7 +620,7 @@ void pthreadpool_parallelize_1d_tile_1d( size_t tile, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count <= 1) { + if (threadpool == NULL || threadpool->threads_count <= 1 || range <= tile) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -666,7 +666,7 @@ void pthreadpool_parallelize_2d( size_t range_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count <= 1) { + if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i | range_j) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -720,7 +720,7 @@ void pthreadpool_parallelize_2d_tile_1d( size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count <= 1) { + if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -782,7 +782,7 @@ void pthreadpool_parallelize_2d_tile_2d( size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count <= 1) { + if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -851,7 +851,7 @@ void pthreadpool_parallelize_3d_tile_2d( size_t tile_k, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count <= 1) { + if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -930,7 +930,7 @@ void pthreadpool_parallelize_4d_tile_2d( size_t tile_l, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count <= 1) { + if (threadpool == NULL || threadpool->threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -1019,7 +1019,7 @@ void pthreadpool_parallelize_5d_tile_2d( size_t tile_m, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count <= 1) { + if (threadpool == NULL || threadpool->threads_count <= 1 || ((range_i | range_j | range_k) <= 1 && range_l <= tile_l && range_m <= tile_m)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -1116,7 +1116,7 @@ void pthreadpool_parallelize_6d_tile_2d( size_t tile_n, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count <= 1) { + if (threadpool == NULL || threadpool->threads_count <= 1 || ((range_i | range_j | range_k | range_l) <= 1 && range_m <= tile_m && range_n <= tile_n)) { /* No thread pool used: execute task sequentially on the 
calling thread */ struct fpu_state saved_fpu_state = { 0 }; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { -- cgit v1.2.3 From 75294cea82d1a0e801dc64ef4d8c50e6b4b387ea Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Mon, 23 Mar 2020 01:09:29 -0700 Subject: Simplify parallel task initialization --- src/threadpool-pthreads.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index fd67f2f..ba508b9 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -523,20 +523,24 @@ void pthreadpool_parallelize_1d( atomic_store_explicit(&threadpool->flags, flags, memory_order_relaxed); /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ + const size_t threads_count = threadpool->threads_count; atomic_store_explicit( - &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed); + &threadpool->active_threads, threads_count - 1 /* caller thread */, memory_order_relaxed); #if PTHREADPOOL_USE_FUTEX atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); #endif /* Spread the work between threads */ - for (size_t tid = 0; tid < threadpool->threads_count; tid++) { + size_t range_start = 0; + for (size_t tid = 0; tid < threads_count; tid++) { struct thread_info* thread = &threadpool->threads[tid]; - const size_t range_start = multiply_divide(range, tid, threadpool->threads_count); - const size_t range_end = multiply_divide(range, tid + 1, threadpool->threads_count); + const size_t range_end = multiply_divide(range, tid + 1, threads_count); atomic_store_explicit(&thread->range_start, range_start, memory_order_relaxed); atomic_store_explicit(&thread->range_end, range_end, memory_order_relaxed); atomic_store_explicit(&thread->range_length, range_end - range_start, memory_order_relaxed); + + /* The next range starts where the previous ended */ + range_start = range_end; } #if PTHREADPOOL_USE_FUTEX -- cgit v1.2.3 From 0f57821e68a56b4ba78fd7f4c7d1a494286497aa Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Mon, 23 Mar 2020 01:19:32 -0700 Subject: Remove redundant barriers --- src/threadpool-pthreads.c | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index ba508b9..96aa72a 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -208,7 +208,7 @@ PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZ static void checkin_worker_thread(struct pthreadpool* threadpool) { #if PTHREADPOOL_USE_FUTEX if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) { - atomic_store_explicit(&threadpool->has_active_threads, 0, memory_order_release); + atomic_store_explicit(&threadpool->has_active_threads, 0, memory_order_relaxed); futex_wake_all(&threadpool->has_active_threads); } #else @@ -312,6 +312,8 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_ task(argument, item_id); } } + + /* Make changes by this thread visible to other threads */ atomic_thread_fence(memory_order_release); } @@ -322,7 +324,6 @@ static uint32_t wait_for_new_command( { uint32_t command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); if (command != last_command) { - atomic_thread_fence(memory_order_acquire); return command; } @@ -334,7 +335,6 @@ static uint32_t wait_for_new_command( command = 
atomic_load_explicit(&threadpool->command, memory_order_relaxed); if (command != last_command) { - atomic_thread_fence(memory_order_acquire); return command; } } @@ -357,7 +357,6 @@ static uint32_t wait_for_new_command( /* Read a new command */ pthread_mutex_unlock(&threadpool->command_mutex); #endif - atomic_thread_fence(memory_order_acquire); return command; } @@ -374,6 +373,8 @@ static void* thread_main(void* arg) { /* Monitor new commands and act accordingly */ for (;;) { uint32_t command = wait_for_new_command(threadpool, last_command, flags); + atomic_thread_fence(memory_order_acquire); + flags = atomic_load_explicit(&threadpool->flags, memory_order_relaxed); /* Process command */ @@ -543,14 +544,6 @@ void pthreadpool_parallelize_1d( range_start = range_end; } - #if PTHREADPOOL_USE_FUTEX - /* - * Make new command parameters globally visible. Having this fence before updating the command is important: it - * guarantees that if a worker thread observes new command value, it also observes the updated command parameters. - */ - atomic_thread_fence(memory_order_release); - #endif - /* * Update the threadpool command. * Imporantly, do it after initializing command parameters (range, task, argument) @@ -562,11 +555,16 @@ void pthreadpool_parallelize_1d( const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d; #if PTHREADPOOL_USE_FUTEX + /* + * Store the command with release semantics to guarantee that if a worker thread observes + * the new command value, it also observes the updated command parameters. + */ atomic_store_explicit(&threadpool->command, new_command, memory_order_release); /* Wake up the threads */ futex_wake_all(&threadpool->command); #else + /* Relaxed semantics because pthread_mutex_unlock acts as a release fence */ atomic_store_explicit(&threadpool->command, new_command, memory_order_relaxed); /* Unlock the command variables before waking up the threads for better performance */ @@ -1173,7 +1171,7 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { #if PTHREADPOOL_USE_FUTEX atomic_store_explicit( &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed); - atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_release); + atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release); @@ -1185,10 +1183,10 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ atomic_store_explicit( - &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release); + &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed); /* Update the threadpool command. 
*/ - atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release); + atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_relaxed); /* Wake up worker threads */ pthread_cond_broadcast(&threadpool->command_condvar); -- cgit v1.2.3 From cb207d8b505e495d578bf6b9fe538b7f291a33a2 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Mon, 23 Mar 2020 01:35:09 -0700 Subject: Support WebAssembly+Threads build - Abstract away atomic operations and data type from the source file - Polyfill atomic operations for Clang targeting WAsm+Threads - Set Emscripten link options for WebAssembly+Threads builds --- src/threadpool-atomics.h | 178 ++++++++++++++++++++++++++++++++++++++++++++++ src/threadpool-pthreads.c | 125 ++++++++++++++++---------------- 2 files changed, 240 insertions(+), 63 deletions(-) create mode 100644 src/threadpool-atomics.h diff --git a/src/threadpool-atomics.h b/src/threadpool-atomics.h new file mode 100644 index 0000000..92fcd8d --- /dev/null +++ b/src/threadpool-atomics.h @@ -0,0 +1,178 @@ +#pragma once + +#include +#include +#include + +#if defined(__wasm__) && defined(__EMSCRIPTEN_PTHREADS__) && defined(__clang__) + /* + * Clang for WebAssembly target lacks stdatomic.h header, + * even though it supports the necessary low-level intrinsics. + * Thus, we implement pthreadpool atomic functions on top of + * low-level Clang-specific interfaces for this target. + */ + + typedef _Atomic(uint32_t) pthreadpool_atomic_uint32_t; + typedef _Atomic(size_t) pthreadpool_atomic_size_t; + typedef _Atomic(void*) pthreadpool_atomic_void_p; + + static inline uint32_t pthreadpool_load_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + return __c11_atomic_load(address, __ATOMIC_RELAXED); + } + + static inline size_t pthreadpool_load_relaxed_size_t( + pthreadpool_atomic_size_t* address) + { + return __c11_atomic_load(address, __ATOMIC_RELAXED); + } + + static inline void* pthreadpool_load_relaxed_void_p( + pthreadpool_atomic_void_p* address) + { + return __c11_atomic_load(address, __ATOMIC_RELAXED); + } + + static inline void pthreadpool_store_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + __c11_atomic_store(address, value, __ATOMIC_RELAXED); + } + + static inline void pthreadpool_store_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + __c11_atomic_store(address, value, __ATOMIC_RELAXED); + } + + static inline void pthreadpool_store_relaxed_void_p( + pthreadpool_atomic_void_p* address, + void* value) + { + __c11_atomic_store(address, value, __ATOMIC_RELAXED); + } + + static inline void pthreadpool_store_release_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + __c11_atomic_store(address, value, __ATOMIC_RELEASE); + } + + static inline void pthreadpool_store_release_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + __c11_atomic_store(address, value, __ATOMIC_RELEASE); + } + + static inline uint32_t pthreadpool_fetch_sub_relaxed_size_t( + pthreadpool_atomic_size_t* address, + uint32_t decrement) + { + return __c11_atomic_fetch_sub(address, decrement, __ATOMIC_RELAXED); + } + + static inline bool pthreadpool_compare_exchange_weak_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t* expected_value, + size_t new_value) + { + return __c11_atomic_compare_exchange_weak( + address, expected_value, new_value, __ATOMIC_RELAXED, __ATOMIC_RELAXED); + } + + static inline void pthreadpool_fence_acquire() { + 
__c11_atomic_thread_fence(__ATOMIC_ACQUIRE); + } + + static inline void pthreadpool_fence_release() { + __c11_atomic_thread_fence(__ATOMIC_RELEASE); + } +#else + #include + + typedef _Atomic(uint32_t) pthreadpool_atomic_uint32_t; + typedef _Atomic(size_t) pthreadpool_atomic_size_t; + typedef _Atomic(void*) pthreadpool_atomic_void_p; + + static inline uint32_t pthreadpool_load_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + return atomic_load_explicit(address, memory_order_relaxed); + } + + static inline size_t pthreadpool_load_relaxed_size_t( + pthreadpool_atomic_size_t* address) + { + return atomic_load_explicit(address, memory_order_relaxed); + } + + static inline void* pthreadpool_load_relaxed_void_p( + pthreadpool_atomic_void_p* address) + { + return atomic_load_explicit(address, memory_order_relaxed); + } + + static inline void pthreadpool_store_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + atomic_store_explicit(address, value, memory_order_relaxed); + } + + static inline void pthreadpool_store_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + atomic_store_explicit(address, value, memory_order_relaxed); + } + + static inline void pthreadpool_store_relaxed_void_p( + pthreadpool_atomic_void_p* address, + void* value) + { + atomic_store_explicit(address, value, memory_order_relaxed); + } + + static inline void pthreadpool_store_release_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + atomic_store_explicit(address, value, memory_order_release); + } + + static inline void pthreadpool_store_release_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + atomic_store_explicit(address, value, memory_order_release); + } + + static inline uint32_t pthreadpool_fetch_sub_relaxed_size_t( + pthreadpool_atomic_size_t* address, + uint32_t decrement) + { + return atomic_fetch_sub_explicit(address, decrement, memory_order_relaxed); + } + + static inline bool pthreadpool_compare_exchange_weak_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t* expected_value, + size_t new_value) + { + return atomic_compare_exchange_weak_explicit( + address, expected_value, new_value, memory_order_relaxed, memory_order_relaxed); + } + + static inline void pthreadpool_fence_acquire() { + atomic_thread_fence(memory_order_acquire); + } + + static inline void pthreadpool_fence_release() { + atomic_thread_fence(memory_order_release); + } +#endif diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 96aa72a..7cc190d 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -1,5 +1,4 @@ /* Standard C headers */ -#include #include #include #include @@ -42,6 +41,7 @@ /* Internal headers */ #include "threadpool-utils.h" +#include "threadpool-atomics.h" /* Number of iterations in spin-wait loop before going into futex/mutex wait */ #define PTHREADPOOL_SPIN_WAIT_ITERATIONS 1000000 @@ -86,11 +86,11 @@ static inline size_t min(size_t a, size_t b) { #if PTHREADPOOL_USE_FUTEX #if defined(__linux__) - static int futex_wait(_Atomic uint32_t* address, uint32_t value) { + static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) { return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL); } - static int futex_wake_all(_Atomic uint32_t* address) { + static int futex_wake_all(pthreadpool_atomic_uint32_t* address) { return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX); } #else @@ -111,19 +111,19 @@ struct 
PTHREADPOOL_CACHELINE_ALIGNED thread_info { * Index of the first element in the work range. * Before processing a new element the owning worker thread increments this value. */ - atomic_size_t range_start; + pthreadpool_atomic_size_t range_start; /** * Index of the element after the last element of the work range. * Before processing a new element the stealing worker thread decrements this value. */ - atomic_size_t range_end; + pthreadpool_atomic_size_t range_end; /** * The number of elements in the work range. * Due to race conditions range_length <= range_end - range_start. * The owning worker thread must decrement this value before incrementing @a range_start. * The stealing worker thread must decrement this value before decrementing @a range_end. */ - atomic_size_t range_length; + pthreadpool_atomic_size_t range_length; /** * Thread number in the 0..threads_count-1 range. */ @@ -145,7 +145,7 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { /** * The number of threads that are processing an operation. */ - atomic_size_t active_threads; + pthreadpool_atomic_size_t active_threads; #if PTHREADPOOL_USE_FUTEX /** * Indicates if there are active threads. @@ -153,24 +153,24 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * - has_active_threads == 0 if active_threads == 0 * - has_active_threads == 1 if active_threads != 0 */ - _Atomic uint32_t has_active_threads; + pthreadpool_atomic_uint32_t has_active_threads; #endif /** * The last command submitted to the thread pool. */ - _Atomic uint32_t command; + pthreadpool_atomic_uint32_t command; /** * The function to call for each item. */ - void *_Atomic task; + pthreadpool_atomic_void_p task; /** * The first argument to the item processing function. */ - void *_Atomic argument; + pthreadpool_atomic_void_p argument; /** * Copy of the flags passed to parallelization function. */ - _Atomic uint32_t flags; + pthreadpool_atomic_uint32_t flags; /** * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads. 
*/ @@ -207,13 +207,13 @@ PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZ static void checkin_worker_thread(struct pthreadpool* threadpool) { #if PTHREADPOOL_USE_FUTEX - if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) { - atomic_store_explicit(&threadpool->has_active_threads, 0, memory_order_relaxed); + if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) { + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 0); futex_wake_all(&threadpool->has_active_threads); } #else pthread_mutex_lock(&threadpool->completion_mutex); - if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) { + if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) { pthread_cond_signal(&threadpool->completion_condvar); } pthread_mutex_unlock(&threadpool->completion_mutex); @@ -223,12 +223,12 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) { static void wait_worker_threads(struct pthreadpool* threadpool) { /* Initial check */ #if PTHREADPOOL_USE_FUTEX - uint32_t has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed); + uint32_t has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); if (has_active_threads == 0) { return; } #else - size_t active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed); + size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); if (active_threads == 0) { return; } @@ -237,15 +237,15 @@ static void wait_worker_threads(struct pthreadpool* threadpool) { /* Spin-wait */ for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { /* This fence serves as a sleep instruction */ - atomic_thread_fence(memory_order_acquire); + pthreadpool_fence_acquire(); #if PTHREADPOOL_USE_FUTEX - has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed); + has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); if (has_active_threads == 0) { return; } #else - active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed); + active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); if (active_threads == 0) { return; } @@ -254,26 +254,24 @@ static void wait_worker_threads(struct pthreadpool* threadpool) { /* Fall-back to mutex/futex wait */ #if PTHREADPOOL_USE_FUTEX - while ((has_active_threads = atomic_load(&threadpool->has_active_threads)) != 0) { + while ((has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads)) != 0) { futex_wait(&threadpool->has_active_threads, 1); } #else pthread_mutex_lock(&threadpool->completion_mutex); - while (atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed) != 0) { + while (pthreadpool_load_relaxed_size_t(&threadpool->active_threads) != 0) { pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex); }; pthread_mutex_unlock(&threadpool->completion_mutex); #endif } -inline static bool atomic_decrement(atomic_size_t* value) { - size_t actual_value = atomic_load_explicit(value, memory_order_relaxed); +inline static bool atomic_decrement(pthreadpool_atomic_size_t* value) { + size_t actual_value = pthreadpool_load_relaxed_size_t(value); if (actual_value == 0) { return false; } - while (!atomic_compare_exchange_weak_explicit( - value, &actual_value, actual_value - 1, 
memory_order_relaxed, memory_order_relaxed)) - { + while (!pthreadpool_compare_exchange_weak_relaxed_size_t(value, &actual_value, actual_value - 1)) { if (actual_value == 0) { return false; } @@ -291,10 +289,10 @@ inline static size_t modulo_decrement(uint32_t i, uint32_t n) { } static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) { - const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) atomic_load_explicit(&threadpool->task, memory_order_relaxed); - void *const argument = atomic_load_explicit(&threadpool->argument, memory_order_relaxed); + const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); + void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); /* Process thread's own range of items */ - size_t range_start = atomic_load_explicit(&thread->range_start, memory_order_relaxed); + size_t range_start = pthreadpool_load_relaxed_size_t(&thread->range_start); while (atomic_decrement(&thread->range_length)) { task(argument, range_start++); } @@ -308,13 +306,13 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_ { struct thread_info* other_thread = &threadpool->threads[tid]; while (atomic_decrement(&other_thread->range_length)) { - const size_t item_id = atomic_fetch_sub_explicit(&other_thread->range_end, 1, memory_order_relaxed) - 1; + const size_t item_id = pthreadpool_fetch_sub_relaxed_size_t(&other_thread->range_end, 1) - 1; task(argument, item_id); } } /* Make changes by this thread visible to other threads */ - atomic_thread_fence(memory_order_release); + pthreadpool_fence_release(); } static uint32_t wait_for_new_command( @@ -322,7 +320,7 @@ static uint32_t wait_for_new_command( uint32_t last_command, uint32_t last_flags) { - uint32_t command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); if (command != last_command) { return command; } @@ -331,9 +329,9 @@ static uint32_t wait_for_new_command( /* Spin-wait loop */ for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { /* This fence serves as a sleep instruction */ - atomic_thread_fence(memory_order_acquire); + pthreadpool_fence_acquire(); - command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); if (command != last_command) { return command; } @@ -344,13 +342,13 @@ static uint32_t wait_for_new_command( #if PTHREADPOOL_USE_FUTEX do { futex_wait(&threadpool->command, last_command); - command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); } while (command == last_command); #else /* Lock the command mutex */ pthread_mutex_lock(&threadpool->command_mutex); /* Read the command */ - while ((command = atomic_load_explicit(&threadpool->command, memory_order_relaxed)) == last_command) { + while ((command = pthreadpool_load_relaxed_uint32_t(&threadpool->command)) == last_command) { /* Wait for new command */ pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); } @@ -373,9 +371,9 @@ static void* thread_main(void* arg) { /* Monitor new commands and act accordingly */ for (;;) { uint32_t command = wait_for_new_command(threadpool, last_command, flags); - atomic_thread_fence(memory_order_acquire); + pthreadpool_fence_acquire(); - flags = atomic_load_explicit(&threadpool->flags, 
memory_order_relaxed); + flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags); /* Process command */ switch (command & THREADPOOL_COMMAND_MASK) { @@ -435,6 +433,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { if (threads_count == 0) { #if defined(_SC_NPROCESSORS_ONLN) threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); + #if defined(__EMSCRIPTEN_PTHREADS__) + /* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE option */ + if (threads_count >= 8) { + threads_count = 8; + } + #endif #elif defined(_WIN32) SYSTEM_INFO system_info; ZeroMemory(&system_info, sizeof(system_info)); @@ -465,10 +469,9 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { #endif #if PTHREADPOOL_USE_FUTEX - atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); #endif - atomic_store_explicit( - &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release); + pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ for (size_t tid = 1; tid < threads_count; tid++) { @@ -519,16 +522,15 @@ void pthreadpool_parallelize_1d( #endif /* Setup global arguments */ - atomic_store_explicit(&threadpool->task, task, memory_order_relaxed); - atomic_store_explicit(&threadpool->argument, argument, memory_order_relaxed); - atomic_store_explicit(&threadpool->flags, flags, memory_order_relaxed); + pthreadpool_store_relaxed_void_p(&threadpool->task, task); + pthreadpool_store_relaxed_void_p(&threadpool->argument, argument); + pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ const size_t threads_count = threadpool->threads_count; - atomic_store_explicit( - &threadpool->active_threads, threads_count - 1 /* caller thread */, memory_order_relaxed); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); #if PTHREADPOOL_USE_FUTEX - atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); #endif /* Spread the work between threads */ @@ -536,9 +538,9 @@ void pthreadpool_parallelize_1d( for (size_t tid = 0; tid < threads_count; tid++) { struct thread_info* thread = &threadpool->threads[tid]; const size_t range_end = multiply_divide(range, tid + 1, threads_count); - atomic_store_explicit(&thread->range_start, range_start, memory_order_relaxed); - atomic_store_explicit(&thread->range_end, range_end, memory_order_relaxed); - atomic_store_explicit(&thread->range_length, range_end - range_start, memory_order_relaxed); + pthreadpool_store_relaxed_size_t(&thread->range_start, range_start); + pthreadpool_store_relaxed_size_t(&thread->range_end, range_end); + pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start); /* The next range starts where the previous ended */ range_start = range_end; @@ -551,7 +553,7 @@ void pthreadpool_parallelize_1d( * to ensure the unmasked command is different then the last command, because worker threads * monitor for change in the unmasked command. 
*/ - const uint32_t old_command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d; #if PTHREADPOOL_USE_FUTEX @@ -559,13 +561,13 @@ void pthreadpool_parallelize_1d( * Store the command with release semantics to guarantee that if a worker thread observes * the new command value, it also observes the updated command parameters. */ - atomic_store_explicit(&threadpool->command, new_command, memory_order_release); + pthreadpool_store_release_uint32_t(&threadpool->command, new_command); /* Wake up the threads */ futex_wake_all(&threadpool->command); #else /* Relaxed semantics because pthread_mutex_unlock acts as a release fence */ - atomic_store_explicit(&threadpool->command, new_command, memory_order_relaxed); + pthreadpool_store_relaxed_uint32_t(&threadpool->command, new_command); /* Unlock the command variables before waking up the threads for better performance */ pthread_mutex_unlock(&threadpool->command_mutex); @@ -593,7 +595,7 @@ void pthreadpool_parallelize_1d( wait_worker_threads(threadpool); /* Make changes by other threads visible to this thread */ - atomic_thread_fence(memory_order_acquire); + pthreadpool_fence_acquire(); /* Unprotect the global threadpool structures */ pthread_mutex_unlock(&threadpool->execution_mutex); @@ -1169,11 +1171,10 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { if (threadpool != NULL) { if (threadpool->threads_count > 1) { #if PTHREADPOOL_USE_FUTEX - atomic_store_explicit( - &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed); - atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */); + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); - atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release); + pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown); /* Wake up worker threads */ futex_wake_all(&threadpool->command); @@ -1181,12 +1182,10 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */ pthread_mutex_lock(&threadpool->command_mutex); - /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ - atomic_store_explicit( - &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */); /* Update the threadpool command. 
*/ - atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_relaxed); + pthreadpool_store_relaxed_uint32_t(&threadpool->command, threadpool_command_shutdown); /* Wake up worker threads */ pthread_cond_broadcast(&threadpool->command_condvar); -- cgit v1.2.3 From 15f39bf0c211ddd7e768bb6c83c5fa3695fa4be8 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Mon, 23 Mar 2020 01:36:55 -0700 Subject: Futex-based WebAssembly+Threads implementation --- src/threadpool-pthreads.c | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 7cc190d..8d20fdd 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -12,6 +12,15 @@ #ifndef PTHREADPOOL_USE_FUTEX #if defined(__linux__) #define PTHREADPOOL_USE_FUTEX 1 + #elif defined(__EMSCRIPTEN__) + #define PTHREADPOOL_USE_FUTEX 1 + #else + #define PTHREADPOOL_USE_FUTEX 0 + #endif +#endif + +#if PTHREADPOOL_USE_FUTEX + #if defined(__linux__) #include #include @@ -22,8 +31,13 @@ #ifndef FUTEX_PRIVATE_FLAG #define FUTEX_PRIVATE_FLAG 128 #endif + #elif defined(__EMSCRIPTEN__) + /* math.h for INFINITY constant */ + #include + + #include #else - #define PTHREADPOOL_USE_FUTEX 0 + #error "Platform-specific implementation of futex_wait and futex_wake_all required" #endif #endif @@ -93,6 +107,14 @@ static inline size_t min(size_t a, size_t b) { static int futex_wake_all(pthreadpool_atomic_uint32_t* address) { return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX); } + #elif defined(__EMSCRIPTEN__) + static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) { + return emscripten_futex_wait((volatile void*) address, value, INFINITY); + } + + static int futex_wake_all(pthreadpool_atomic_uint32_t* address) { + return emscripten_futex_wake((volatile void*) address, INT_MAX); + } #else #error "Platform-specific implementation of futex_wait and futex_wake_all required" #endif -- cgit v1.2.3 From ba8b08e4975a9b0151ceefa85ab3d3efc3450043 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Mon, 23 Mar 2020 03:36:07 -0700 Subject: Fix race conditions in non-futex implementation --- src/threadpool-pthreads.c | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 8d20fdd..e0e0af9 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -578,19 +578,18 @@ void pthreadpool_parallelize_1d( const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d; + /* + * Store the command with release semantics to guarantee that if a worker thread observes + * the new command value, it also observes the updated command parameters. + * + * Note: release semantics is necessary even with a conditional variable, because the workers might + * be waiting in a spin-loop rather than the conditional variable. + */ + pthreadpool_store_release_uint32_t(&threadpool->command, new_command); #if PTHREADPOOL_USE_FUTEX - /* - * Store the command with release semantics to guarantee that if a worker thread observes - * the new command value, it also observes the updated command parameters. 
- */ - pthreadpool_store_release_uint32_t(&threadpool->command, new_command); - /* Wake up the threads */ futex_wake_all(&threadpool->command); #else - /* Relaxed semantics because pthread_mutex_unlock acts as a release fence */ - pthreadpool_store_relaxed_uint32_t(&threadpool->command, new_command); - /* Unlock the command variables before waking up the threads for better performance */ pthread_mutex_unlock(&threadpool->command_mutex); @@ -1196,6 +1195,10 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */); pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); + /* + * Store the command with release semantics to guarantee that if a worker thread observes + * the new command value, it also observes the updated active_threads/has_active_threads values. + */ pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown); /* Wake up worker threads */ @@ -1206,8 +1209,14 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */); - /* Update the threadpool command. */ - pthreadpool_store_relaxed_uint32_t(&threadpool->command, threadpool_command_shutdown); + /* + * Store the command with release semantics to guarantee that if a worker thread observes + * the new command value, it also observes the updated active_threads value. + * + * Note: the release fence inside pthread_mutex_unlock is insufficient, + * because the workers might be waiting in a spin-loop rather than the conditional variable. + */ + pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown); /* Wake up worker threads */ pthread_cond_broadcast(&threadpool->command_condvar); -- cgit v1.2.3 From 97c181b7d61be38850b59d27f93832b53cee1cdf Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Mon, 23 Mar 2020 05:38:20 -0700 Subject: Minor refactoring in pthreadpool_destroy --- src/threadpool-pthreads.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index e0e0af9..07c47f0 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -8,7 +8,6 @@ #include #include -/* Futex-specific headers */ #ifndef PTHREADPOOL_USE_FUTEX #if defined(__linux__) #define PTHREADPOOL_USE_FUTEX 1 @@ -19,6 +18,7 @@ #endif #endif +/* Futex-specific headers */ #if PTHREADPOOL_USE_FUTEX #if defined(__linux__) #include @@ -1190,9 +1190,10 @@ void pthreadpool_parallelize_6d_tile_2d( void pthreadpool_destroy(struct pthreadpool* threadpool) { if (threadpool != NULL) { - if (threadpool->threads_count > 1) { + const size_t threads_count = threadpool->threads_count; + if (threads_count > 1) { #if PTHREADPOOL_USE_FUTEX - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); /* @@ -1207,7 +1208,7 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */ pthread_mutex_lock(&threadpool->command_mutex); - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */); + 
pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); /* * Store the command with release semantics to guarantee that if a worker thread observes @@ -1226,7 +1227,7 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { #endif /* Wait until all threads return */ - for (size_t thread = 1; thread < threadpool->threads_count; thread++) { + for (size_t thread = 1; thread < threads_count; thread++) { pthread_join(threadpool->threads[thread].thread_object, NULL); } -- cgit v1.2.3 From 2e59d6fa7d1bfd3c32797bd5a838d558a9e97377 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Mon, 23 Mar 2020 05:44:19 -0700 Subject: Update support platforms and example in the README --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index b92bea0..164aab5 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ It provides similar functionality to `#pragma omp parallel for`, but with additi * Run on user-specified or auto-detected number of threads. * Work-stealing scheduling for efficient work balancing. * Wait-free synchronization of work items. -* Compatible with Linux (including Android), macOS, iOS, Emscripten environments. +* Compatible with Linux (including Android), macOS, iOS, MinGW, Emscripten environments. * 100% unit tests coverage. * Throughput and latency microbenchmarks. @@ -35,17 +35,17 @@ int main() { pthreadpool_t threadpool = pthreadpool_create(0); assert(threadpool != NULL); - + const size_t threads_count = pthreadpool_get_threads_count(threadpool); printf("Created thread pool with %zu threads\n", threads_count); struct array_addition_context context = { augend, addend, sum }; pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) add_arrays, - (void**) &context, + (void*) &context, ARRAY_SIZE, PTHREADPOOL_FLAG_DISABLE_DENORMALS /* flags */); - + pthreadpool_destroy(threadpool); threadpool = NULL; -- cgit v1.2.3 From 832403f32df1920f2bc4678b9541668fec7238f3 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Mon, 23 Mar 2020 10:47:42 -0700 Subject: Implement atomic_decrement with LL-SC on ARM/ARM64 --- src/threadpool-pthreads.c | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 07c47f0..934e5e7 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -289,16 +289,28 @@ static void wait_worker_threads(struct pthreadpool* threadpool) { } inline static bool atomic_decrement(pthreadpool_atomic_size_t* value) { - size_t actual_value = pthreadpool_load_relaxed_size_t(value); - if (actual_value == 0) { - return false; - } - while (!pthreadpool_compare_exchange_weak_relaxed_size_t(value, &actual_value, actual_value - 1)) { + #if defined(__clang__) && (defined(__arm__) || defined(__aarch64__)) + size_t actual_value; + do { + actual_value = __builtin_arm_ldrex((const volatile size_t*) value); + if (actual_value == 0) { + __builtin_arm_clrex(); + return false; + } + } while (__builtin_arm_strex(actual_value - 1, (volatile size_t*) value) != 0); + return true; + #else + size_t actual_value = pthreadpool_load_relaxed_size_t(value); if (actual_value == 0) { return false; } - } - return true; + while (!pthreadpool_compare_exchange_weak_relaxed_size_t(value, &actual_value, actual_value - 1)) { + if (actual_value == 0) { + return false; + } + } + return true; + #endif } inline static size_t modulo_decrement(uint32_t i, uint32_t n) { -- cgit v1.2.3 From 
4c32ac059994906a5de7715e9c4cee58211354e0 Mon Sep 17 00:00:00 2001
From: Marat Dukhan
Date: Mon, 23 Mar 2020 10:51:13 -0700
Subject: Document all public API functions

---
 include/pthreadpool.h | 337 ++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 310 insertions(+), 27 deletions(-)

diff --git a/include/pthreadpool.h b/include/pthreadpool.h
index 30471e6..36f44e5 100644
--- a/include/pthreadpool.h
+++ b/include/pthreadpool.h
@@ -18,25 +18,31 @@ typedef void (*pthreadpool_task_6d_tile_2d_t)(void*, size_t, size_t, size_t, siz

 /**
- * Disable support for denormalized numbers to the maximum extent possible for the duration of the computation.
+ * Disable support for denormalized numbers to the maximum extent possible for
+ * the duration of the computation.
  *
- * Handling denormalized floating-point numbers is often implemented in microcode, and incurs significant performance
- * degradation. This hint instructs the thread pool to disable support for denormalized numbers before running the
- * computation by manipulating architecture-specific control registers, and restore the initial value of control
- * registers after the computation is complete. The thread pool temporary disables denormalized numbers on all threads
- * involved in the computation (i.e. the caller threads, and potentially worker threads).
+ * Handling denormalized floating-point numbers is often implemented in
+ * microcode, and incurs significant performance degradation. This hint
+ * instructs the thread pool to disable support for denormalized numbers before
+ * running the computation by manipulating architecture-specific control
+ * registers, and restore the initial value of control registers after the
+ * computation is complete. The thread pool temporarily disables denormalized
+ * numbers on all threads involved in the computation (i.e. the caller threads,
+ * and potentially worker threads).
  *
- * Disabling denormalized numbers may have a small negative effect on results' accuracy. As various architectures differ
- * in capabilities to control processing of denormalized numbers, using this flag may also hurt results' reproducibility
- * across different instruction set architectures.
+ * Disabling denormalized numbers may have a small negative effect on results'
+ * accuracy. As various architectures differ in capabilities to control
+ * processing of denormalized numbers, using this flag may also hurt results'
+ * reproducibility across different instruction set architectures.
  */
 #define PTHREADPOOL_FLAG_DISABLE_DENORMALS 0x00000001

 /**
  * Yield worker threads to the system scheduler after the operation is finished.
  *
- * Force workers to use kernel wait (instead of active spin-wait by default) for new commands after this command is
- * processed. This flag affects only the immediate next operation on this thread pool. To make the thread pool always
+ * Force workers to use kernel wait (instead of active spin-wait by default) for
+ * new commands after this command is processed. This flag affects only the
+ * immediate next operation on this thread pool. To make the thread pool always
  * use kernel wait, pass this flag to all parallelization functions.
  */
 #define PTHREADPOOL_FLAG_YIELD_WORKERS 0x00000002
@@ -46,40 +52,48 @@ extern "C" {
 #endif

 /**
- * Creates a thread pool with the specified number of threads.
+ * Create a thread pool with the specified number of threads.
  *
- * @param[in] threads_count The number of threads in the thread pool.
- * A value of 0 has special interpretation: it creates a thread for each - * processor core available in the system. + * @param threads_count the number of threads in the thread pool. + * A value of 0 has special interpretation: it creates a thread pool with as + * many threads as there are logical processors in the system. * - * @returns A pointer to an opaque thread pool object. - * On error the function returns NULL and sets errno accordingly. + * @returns A pointer to an opaque thread pool object if the call is + * successful, or NULL pointer if the call failed. */ pthreadpool_t pthreadpool_create(size_t threads_count); /** - * Queries the number of threads in a thread pool. + * Query the number of threads in a thread pool. * - * @param[in] threadpool The thread pool to query. + * @param threadpool the thread pool to query. * * @returns The number of threads in the thread pool. */ size_t pthreadpool_get_threads_count(pthreadpool_t threadpool); /** - * Processes items in parallel using threads from a thread pool. + * Process items on a 1D grid. * - * When the call returns, all items have been processed and the thread pool is - * ready for a new task. + * The function implements a parallel version of the following snippet: + * + * for (size_t i = 0; i < range; i++) + * function(context, i); + * + * When the function returns, all items have been processed and the thread pool + * is ready for a new task. * * @note If multiple threads call this function with the same thread pool, the * calls are serialized. * - * @param[in] threadpool The thread pool to use for parallelisation. - * @param[in] function The function to call for each item. - * @param[in] argument The first argument passed to the @a function. - * @param[in] items The number of items to process. The @a function - * will be called once for each item. + * @param threadpool the thread pool to use for parallelisation. If threadpool + * is NULL, all items are processed serially on the calling thread. + * @param function the function to call for each item. + * @param context the first argument passed to the specified function. + * @param range the number of items on the 1D grid to process. The + * specified function will be called once for each item. + * @param flags a bitwise combination of zero or more optional flags + * (PTHREADPOOL_FLAG_DISABLE_DENORMALS or PTHREADPOOL_FLAG_YIELD_WORKERS) */ void pthreadpool_parallelize_1d( pthreadpool_t threadpool, @@ -88,6 +102,30 @@ void pthreadpool_parallelize_1d( size_t range, uint32_t flags); +/** + * Process items on a 1D grid with specified maximum tile size. + * + * The function implements a parallel version of the following snippet: + * + * for (size_t i = 0; i < range; i += tile) + * function(context, i, min(range - i, tile)); + * + * When the call returns, all items have been processed and the thread pool is + * ready for a new task. + * + * @note If multiple threads call this function with the same thread pool, + * the calls are serialized. + * + * @param threadpool the thread pool to use for parallelisation. If threadpool + * is NULL, all items are processed serially on the calling thread. + * @param function the function to call for each tile. + * @param context the first argument passed to the specified function. + * @param range the number of items on the 1D grid to process. + * @param tile the maximum number of items on the 1D grid to process in + * one function call. 
+ * @param flags a bitwise combination of zero or more optional flags + * (PTHREADPOOL_FLAG_DISABLE_DENORMALS or PTHREADPOOL_FLAG_YIELD_WORKERS) + */ void pthreadpool_parallelize_1d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_1d_tile_1d_t function, @@ -96,6 +134,32 @@ void pthreadpool_parallelize_1d_tile_1d( size_t tile, uint32_t flags); +/** + * Process items on a 2D grid. + * + * The function implements a parallel version of the following snippet: + * + * for (size_t i = 0; i < range_i; i++) + * for (size_t j = 0; j < range_j; j++) + * function(context, i, j); + * + * When the function returns, all items have been processed and the thread pool + * is ready for a new task. + * + * @note If multiple threads call this function with the same thread pool, the + * calls are serialized. + * + * @param threadpool the thread pool to use for parallelisation. If threadpool + * is NULL, all items are processed serially on the calling thread. + * @param function the function to call for each item. + * @param context the first argument passed to the specified function. + * @param range_i the number of items to process along the first dimension + * of the 2D grid. + * @param range_j the number of items to process along the second dimension + * of the 2D grid. + * @param flags a bitwise combination of zero or more optional flags + * (PTHREADPOOL_FLAG_DISABLE_DENORMALS or PTHREADPOOL_FLAG_YIELD_WORKERS) + */ void pthreadpool_parallelize_2d( pthreadpool_t threadpool, pthreadpool_task_2d_t function, @@ -104,6 +168,35 @@ void pthreadpool_parallelize_2d( size_t range_j, uint32_t flags); +/** + * Process items on a 2D grid with the specified maximum tile size along the + * last grid dimension. + * + * The function implements a parallel version of the following snippet: + * + * for (size_t i = 0; i < range_i; i++) + * for (size_t j = 0; j < range_j; j += tile_j) + * function(context, i, j, min(range_j - j, tile_j)); + * + * When the function returns, all items have been processed and the thread pool + * is ready for a new task. + * + * @note If multiple threads call this function with the same thread pool, the + * calls are serialized. + * + * @param threadpool the thread pool to use for parallelisation. If threadpool + * is NULL, all items are processed serially on the calling thread. + * @param function the function to call for each tile. + * @param context the first argument passed to the specified function. + * @param range_i the number of items to process along the first dimension + * of the 2D grid. + * @param range_j the number of items to process along the second dimension + * of the 2D grid. + * @param tile_j the maximum number of items along the second dimension of + * the 2D grid to process in one function call. + * @param flags a bitwise combination of zero or more optional flags + * (PTHREADPOOL_FLAG_DISABLE_DENORMALS or PTHREADPOOL_FLAG_YIELD_WORKERS) + */ void pthreadpool_parallelize_2d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_2d_tile_1d_t function, @@ -113,6 +206,38 @@ void pthreadpool_parallelize_2d_tile_1d( size_t tile_j, uint32_t flags); +/** + * Process items on a 2D grid with the specified maximum tile size along each + * grid dimension. 
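+ *
+ * As a usage sketch (process_block, ctx, rows, and cols are illustrative
+ * names, not part of this API), a caller that walks a rows x cols matrix in
+ * 32x32 blocks might look like:
+ *
+ *   static void process_block(void* ctx, size_t i, size_t j,
+ *       size_t block_i, size_t block_j)
+ *   {
+ *     // process rows [i, i + block_i) and columns [j, j + block_j)
+ *   }
+ *
+ *   pthreadpool_parallelize_2d_tile_2d(threadpool, process_block, &ctx,
+ *       rows, cols, 32, 32, 0);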
+ *
+ * The function implements a parallel version of the following snippet:
+ *
+ *   for (size_t i = 0; i < range_i; i += tile_i)
+ *     for (size_t j = 0; j < range_j; j += tile_j)
+ *       function(context, i, j,
+ *         min(range_i - i, tile_i), min(range_j - j, tile_j));
+ *
+ * When the function returns, all items have been processed and the thread pool
+ * is ready for a new task.
+ *
+ * @note If multiple threads call this function with the same thread pool, the
+ * calls are serialized.
+ *
+ * @param threadpool the thread pool to use for parallelisation. If threadpool
+ * is NULL, all items are processed serially on the calling thread.
+ * @param function the function to call for each tile.
+ * @param context the first argument passed to the specified function.
+ * @param range_i the number of items to process along the first dimension
+ * of the 2D grid.
+ * @param range_j the number of items to process along the second dimension
+ * of the 2D grid.
+ * @param tile_i the maximum number of items along the first dimension of
+ * the 2D grid to process in one function call.
+ * @param tile_j the maximum number of items along the second dimension of
+ * the 2D grid to process in one function call.
+ * @param flags a bitwise combination of zero or more optional flags
+ * (PTHREADPOOL_FLAG_DISABLE_DENORMALS or PTHREADPOOL_FLAG_YIELD_WORKERS)
+ */
 void pthreadpool_parallelize_2d_tile_2d(
   pthreadpool_t threadpool,
   pthreadpool_task_2d_tile_2d_t function,
   void* argument,
   size_t range_i,
   size_t range_j,
   size_t tile_i,
   size_t tile_j,
   uint32_t flags);

+/**
+ * Process items on a 3D grid with the specified maximum tile size along the
+ * last two grid dimensions.
+ *
+ * The function implements a parallel version of the following snippet:
+ *
+ *   for (size_t i = 0; i < range_i; i++)
+ *     for (size_t j = 0; j < range_j; j += tile_j)
+ *       for (size_t k = 0; k < range_k; k += tile_k)
+ *         function(context, i, j, k,
+ *           min(range_j - j, tile_j), min(range_k - k, tile_k));
+ *
+ * When the function returns, all items have been processed and the thread pool
+ * is ready for a new task.
+ *
+ * @note If multiple threads call this function with the same thread pool, the
+ * calls are serialized.
+ *
+ * @param threadpool the thread pool to use for parallelisation. If threadpool
+ * is NULL, all items are processed serially on the calling thread.
+ * @param function the function to call for each tile.
+ * @param context the first argument passed to the specified function.
+ * @param range_i the number of items to process along the first dimension
+ * of the 3D grid.
+ * @param range_j the number of items to process along the second dimension
+ * of the 3D grid.
+ * @param range_k the number of items to process along the third dimension
+ * of the 3D grid.
+ * @param tile_j the maximum number of items along the second dimension of
+ * the 3D grid to process in one function call.
+ * @param tile_k the maximum number of items along the third dimension of
+ * the 3D grid to process in one function call.
+ * @param flags a bitwise combination of zero or more optional flags
+ * (PTHREADPOOL_FLAG_DISABLE_DENORMALS or PTHREADPOOL_FLAG_YIELD_WORKERS)
+ */
 void pthreadpool_parallelize_3d_tile_2d(
   pthreadpool_t threadpool,
   pthreadpool_task_3d_tile_2d_t function,
   void* argument,
   size_t range_i,
   size_t range_j,
   size_t range_k,
   size_t tile_j,
   size_t tile_k,
   uint32_t flags);

+/**
+ * Process items on a 4D grid with the specified maximum tile size along the
+ * last two grid dimensions.
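+ *
+ * The 4D grid is dispatched as range_i * range_j * ceil(range_k / tile_k) *
+ * ceil(range_l / tile_l) tile invocations in total. As a worked example
+ * (numbers chosen only for illustration), a 2 x 3 x 8 x 8 grid with
+ * tile_k = tile_l = 4 produces 2 * 3 * 2 * 2 = 24 calls to the task function.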
+ * + * The function implements a parallel version of the following snippet: + * + * for (size_t i = 0; i < range_i; i++) + * for (size_t j = 0; j < range_j; j++) + * for (size_t k = 0; k < range_k; k += tile_k) + * for (size_t l = 0; l < range_l; l += tile_l) + * function(context, i, j, k, l, + * min(range_k - k, tile_k), min(range_l - l, tile_l)); + * + * When the function returns, all items have been processed and the thread pool + * is ready for a new task. + * + * @note If multiple threads call this function with the same thread pool, the + * calls are serialized. + * + * @param threadpool the thread pool to use for parallelisation. If threadpool + * is NULL, all items are processed serially on the calling thread. + * @param function the function to call for each tile. + * @param context the first argument passed to the specified function. + * @param range_i the number of items to process along the first dimension + * of the 4D grid. + * @param range_j the number of items to process along the second dimension + * of the 4D grid. + * @param range_k the number of items to process along the third dimension + * of the 4D grid. + * @param range_l the number of items to process along the fourth dimension + * of the 4D grid. + * @param tile_k the maximum number of items along the third dimension of + * the 4D grid to process in one function call. + * @param tile_l the maximum number of items along the fourth dimension of + * the 4D grid to process in one function call. + * @param flags a bitwise combination of zero or more optional flags + * (PTHREADPOOL_FLAG_DISABLE_DENORMALS or PTHREADPOOL_FLAG_YIELD_WORKERS) + */ void pthreadpool_parallelize_4d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_4d_tile_2d_t function, @@ -146,6 +344,47 @@ void pthreadpool_parallelize_4d_tile_2d( size_t tile_l, uint32_t flags); +/** + * Process items on a 5D grid with the specified maximum tile size along the + * last two grid dimensions. + * + * The function implements a parallel version of the following snippet: + * + * for (size_t i = 0; i < range_i; i++) + * for (size_t j = 0; j < range_j; j++) + * for (size_t k = 0; k < range_k; k++) + * for (size_t l = 0; l < range_l; l += tile_l) + * for (size_t m = 0; m < range_m; m += tile_m) + * function(context, i, j, k, l, m, + * min(range_l - l, tile_l), min(range_m - m, tile_m)); + * + * When the function returns, all items have been processed and the thread pool + * is ready for a new task. + * + * @note If multiple threads call this function with the same thread pool, the + * calls are serialized. + * + * @param threadpool the thread pool to use for parallelisation. If threadpool + * is NULL, all items are processed serially on the calling thread. + * @param function the function to call for each tile. + * @param context the first argument passed to the specified function. + * @param range_i the number of items to process along the first dimension + * of the 5D grid. + * @param range_j the number of items to process along the second dimension + * of the 5D grid. + * @param range_k the number of items to process along the third dimension + * of the 5D grid. + * @param range_l the number of items to process along the fourth dimension + * of the 5D grid. + * @param range_m the number of items to process along the fifth dimension + * of the 5D grid. + * @param tile_l the maximum number of items along the fourth dimension of + * the 5D grid to process in one function call. 
+ * @param tile_m the maximum number of items along the fifth dimension of + * the 5D grid to process in one function call. + * @param flags a bitwise combination of zero or more optional flags + * (PTHREADPOOL_FLAG_DISABLE_DENORMALS or PTHREADPOOL_FLAG_YIELD_WORKERS) + */ void pthreadpool_parallelize_5d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_5d_tile_2d_t function, @@ -159,6 +398,50 @@ void pthreadpool_parallelize_5d_tile_2d( size_t tile_m, uint32_t flags); +/** + * Process items on a 6D grid with the specified maximum tile size along the + * last two grid dimensions. + * + * The function implements a parallel version of the following snippet: + * + * for (size_t i = 0; i < range_i; i++) + * for (size_t j = 0; j < range_j; j++) + * for (size_t k = 0; k < range_k; k++) + * for (size_t l = 0; l < range_l; l++) + * for (size_t m = 0; m < range_m; m += tile_m) + * for (size_t n = 0; n < range_n; n += tile_n) + * function(context, i, j, k, l, m, n, + * min(range_m - m, tile_m), min(range_n - n, tile_n)); + * + * When the function returns, all items have been processed and the thread pool + * is ready for a new task. + * + * @note If multiple threads call this function with the same thread pool, the + * calls are serialized. + * + * @param threadpool the thread pool to use for parallelisation. If threadpool + * is NULL, all items are processed serially on the calling thread. + * @param function the function to call for each tile. + * @param context the first argument passed to the specified function. + * @param range_i the number of items to process along the first dimension + * of the 6D grid. + * @param range_j the number of items to process along the second dimension + * of the 6D grid. + * @param range_k the number of items to process along the third dimension + * of the 6D grid. + * @param range_l the number of items to process along the fourth dimension + * of the 6D grid. + * @param range_m the number of items to process along the fifth dimension + * of the 6D grid. + * @param range_n the number of items to process along the sixth dimension + * of the 6D grid. + * @param tile_m the maximum number of items along the fifth dimension of + * the 6D grid to process in one function call. + * @param tile_n the maximum number of items along the sixth dimension of + * the 6D grid to process in one function call. 
+ * @param flags a bitwise combination of zero or more optional flags + * (PTHREADPOOL_FLAG_DISABLE_DENORMALS or PTHREADPOOL_FLAG_YIELD_WORKERS) + */ void pthreadpool_parallelize_6d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_6d_tile_2d_t function, -- cgit v1.2.3 From 31b939ccfec1347dbd4e8fbfd7bb9bde467ae0f0 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Thu, 26 Mar 2020 11:00:30 -0700 Subject: Rename "argument" argument to match documentation comments --- include/pthreadpool.h | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/include/pthreadpool.h b/include/pthreadpool.h index 36f44e5..4d21528 100644 --- a/include/pthreadpool.h +++ b/include/pthreadpool.h @@ -98,7 +98,7 @@ size_t pthreadpool_get_threads_count(pthreadpool_t threadpool); void pthreadpool_parallelize_1d( pthreadpool_t threadpool, pthreadpool_task_1d_t function, - void* argument, + void* context, size_t range, uint32_t flags); @@ -129,7 +129,7 @@ void pthreadpool_parallelize_1d( void pthreadpool_parallelize_1d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_1d_tile_1d_t function, - void* argument, + void* context, size_t range, size_t tile, uint32_t flags); @@ -163,7 +163,7 @@ void pthreadpool_parallelize_1d_tile_1d( void pthreadpool_parallelize_2d( pthreadpool_t threadpool, pthreadpool_task_2d_t function, - void* argument, + void* context, size_t range_i, size_t range_j, uint32_t flags); @@ -200,7 +200,7 @@ void pthreadpool_parallelize_2d( void pthreadpool_parallelize_2d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_2d_tile_1d_t function, - void* argument, + void* context, size_t range_i, size_t range_j, size_t tile_j, @@ -241,7 +241,7 @@ void pthreadpool_parallelize_2d_tile_1d( void pthreadpool_parallelize_2d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_2d_tile_2d_t function, - void* argument, + void* context, size_t range_i, size_t range_j, size_t tile_i, @@ -286,7 +286,7 @@ void pthreadpool_parallelize_2d_tile_2d( void pthreadpool_parallelize_3d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_3d_tile_2d_t function, - void* argument, + void* context, size_t range_i, size_t range_j, size_t range_k, @@ -335,7 +335,7 @@ void pthreadpool_parallelize_3d_tile_2d( void pthreadpool_parallelize_4d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_4d_tile_2d_t function, - void* argument, + void* context, size_t range_i, size_t range_j, size_t range_k, @@ -388,7 +388,7 @@ void pthreadpool_parallelize_4d_tile_2d( void pthreadpool_parallelize_5d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_5d_tile_2d_t function, - void* argument, + void* context, size_t range_i, size_t range_j, size_t range_k, @@ -445,7 +445,7 @@ void pthreadpool_parallelize_5d_tile_2d( void pthreadpool_parallelize_6d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_6d_tile_2d_t function, - void* argument, + void* context, size_t range_i, size_t range_j, size_t range_k, -- cgit v1.2.3 From 6469659dd404768fd80f1989dfd66930a6587bf8 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Thu, 26 Mar 2020 11:02:12 -0700 Subject: Refactor multi-threaded case of parallelization functions - Extract multi-threaded setup logic into a generalized pthreadpool_parallelize function - Call into pthreadpool_parallelize directly from tiled and 2+-dimensional functions --- src/threadpool-pthreads.c | 247 ++++++++++++++++++++++++++-------------------- 1 file changed, 142 insertions(+), 105 deletions(-) diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 934e5e7..6ebd521 100644 --- 
a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -1,4 +1,5 @@ /* Standard C headers */ +#include #include #include #include @@ -124,7 +125,7 @@ static inline size_t min(size_t a, size_t b) { enum threadpool_command { threadpool_command_init, - threadpool_command_compute_1d, + threadpool_command_parallelize, threadpool_command_shutdown, }; @@ -181,6 +182,10 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * The last command submitted to the thread pool. */ pthreadpool_atomic_uint32_t command; + /** + * The entry point function to call for each thread in the thread pool for parallelization tasks. + */ + pthreadpool_atomic_void_p thread_function; /** * The function to call for each item. */ @@ -190,7 +195,7 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { */ pthreadpool_atomic_void_p argument; /** - * Copy of the flags passed to parallelization function. + * Copy of the flags passed to a parallelization function. */ pthreadpool_atomic_uint32_t flags; /** @@ -322,6 +327,8 @@ inline static size_t modulo_decrement(uint32_t i, uint32_t n) { return i - 1; } +typedef void (*thread_function_t)(struct pthreadpool* threadpool, struct thread_info* thread); + static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) { const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); @@ -411,13 +418,16 @@ static void* thread_main(void* arg) { /* Process command */ switch (command & THREADPOOL_COMMAND_MASK) { - case threadpool_command_compute_1d: + case threadpool_command_parallelize: { + const thread_function_t thread_function = + (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function); if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { saved_fpu_state = get_fpu_state(); disable_fpu_denormals(); } - thread_parallelize_1d(threadpool, thread); + + thread_function(threadpool, thread); if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { set_fpu_state(saved_fpu_state); } @@ -526,6 +536,107 @@ size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) { } } +static void pthreadpool_parallelize( + struct pthreadpool* threadpool, + thread_function_t thread_function, + void* task, + void* context, + size_t linear_range, + uint32_t flags) +{ + assert(threadpool != NULL); + assert(thread_function != NULL); + assert(task != NULL); + assert(linear_range > 1); + + /* Protect the global threadpool structures */ + pthread_mutex_lock(&threadpool->execution_mutex); + + #if !PTHREADPOOL_USE_FUTEX + /* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */ + pthread_mutex_lock(&threadpool->command_mutex); + #endif + + /* Setup global arguments */ + pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function); + pthreadpool_store_relaxed_void_p(&threadpool->task, task); + pthreadpool_store_relaxed_void_p(&threadpool->argument, context); + pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); + + /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ + const size_t threads_count = threadpool->threads_count; + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + #if PTHREADPOOL_USE_FUTEX + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); + #endif + + /* Spread the work between threads */ + size_t 
range_start = 0;
+	for (size_t tid = 0; tid < threads_count; tid++) {
+		struct thread_info* thread = &threadpool->threads[tid];
+		const size_t range_end = multiply_divide(linear_range, tid + 1, threads_count);
+		pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
+		pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
+		pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start);
+
+		/* The next subrange starts where the previous ended */
+		range_start = range_end;
+	}
+
+	/*
+	 * Update the threadpool command.
+	 * Importantly, do it after initializing command parameters (range, task, argument, flags)
+	 * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
+	 * to ensure the unmasked command is different than the last command, because worker threads
+	 * monitor for change in the unmasked command.
+	 */
+	const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+	const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;
+
+	/*
+	 * Store the command with release semantics to guarantee that if a worker thread observes
+	 * the new command value, it also observes the updated command parameters.
+	 *
+	 * Note: release semantics is necessary even with a condition variable, because the workers might
+	 * be waiting in a spin-loop rather than on the condition variable.
+	 */
+	pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
+	#if PTHREADPOOL_USE_FUTEX
+		/* Wake up the threads */
+		futex_wake_all(&threadpool->command);
+	#else
+		/* Unlock the command variables before waking up the threads for better performance */
+		pthread_mutex_unlock(&threadpool->command_mutex);
+
+		/* Wake up the threads */
+		pthread_cond_broadcast(&threadpool->command_condvar);
+	#endif
+
+	/* Save and modify FPU denormals control, if needed */
+	struct fpu_state saved_fpu_state = { 0 };
+	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+		saved_fpu_state = get_fpu_state();
+		disable_fpu_denormals();
+	}
+
+	/* Do computations as worker #0 */
+	thread_function(threadpool, &threadpool->threads[0]);
+
+	/* Restore FPU denormals control, if needed */
+	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+		set_fpu_state(saved_fpu_state);
+	}
+
+	/* Wait until the threads finish computation */
+	wait_worker_threads(threadpool);
+
+	/* Make changes by other threads visible to this thread */
+	pthreadpool_fence_acquire();
+
+	/* Unprotect the global threadpool structures */
+	pthread_mutex_unlock(&threadpool->execution_mutex);
+}
+
 void pthreadpool_parallelize_1d(
 	struct pthreadpool* threadpool,
 	pthreadpool_task_1d_t task,
@@ -547,91 +658,9 @@
 			set_fpu_state(saved_fpu_state);
 		}
 	} else {
-		/* Protect the global threadpool structures */
-		pthread_mutex_lock(&threadpool->execution_mutex);
-
-		#if !PTHREADPOOL_USE_FUTEX
-			/* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */
-			pthread_mutex_lock(&threadpool->command_mutex);
-		#endif
-
-		/* Setup global arguments */
-		pthreadpool_store_relaxed_void_p(&threadpool->task, task);
-		pthreadpool_store_relaxed_void_p(&threadpool->argument, argument);
-		pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
-
-		/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
-		const size_t threads_count = threadpool->threads_count;
-
pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); - #if PTHREADPOOL_USE_FUTEX - pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); - #endif - - /* Spread the work between threads */ - size_t range_start = 0; - for (size_t tid = 0; tid < threads_count; tid++) { - struct thread_info* thread = &threadpool->threads[tid]; - const size_t range_end = multiply_divide(range, tid + 1, threads_count); - pthreadpool_store_relaxed_size_t(&thread->range_start, range_start); - pthreadpool_store_relaxed_size_t(&thread->range_end, range_end); - pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start); - - /* The next range starts where the previous ended */ - range_start = range_end; - } - - /* - * Update the threadpool command. - * Imporantly, do it after initializing command parameters (range, task, argument) - * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask - * to ensure the unmasked command is different then the last command, because worker threads - * monitor for change in the unmasked command. - */ - const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); - const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d; - - /* - * Store the command with release semantics to guarantee that if a worker thread observes - * the new command value, it also observes the updated command parameters. - * - * Note: release semantics is necessary even with a conditional variable, because the workers might - * be waiting in a spin-loop rather than the conditional variable. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, new_command); - #if PTHREADPOOL_USE_FUTEX - /* Wake up the threads */ - futex_wake_all(&threadpool->command); - #else - /* Unlock the command variables before waking up the threads for better performance */ - pthread_mutex_unlock(&threadpool->command_mutex); - - /* Wake up the threads */ - pthread_cond_broadcast(&threadpool->command_condvar); - #endif - - /* Save and modify FPU denormals control, if needed */ - struct fpu_state saved_fpu_state = { 0 }; - if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { - saved_fpu_state = get_fpu_state(); - disable_fpu_denormals(); - } - - /* Do computations as worker #0 */ - thread_parallelize_1d(threadpool, &threadpool->threads[0]); - - /* Restore FPU denormals control, if needed */ - if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { - set_fpu_state(saved_fpu_state); - } - - /* Wait until the threads finish computation */ - wait_worker_threads(threadpool); - - /* Make changes by other threads visible to this thread */ - pthreadpool_fence_acquire(); - - /* Unprotect the global threadpool structures */ - pthread_mutex_unlock(&threadpool->execution_mutex); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) task, argument, range, flags); } } @@ -679,7 +708,9 @@ void pthreadpool_parallelize_1d_tile_1d( .range = range, .tile = tile }; - pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_1d_tile_1d, &context, tile_range, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_1d_tile_1d, &context, tile_range, flags); } } @@ -725,7 +756,9 @@ void pthreadpool_parallelize_2d( .argument = argument, .range_j = fxdiv_init_size_t(range_j) }; - pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d, &context, range_i * range_j, flags); + pthreadpool_parallelize( + 
threadpool, &thread_parallelize_1d, + (void*) compute_2d, &context, range_i * range_j, flags); } } @@ -783,7 +816,9 @@ void pthreadpool_parallelize_2d_tile_1d( .range_j = range_j, .tile_j = tile_j }; - pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d_tile_1d, &context, range_i * tile_range_j, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_2d_tile_1d, &context, range_i * tile_range_j, flags); } } @@ -847,7 +882,9 @@ void pthreadpool_parallelize_2d_tile_2d( .tile_i = tile_i, .tile_j = tile_j }; - pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d_tile_2d, &context, tile_range_i * tile_range_j, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_2d_tile_2d, &context, tile_range_i * tile_range_j, flags); } } @@ -919,9 +956,9 @@ void pthreadpool_parallelize_3d_tile_2d( .tile_j = tile_j, .tile_k = tile_k }; - pthreadpool_parallelize_1d(threadpool, - (pthreadpool_task_1d_t) compute_3d_tile_2d, &context, - range_i * tile_range_j * tile_range_k, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_3d_tile_2d, &context, range_i * tile_range_j * tile_range_k, flags); } } @@ -1002,9 +1039,9 @@ void pthreadpool_parallelize_4d_tile_2d( .tile_k = tile_k, .tile_l = tile_l }; - pthreadpool_parallelize_1d(threadpool, - (pthreadpool_task_1d_t) compute_4d_tile_2d, &context, - range_i * range_j * tile_range_k * tile_range_l, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_4d_tile_2d, &context, range_i * range_j * tile_range_k * tile_range_l, flags); } } @@ -1094,9 +1131,9 @@ void pthreadpool_parallelize_5d_tile_2d( .tile_l = tile_l, .tile_m = tile_m, }; - pthreadpool_parallelize_1d(threadpool, - (pthreadpool_task_1d_t) compute_5d_tile_2d, &context, - range_i * range_j * range_k * tile_range_l * tile_range_m, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_5d_tile_2d, &context, range_i * range_j * range_k * tile_range_l * tile_range_m, flags); } } @@ -1194,9 +1231,9 @@ void pthreadpool_parallelize_6d_tile_2d( .tile_m = tile_m, .tile_n = tile_n, }; - pthreadpool_parallelize_1d(threadpool, - (pthreadpool_task_1d_t) compute_6d_tile_2d, &context, - range_i * range_j * range_k * range_l * tile_range_m * tile_range_n, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_6d_tile_2d, &context, range_i * range_j * range_k * range_l * tile_range_m * tile_range_n, flags); } } -- cgit v1.2.3 From 76042155a8b1e189c8f141429fd72219472c32e1 Mon Sep 17 00:00:00 2001 From: Marat Dukhan Date: Thu, 26 Mar 2020 14:19:30 -0700 Subject: Microarchitecture-aware parallelization functions --- CMakeLists.txt | 32 + cmake/DownloadCpuinfo.cmake | 15 + include/pthreadpool.h | 239 ++++++ src/threadpool-pthreads.c | 424 +++++++++- src/threadpool-shim.c | 83 ++ test/pthreadpool.cc | 1944 ++++++++++++++++++++++++++++++++++++++----- 6 files changed, 2532 insertions(+), 205 deletions(-) create mode 100644 cmake/DownloadCpuinfo.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt index 714325a..79a17a1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,6 +39,21 @@ IF(NOT DEFINED FXDIV_SOURCE_DIR) SET(FXDIV_SOURCE_DIR "${CMAKE_BINARY_DIR}/FXdiv-source" CACHE STRING "FXdiv source directory") ENDIF() +IF(CMAKE_SYSTEM_NAME MATCHES "^(Linux|Android)$" AND CMAKE_SYSTEM_PROCESSOR MATCHES "^(armv[5-8].*|aarch64)$") + IF(NOT DEFINED CPUINFO_SOURCE_DIR) + 
MESSAGE(STATUS "Downloading cpuinfo to ${CMAKE_BINARY_DIR}/cpuinfo-source (define CPUINFO_SOURCE_DIR to avoid it)") + CONFIGURE_FILE(cmake/DownloadCpuinfo.cmake "${CMAKE_BINARY_DIR}/cpuinfo-download/CMakeLists.txt") + EXECUTE_PROCESS(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . + WORKING_DIRECTORY "${CMAKE_BINARY_DIR}/cpuinfo-download") + EXECUTE_PROCESS(COMMAND "${CMAKE_COMMAND}" --build . + WORKING_DIRECTORY "${CMAKE_BINARY_DIR}/cpuinfo-download") + SET(CPUINFO_SOURCE_DIR "${CMAKE_BINARY_DIR}/cpuinfo-source" CACHE STRING "cpuinfo source directory") + ENDIF() + SET(PTHREADPOOL_USE_CPUINFO ON) +ELSE() + SET(PTHREADPOOL_USE_CPUINFO OFF) +ENDIF() + IF(PTHREADPOOL_BUILD_TESTS AND NOT DEFINED GOOGLETEST_SOURCE_DIR) MESSAGE(STATUS "Downloading Google Test to ${CMAKE_BINARY_DIR}/googletest-source (define GOOGLETEST_SOURCE_DIR to avoid it)") CONFIGURE_FILE(cmake/DownloadGoogleTest.cmake "${CMAKE_BINARY_DIR}/googletest-download/CMakeLists.txt") @@ -122,6 +137,23 @@ IF(NOT TARGET fxdiv) ENDIF() TARGET_LINK_LIBRARIES(pthreadpool PRIVATE fxdiv) +# ---[ Configure cpuinfo +IF(PTHREADPOOL_USE_CPUINFO) + IF(NOT TARGET cpuinfo) + SET(CPUINFO_BUILD_TOOLS OFF CACHE BOOL "") + SET(CPUINFO_BUILD_UNIT_TESTS OFF CACHE BOOL "") + SET(CPUINFO_BUILD_MOCK_TESTS OFF CACHE BOOL "") + SET(CPUINFO_BUILD_BENCHMARKS OFF CACHE BOOL "") + ADD_SUBDIRECTORY( + "${CPUINFO_SOURCE_DIR}" + "${CMAKE_BINARY_DIR}/cpuinfo") + ENDIF() + TARGET_LINK_LIBRARIES(pthreadpool PRIVATE cpuinfo) + TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_CPUINFO=1) +ELSE() + TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_CPUINFO=0) +ENDIF() + INSTALL(TARGETS pthreadpool LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) diff --git a/cmake/DownloadCpuinfo.cmake b/cmake/DownloadCpuinfo.cmake new file mode 100644 index 0000000..25213a0 --- /dev/null +++ b/cmake/DownloadCpuinfo.cmake @@ -0,0 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 3.5 FATAL_ERROR) + +PROJECT(cpuinfo-download NONE) + +INCLUDE(ExternalProject) +ExternalProject_Add(cpuinfo + URL https://github.com/pytorch/cpuinfo/archive/0cc563acb9baac39f2c1349bc42098c4a1da59e3.tar.gz + URL_HASH SHA256=80625d0b69a3d69b70c2236f30db2c542d0922ccf9bb51a61bc39c49fac91a35 + SOURCE_DIR "${CMAKE_BINARY_DIR}/cpuinfo-source" + BINARY_DIR "${CMAKE_BINARY_DIR}/cpuinfo" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + TEST_COMMAND "" +) diff --git a/include/pthreadpool.h b/include/pthreadpool.h index 4d21528..de4016b 100644 --- a/include/pthreadpool.h +++ b/include/pthreadpool.h @@ -16,6 +16,11 @@ typedef void (*pthreadpool_task_4d_tile_2d_t)(void*, size_t, size_t, size_t, siz typedef void (*pthreadpool_task_5d_tile_2d_t)(void*, size_t, size_t, size_t, size_t, size_t, size_t, size_t); typedef void (*pthreadpool_task_6d_tile_2d_t)(void*, size_t, size_t, size_t, size_t, size_t, size_t, size_t, size_t); +typedef void (*pthreadpool_task_1d_with_id_t)(void*, uint32_t, size_t); +typedef void (*pthreadpool_task_2d_tile_2d_with_id_t)(void*, uint32_t, size_t, size_t, size_t, size_t); +typedef void (*pthreadpool_task_3d_tile_2d_with_id_t)(void*, uint32_t, size_t, size_t, size_t, size_t, size_t); +typedef void (*pthreadpool_task_4d_tile_2d_with_id_t)(void*, uint32_t, size_t, size_t, size_t, size_t, size_t, size_t); + /** * Disable support for denormalized numbers to the maximum extent possible for @@ -102,6 +107,52 @@ void pthreadpool_parallelize_1d( size_t range, uint32_t flags); +/** + * Process items on a 1D grid using a 
microarchitecture-aware task function. + * + * The function implements a parallel version of the following snippet: + * + * uint32_t uarch_index = cpuinfo_initialize() ? + * cpuinfo_get_current_uarch_index() : default_uarch_index; + * if (uarch_index > max_uarch_index) uarch_index = default_uarch_index; + * for (size_t i = 0; i < range; i++) + * function(context, uarch_index, i); + * + * When the function returns, all items have been processed and the thread pool + * is ready for a new task. + * + * @note If multiple threads call this function with the same thread pool, the + * calls are serialized. + * + * @param threadpool the thread pool to use for parallelisation. If + * threadpool is NULL, all items are processed serially on the calling + * thread. + * @param function the function to call for each item. + * @param context the first argument passed to the specified + * function. + * @param default_uarch_index the microarchitecture index to use when + * pthreadpool is configured without cpuinfo, cpuinfo initialization failed, + * or index returned by cpuinfo_get_current_uarch_index() exceeds the + * max_uarch_index value. + * @param max_uarch_index the maximum microarchitecture index expected by + * the specified function. If the index returned by + * cpuinfo_get_current_uarch_index() exceeds this value, default_uarch_index + * will be used instead. default_uarch_index can exceed max_uarch_index. + * @param range the number of items on the 1D grid to process. + * The specified function will be called once for each item. + * @param flags a bitwise combination of zero or more optional + * flags (PTHREADPOOL_FLAG_DISABLE_DENORMALS or + * PTHREADPOOL_FLAG_YIELD_WORKERS) + */ +void pthreadpool_parallelize_1d_with_uarch( + pthreadpool_t threadpool, + pthreadpool_task_1d_with_id_t function, + void* context, + uint32_t default_uarch_index, + uint32_t max_uarch_index, + size_t range, + uint32_t flags); + /** * Process items on a 1D grid with specified maximum tile size. * @@ -248,6 +299,66 @@ void pthreadpool_parallelize_2d_tile_2d( size_t tile_j, uint32_t flags); +/** + * Process items on a 2D grid with the specified maximum tile size along each + * grid dimension using a microarchitecture-aware task function. + * + * The function implements a parallel version of the following snippet: + * + * uint32_t uarch_index = cpuinfo_initialize() ? + * cpuinfo_get_current_uarch_index() : default_uarch_index; + * if (uarch_index > max_uarch_index) uarch_index = default_uarch_index; + * for (size_t i = 0; i < range_i; i += tile_i) + * for (size_t j = 0; j < range_j; j += tile_j) + * function(context, uarch_index, i, j, + * min(range_i - i, tile_i), min(range_j - j, tile_j)); + * + * When the function returns, all items have been processed and the thread pool + * is ready for a new task. + * + * @note If multiple threads call this function with the same thread pool, the + * calls are serialized. + * + * @param threadpool the thread pool to use for parallelisation. If + * threadpool is NULL, all items are processed serially on the calling + * thread. + * @param function the function to call for each tile. + * @param context the first argument passed to the specified + * function. + * @param default_uarch_index the microarchitecture index to use when + * pthreadpool is configured without cpuinfo, + * cpuinfo initialization failed, or index returned + * by cpuinfo_get_current_uarch_index() exceeds + * the max_uarch_index value. 
+ * @param max_uarch_index the maximum microarchitecture index expected
+ * by the specified function. If the index returned
+ * by cpuinfo_get_current_uarch_index() exceeds this
+ * value, default_uarch_index will be used instead.
+ * default_uarch_index can exceed max_uarch_index.
+ * @param range_i the number of items to process along the first
+ * dimension of the 2D grid.
+ * @param range_j the number of items to process along the second
+ * dimension of the 2D grid.
+ * @param tile_i the maximum number of items along the first
+ * dimension of the 2D grid to process in one function call.
+ * @param tile_j the maximum number of items along the second
+ * dimension of the 2D grid to process in one function call.
+ * @param flags a bitwise combination of zero or more optional
+ * flags (PTHREADPOOL_FLAG_DISABLE_DENORMALS or
+ * PTHREADPOOL_FLAG_YIELD_WORKERS)
+ */
+void pthreadpool_parallelize_2d_tile_2d_with_uarch(
+  pthreadpool_t threadpool,
+  pthreadpool_task_2d_tile_2d_with_id_t function,
+  void* context,
+  uint32_t default_uarch_index,
+  uint32_t max_uarch_index,
+  size_t range_i,
+  size_t range_j,
+  size_t tile_i,
+  size_t tile_j,
+  uint32_t flags);
+
 /**
  * Process items on a 3D grid with the specified maximum tile size along the
  * last two grid dimensions.
@@ -294,6 +405,68 @@ void pthreadpool_parallelize_3d_tile_2d(
   size_t tile_k,
   uint32_t flags);

+/**
+ * Process items on a 3D grid with the specified maximum tile size along the
+ * last two grid dimensions using a microarchitecture-aware task function.
+ *
+ * The function implements a parallel version of the following snippet:
+ *
+ *   uint32_t uarch_index = cpuinfo_initialize() ?
+ *       cpuinfo_get_current_uarch_index() : default_uarch_index;
+ *   if (uarch_index > max_uarch_index) uarch_index = default_uarch_index;
+ *   for (size_t i = 0; i < range_i; i++)
+ *     for (size_t j = 0; j < range_j; j += tile_j)
+ *       for (size_t k = 0; k < range_k; k += tile_k)
+ *         function(context, uarch_index, i, j, k,
+ *           min(range_j - j, tile_j), min(range_k - k, tile_k));
+ *
+ * When the function returns, all items have been processed and the thread pool
+ * is ready for a new task.
+ *
+ * @note If multiple threads call this function with the same thread pool, the
+ * calls are serialized.
+ *
+ * @param threadpool the thread pool to use for parallelisation. If
+ * threadpool is NULL, all items are processed serially on the calling
+ * thread.
+ * @param function the function to call for each tile.
+ * @param context the first argument passed to the specified
+ * function.
+ * @param default_uarch_index the microarchitecture index to use when
+ * pthreadpool is configured without cpuinfo, cpuinfo initialization failed,
+ * or index returned by cpuinfo_get_current_uarch_index() exceeds the
+ * max_uarch_index value.
+ * @param max_uarch_index the maximum microarchitecture index expected by
+ * the specified function. If the index returned by
+ * cpuinfo_get_current_uarch_index() exceeds this value, default_uarch_index
+ * will be used instead. default_uarch_index can exceed max_uarch_index.
+ * @param range_i the number of items to process along the first
+ * dimension of the 3D grid.
+ * @param range_j the number of items to process along the second
+ * dimension of the 3D grid.
+ * @param range_k the number of items to process along the third
+ * dimension of the 3D grid.
+ * @param tile_j the maximum number of items along the second
+ * dimension of the 3D grid to process in one function call.
+ * @param tile_k the maximum number of items along the third + * dimension of the 3D grid to process in one function call. + * @param flags a bitwise combination of zero or more optional + * flags (PTHREADPOOL_FLAG_DISABLE_DENORMALS or + * PTHREADPOOL_FLAG_YIELD_WORKERS) + */ +void pthreadpool_parallelize_3d_tile_2d_with_uarch( + pthreadpool_t threadpool, + pthreadpool_task_3d_tile_2d_with_id_t function, + void* context, + uint32_t default_uarch_index, + uint32_t max_uarch_index, + size_t range_i, + size_t range_j, + size_t range_k, + size_t tile_j, + size_t tile_k, + uint32_t flags); + /** * Process items on a 4D grid with the specified maximum tile size along the * last two grid dimensions. @@ -344,6 +517,72 @@ void pthreadpool_parallelize_4d_tile_2d( size_t tile_l, uint32_t flags); +/** + * Process items on a 4D grid with the specified maximum tile size along the + * last two grid dimensions using a microarchitecture-aware task function. + * + * The function implements a parallel version of the following snippet: + * + * uint32_t uarch_index = cpuinfo_initialize() ? + * cpuinfo_get_current_uarch_index() : default_uarch_index; + * if (uarch_index > max_uarch_index) uarch_index = default_uarch_index; + * for (size_t i = 0; i < range_i; i++) + * for (size_t j = 0; j < range_j; j++) + * for (size_t k = 0; k < range_k; k += tile_k) + * for (size_t l = 0; l < range_l; l += tile_l) + * function(context, uarch_index, i, j, k, l, + * min(range_k - k, tile_k), min(range_l - l, tile_l)); + * + * When the function returns, all items have been processed and the thread pool + * is ready for a new task. + * + * @note If multiple threads call this function with the same thread pool, the + * calls are serialized. + * + * @param threadpool the thread pool to use for parallelisation. If + * threadpool is NULL, all items are processed serially on the calling + * thread. + * @param function the function to call for each tile. + * @param context the first argument passed to the specified + * function. + * @param default_uarch_index the microarchitecture index to use when + * pthreadpool is configured without cpuinfo, cpuinfo initialization failed, + * or index returned by cpuinfo_get_current_uarch_index() exceeds the + * max_uarch_index value. + * @param max_uarch_index the maximum microarchitecture index expected by + * the specified function. If the index returned by + * cpuinfo_get_current_uarch_index() exceeds this value, default_uarch_index + * will be used instead. default_uarch_index can exceed max_uarch_index. + * @param range_i the number of items to process along the first + * dimension of the 4D grid. + * @param range_j the number of items to process along the second + * dimension of the 4D grid. + * @param range_k the number of items to process along the third + * dimension of the 4D grid. + * @param range_l the number of items to process along the fourth + * dimension of the 4D grid. + * @param tile_k the maximum number of items along the third + * dimension of the 4D grid to process in one function call. + * @param tile_l the maximum number of items along the fourth + * dimension of the 4D grid to process in one function call. 
+ * @param flags                a bitwise combination of zero or more optional
+ *    flags (PTHREADPOOL_FLAG_DISABLE_DENORMALS or
+ *    PTHREADPOOL_FLAG_YIELD_WORKERS)
+ */
+void pthreadpool_parallelize_4d_tile_2d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_4d_tile_2d_with_id_t function,
+	void* context,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range_i,
+	size_t range_j,
+	size_t range_k,
+	size_t range_l,
+	size_t tile_k,
+	size_t tile_l,
+	uint32_t flags);
+
 /**
  * Process items on a 5D grid with the specified maximum tile size along the
  * last two grid dimensions.
diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c
index 6ebd521..0a9c06d 100644
--- a/src/threadpool-pthreads.c
+++ b/src/threadpool-pthreads.c
@@ -9,6 +9,10 @@
 #include
 #include
 
+#ifndef PTHREADPOOL_USE_CPUINFO
+	#define PTHREADPOOL_USE_CPUINFO 0
+#endif
+
 #ifndef PTHREADPOOL_USE_FUTEX
 	#if defined(__linux__)
 		#define PTHREADPOOL_USE_FUTEX 1
@@ -19,6 +23,10 @@
 	#endif
 #endif
 
+#if PTHREADPOOL_USE_CPUINFO
+	#include <cpuinfo.h>
+#endif
+
 /* Futex-specific headers */
 #if PTHREADPOOL_USE_FUTEX
 	#if defined(__linux__)
@@ -164,6 +172,17 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info {
 PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0,
 	"thread_info structure must occupy an integer number of cache lines (64 bytes)");
 
+struct pthreadpool_1d_with_uarch_params {
+	/**
+	 * Copy of the default uarch index argument passed to a microarchitecture-aware parallelization function.
+	 */
+	uint32_t default_uarch_index;
+	/**
+	 * Copy of the max uarch index argument passed to a microarchitecture-aware parallelization function.
+	 */
+	uint32_t max_uarch_index;
+};
+
 struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
 	/**
 	 * The number of threads that are processing an operation.
@@ -194,6 +213,13 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
 	 * The first argument to the item processing function.
 	 */
 	pthreadpool_atomic_void_p argument;
+	/**
+	 * Additional parallelization parameters.
+	 * These parameters are specific for each thread_function.
+	 */
+	union {
+		struct pthreadpool_1d_with_uarch_params parallelize_1d_with_uarch;
+	} params;
 	/**
 	 * Copy of the flags passed to a parallelization function.
 	 */
@@ -220,8 +246,14 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
 	 */
 	pthread_cond_t command_condvar;
 #endif
+#if PTHREADPOOL_USE_CPUINFO
 	/**
-	 * The number of threads in the thread pool. Never changes after initialization.
+	 * Indication whether cpuinfo library initialized successfully. Never changes after pthreadpool_create.
+	 */
+	bool cpuinfo_is_initialized;
+#endif
+	/**
+	 * The number of threads in the thread pool. Never changes after pthreadpool_create.
*/ size_t threads_count; /** @@ -356,6 +388,45 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_ pthreadpool_fence_release(); } +static void thread_parallelize_1d_with_uarch(struct pthreadpool* threadpool, struct thread_info* thread) { + const pthreadpool_task_1d_with_id_t task = (pthreadpool_task_1d_with_id_t) pthreadpool_load_relaxed_void_p(&threadpool->task); + void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); + + const uint32_t default_uarch_index = threadpool->params.parallelize_1d_with_uarch.default_uarch_index; + uint32_t uarch_index = default_uarch_index; + #if PTHREADPOOL_USE_CPUINFO + if (threadpool && threadpool->cpuinfo_is_initialized) { + uarch_index = cpuinfo_get_current_uarch_index(); + if (uarch_index > threadpool->params.parallelize_1d_with_uarch.max_uarch_index) { + uarch_index = default_uarch_index; + } + } + #endif + + /* Process thread's own range of items */ + size_t range_start = pthreadpool_load_relaxed_size_t(&thread->range_start); + while (atomic_decrement(&thread->range_length)) { + task(argument, uarch_index, range_start++); + } + + /* There still may be other threads with work */ + const size_t thread_number = thread->thread_number; + const size_t threads_count = threadpool->threads_count; + for (size_t tid = modulo_decrement(thread_number, threads_count); + tid != thread_number; + tid = modulo_decrement(tid, threads_count)) + { + struct thread_info* other_thread = &threadpool->threads[tid]; + while (atomic_decrement(&other_thread->range_length)) { + const size_t item_id = pthreadpool_fetch_sub_relaxed_size_t(&other_thread->range_end, 1) - 1; + task(argument, uarch_index, item_id); + } + } + + /* Make changes by this thread visible to other threads */ + pthreadpool_fence_release(); +} + static uint32_t wait_for_new_command( struct pthreadpool* threadpool, uint32_t last_command, @@ -501,6 +572,9 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { for (size_t tid = 0; tid < threads_count; tid++) { threadpool->threads[tid].thread_number = tid; } + #if PTHREADPOOL_USE_CPUINFO + threadpool->cpuinfo_is_initialized = cpuinfo_initialize(); + #endif /* Thread pool with a single thread computes everything on the caller thread. 
 	 */
 	if (threads_count > 1) {
@@ -539,6 +613,8 @@ size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) {
 static void pthreadpool_parallelize(
 	struct pthreadpool* threadpool,
 	thread_function_t thread_function,
+	const void* params,
+	size_t params_size,
 	void* task,
 	void* context,
 	size_t linear_range,
@@ -570,6 +646,11 @@ static void pthreadpool_parallelize(
 		pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
 	#endif
 
+	if (params_size != 0) {
+		memcpy(&threadpool->params, params, params_size);
+		pthreadpool_fence_release();
+	}
+
 	/* Spread the work between threads */
 	size_t range_start = 0;
 	for (size_t tid = 0; tid < threads_count; tid++) {
@@ -659,11 +740,55 @@ void pthreadpool_parallelize_1d(
 		}
 	} else {
 		pthreadpool_parallelize(
-			threadpool, &thread_parallelize_1d,
+			threadpool, &thread_parallelize_1d, NULL, 0,
 			(void*) task, argument, range, flags);
 	}
 }
 
+void pthreadpool_parallelize_1d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_1d_with_id_t task,
+	void* argument,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range,
+	uint32_t flags)
+{
+	if (threadpool == NULL || threadpool->threads_count <= 1 || range <= 1) {
+		/* No thread pool used: execute task sequentially on the calling thread */
+
+		uint32_t uarch_index = default_uarch_index;
+		#if PTHREADPOOL_USE_CPUINFO
+			if (threadpool && threadpool->cpuinfo_is_initialized) {
+				uarch_index = cpuinfo_get_current_uarch_index();
+				if (uarch_index > max_uarch_index) {
+					uarch_index = default_uarch_index;
+				}
+			}
+		#endif
+
+		struct fpu_state saved_fpu_state = { 0 };
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			saved_fpu_state = get_fpu_state();
+			disable_fpu_denormals();
+		}
+		for (size_t i = 0; i < range; i++) {
+			task(argument, uarch_index, i);
+		}
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			set_fpu_state(saved_fpu_state);
+		}
+	} else {
+		const struct pthreadpool_1d_with_uarch_params params = {
+			.default_uarch_index = default_uarch_index,
+			.max_uarch_index = max_uarch_index,
+		};
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d_with_uarch, &params, sizeof(params),
+			task, argument, range, flags);
+	}
+}
+
 struct compute_1d_tile_1d_context {
 	pthreadpool_task_1d_tile_1d_t task;
 	void* argument;
@@ -709,7 +834,7 @@ void pthreadpool_parallelize_1d_tile_1d(
 			.tile = tile
 		};
 		pthreadpool_parallelize(
-			threadpool, &thread_parallelize_1d,
+			threadpool, &thread_parallelize_1d, NULL, 0,
 			(void*) compute_1d_tile_1d, &context, tile_range, flags);
 	}
 }
@@ -757,7 +882,7 @@ void pthreadpool_parallelize_2d(
 			.range_j = fxdiv_init_size_t(range_j)
 		};
 		pthreadpool_parallelize(
-			threadpool, &thread_parallelize_1d,
+			threadpool, &thread_parallelize_1d, NULL, 0,
 			(void*) compute_2d, &context, range_i * range_j, flags);
 	}
 }
@@ -817,7 +942,7 @@ void pthreadpool_parallelize_2d_tile_1d(
 			.tile_j = tile_j
 		};
 		pthreadpool_parallelize(
-			threadpool, &thread_parallelize_1d,
+			threadpool, &thread_parallelize_1d, NULL, 0,
 			(void*) compute_2d_tile_1d, &context, range_i * tile_range_j, flags);
 	}
 }
@@ -883,11 +1008,94 @@ void pthreadpool_parallelize_2d_tile_2d(
 			.tile_j = tile_j
 		};
 		pthreadpool_parallelize(
-			threadpool, &thread_parallelize_1d,
+			threadpool, &thread_parallelize_1d, NULL, 0,
 			(void*) compute_2d_tile_2d, &context, tile_range_i * tile_range_j, flags);
 	}
 }
 
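Editorial note: the new microarchitecture-aware entry point implemented above is driven much like the existing pthreadpool_parallelize_1d, except that the task also receives the resolved uarch index. A minimal usage sketch, not part of the patch (the kernel split, context type, and index values are hypothetical):

#include <stddef.h>
#include <stdint.h>
#include <pthreadpool.h>

struct kernel_context {
	float* data;
};

/* Hypothetical per-microarchitecture kernels; indices 0 and 1 must cover
 * every value the task can observe, hence max_uarch_index = 1 below. */
static void generic_kernel(float* data, size_t i) { data[i] *= 2.0f; }
static void tuned_kernel(float* data, size_t i) { data[i] *= 2.0f; }

static void run_item(void* context, uint32_t uarch_index, size_t i) {
	struct kernel_context* ctx = (struct kernel_context*) context;
	/* uarch_index is either bounded by max_uarch_index or equal to
	 * default_uarch_index, so a fixed-size dispatch table cannot be
	 * overrun. */
	if (uarch_index == 1) {
		tuned_kernel(ctx->data, i);
	} else {
		generic_kernel(ctx->data, i);
	}
}

void double_array(pthreadpool_t threadpool, float* data, size_t n) {
	struct kernel_context ctx = { data };
	pthreadpool_parallelize_1d_with_uarch(
		threadpool,
		run_item,
		&ctx,
		0 /* default_uarch_index */,
		1 /* max_uarch_index */,
		n,
		0 /* flags */);
}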
+struct compute_2d_tile_2d_with_uarch_context {
+	pthreadpool_task_2d_tile_2d_with_id_t task;
+	void* argument;
+	struct fxdiv_divisor_size_t tile_range_j;
+	size_t range_i;
+	size_t range_j;
+	size_t tile_i;
+	size_t tile_j;
+};
+
+static void compute_2d_tile_2d_with_uarch(const struct compute_2d_tile_2d_with_uarch_context* context, uint32_t uarch_index, size_t linear_index) {
+	const struct fxdiv_divisor_size_t tile_range_j = context->tile_range_j;
+	const struct fxdiv_result_size_t tile_index = fxdiv_divide_size_t(linear_index, tile_range_j);
+	const size_t max_tile_i = context->tile_i;
+	const size_t max_tile_j = context->tile_j;
+	const size_t index_i = tile_index.quotient * max_tile_i;
+	const size_t index_j = tile_index.remainder * max_tile_j;
+	const size_t tile_i = min(max_tile_i, context->range_i - index_i);
+	const size_t tile_j = min(max_tile_j, context->range_j - index_j);
+	context->task(context->argument, uarch_index, index_i, index_j, tile_i, tile_j);
+}
+
+void pthreadpool_parallelize_2d_tile_2d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_2d_tile_2d_with_id_t task,
+	void* argument,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range_i,
+	size_t range_j,
+	size_t tile_i,
+	size_t tile_j,
+	uint32_t flags)
+{
+	if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) {
+		/* No thread pool used: execute task sequentially on the calling thread */
+
+		uint32_t uarch_index = default_uarch_index;
+		#if PTHREADPOOL_USE_CPUINFO
+			if (threadpool && threadpool->cpuinfo_is_initialized) {
+				uarch_index = cpuinfo_get_current_uarch_index();
+				if (uarch_index > max_uarch_index) {
+					uarch_index = default_uarch_index;
+				}
+			}
+		#endif
+
+		struct fpu_state saved_fpu_state = { 0 };
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			saved_fpu_state = get_fpu_state();
+			disable_fpu_denormals();
+		}
+		for (size_t i = 0; i < range_i; i += tile_i) {
+			for (size_t j = 0; j < range_j; j += tile_j) {
+				task(argument, uarch_index, i, j, min(range_i - i, tile_i), min(range_j - j, tile_j));
+			}
+		}
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			set_fpu_state(saved_fpu_state);
+		}
+	} else {
+		/* Execute in parallel on the thread pool using linearized index */
+		const size_t tile_range_i = divide_round_up(range_i, tile_i);
+		const size_t tile_range_j = divide_round_up(range_j, tile_j);
+		const struct pthreadpool_1d_with_uarch_params params = {
+			.default_uarch_index = default_uarch_index,
+			.max_uarch_index = max_uarch_index,
+		};
+		struct compute_2d_tile_2d_with_uarch_context context = {
+			.task = task,
+			.argument = argument,
+			.tile_range_j = fxdiv_init_size_t(tile_range_j),
+			.range_i = range_i,
+			.range_j = range_j,
+			.tile_i = tile_i,
+			.tile_j = tile_j
+		};
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d_with_uarch, &params, sizeof(params),
+			(void*) compute_2d_tile_2d_with_uarch, &context, tile_range_i * tile_range_j, flags);
+	}
+}
+
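Editorial note: a usage sketch for the tiled 2D variant just defined, here halving every pixel of a height x width image in 32x32 tiles (the context type, kernel, and tile sizes are hypothetical, not part of the patch):

#include <stddef.h>
#include <stdint.h>
#include <pthreadpool.h>

/* Hypothetical context: scale every pixel of a width-pixel-wide image. */
struct scale_context {
	float* pixels;
	size_t width;
};

static void scale_tile(void* context, uint32_t uarch_index,
	size_t start_i, size_t start_j, size_t tile_i, size_t tile_j)
{
	struct scale_context* ctx = (struct scale_context*) context;
	/* uarch_index could select a per-microarchitecture kernel here;
	 * this sketch uses a single generic path. Tile extents are already
	 * clamped at the grid edges, as in the serial snippet from the
	 * header comment. */
	(void) uarch_index;
	for (size_t i = start_i; i < start_i + tile_i; i++) {
		for (size_t j = start_j; j < start_j + tile_j; j++) {
			ctx->pixels[i * ctx->width + j] *= 0.5f;
		}
	}
}

void scale_image(pthreadpool_t threadpool, float* pixels, size_t height, size_t width) {
	struct scale_context context = { pixels, width };
	pthreadpool_parallelize_2d_tile_2d_with_uarch(
		threadpool, scale_tile, &context,
		0 /* default_uarch_index */, 0 /* max_uarch_index */,
		height, width,
		32 /* tile_i */, 32 /* tile_j */,
		0 /* flags */);
}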
 struct compute_3d_tile_2d_context {
 	pthreadpool_task_3d_tile_2d_t task;
 	void* argument;
@@ -957,11 +1165,102 @@ void pthreadpool_parallelize_3d_tile_2d(
 			.tile_k = tile_k
 		};
 		pthreadpool_parallelize(
-			threadpool, &thread_parallelize_1d,
+			threadpool, &thread_parallelize_1d, NULL, 0,
 			(void*) compute_3d_tile_2d, &context, range_i * tile_range_j * tile_range_k, flags);
 	}
 }
 
+struct compute_3d_tile_2d_with_uarch_context {
+	pthreadpool_task_3d_tile_2d_with_id_t task;
+	void* argument;
+	struct fxdiv_divisor_size_t tile_range_j;
+	struct fxdiv_divisor_size_t tile_range_k;
+	size_t range_j;
+	size_t range_k;
+	size_t tile_j;
+	size_t tile_k;
+};
+
+static void compute_3d_tile_2d_with_uarch(const struct compute_3d_tile_2d_with_uarch_context* context, uint32_t uarch_index, size_t linear_index)
+{
+	const struct fxdiv_divisor_size_t tile_range_k = context->tile_range_k;
+	const struct fxdiv_result_size_t tile_index_ij_k = fxdiv_divide_size_t(linear_index, tile_range_k);
+	const struct fxdiv_divisor_size_t tile_range_j = context->tile_range_j;
+	const struct fxdiv_result_size_t tile_index_i_j = fxdiv_divide_size_t(tile_index_ij_k.quotient, tile_range_j);
+	const size_t max_tile_j = context->tile_j;
+	const size_t max_tile_k = context->tile_k;
+	const size_t index_i = tile_index_i_j.quotient;
+	const size_t index_j = tile_index_i_j.remainder * max_tile_j;
+	const size_t index_k = tile_index_ij_k.remainder * max_tile_k;
+	const size_t tile_j = min(max_tile_j, context->range_j - index_j);
+	const size_t tile_k = min(max_tile_k, context->range_k - index_k);
+	context->task(context->argument, uarch_index, index_i, index_j, index_k, tile_j, tile_k);
+}
+
+void pthreadpool_parallelize_3d_tile_2d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_3d_tile_2d_with_id_t task,
+	void* argument,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range_i,
+	size_t range_j,
+	size_t range_k,
+	size_t tile_j,
+	size_t tile_k,
+	uint32_t flags)
+{
+	if (threadpool == NULL || threadpool->threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) {
+		/* No thread pool used: execute task sequentially on the calling thread */
+
+		uint32_t uarch_index = default_uarch_index;
+		#if PTHREADPOOL_USE_CPUINFO
+			if (threadpool && threadpool->cpuinfo_is_initialized) {
+				uarch_index = cpuinfo_get_current_uarch_index();
+				if (uarch_index > max_uarch_index) {
+					uarch_index = default_uarch_index;
+				}
+			}
+		#endif
+
+		struct fpu_state saved_fpu_state = { 0 };
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			saved_fpu_state = get_fpu_state();
+			disable_fpu_denormals();
+		}
+		for (size_t i = 0; i < range_i; i++) {
+			for (size_t j = 0; j < range_j; j += tile_j) {
+				for (size_t k = 0; k < range_k; k += tile_k) {
+					task(argument, uarch_index, i, j, k, min(range_j - j, tile_j), min(range_k - k, tile_k));
+				}
+			}
+		}
+		if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+			set_fpu_state(saved_fpu_state);
+		}
+	} else {
+		/* Execute in parallel on the thread pool using linearized index */
+		const size_t tile_range_j = divide_round_up(range_j, tile_j);
+		const size_t tile_range_k = divide_round_up(range_k, tile_k);
+		const struct pthreadpool_1d_with_uarch_params params = {
+			.default_uarch_index = default_uarch_index,
+			.max_uarch_index = max_uarch_index,
+		};
+		struct compute_3d_tile_2d_with_uarch_context context = {
+			.task = task,
+			.argument = argument,
+			.tile_range_j = fxdiv_init_size_t(tile_range_j),
+			.tile_range_k = fxdiv_init_size_t(tile_range_k),
+			.range_j = range_j,
+			.range_k = range_k,
+			.tile_j = tile_j,
+			.tile_k = tile_k
+		};
+		pthreadpool_parallelize(
+			threadpool, &thread_parallelize_1d_with_uarch, &params, sizeof(params),
+			(void*) compute_3d_tile_2d_with_uarch, &context, range_i * tile_range_j * tile_range_k, flags);
+	}
+}
+
 struct compute_4d_tile_2d_context {
 	pthreadpool_task_4d_tile_2d_t task;
 	void* argument;
@@ -1040,11 +1339,111 @@ void pthreadpool_parallelize_4d_tile_2d(
 			.tile_l = tile_l
 		};
 		pthreadpool_parallelize(
-			threadpool, &thread_parallelize_1d,
+			threadpool, &thread_parallelize_1d, NULL, 0,
 			(void*) compute_4d_tile_2d, &context, range_i * range_j * tile_range_k * tile_range_l, flags);
 	}
 }
 
+struct compute_4d_tile_2d_with_uarch_context {
+	pthreadpool_task_4d_tile_2d_with_id_t task;
+	void* argument;
+	struct fxdiv_divisor_size_t tile_range_kl;
+
struct fxdiv_divisor_size_t range_j; + struct fxdiv_divisor_size_t tile_range_l; + size_t range_k; + size_t range_l; + size_t tile_k; + size_t tile_l; +}; + +static void compute_4d_tile_2d_with_uarch(const struct compute_4d_tile_2d_with_uarch_context* context, uint32_t uarch_index, size_t linear_index) { + const struct fxdiv_divisor_size_t tile_range_kl = context->tile_range_kl; + const struct fxdiv_result_size_t tile_index_ij_kl = fxdiv_divide_size_t(linear_index, tile_range_kl); + const struct fxdiv_divisor_size_t range_j = context->range_j; + const struct fxdiv_result_size_t tile_index_i_j = fxdiv_divide_size_t(tile_index_ij_kl.quotient, range_j); + const struct fxdiv_divisor_size_t tile_range_l = context->tile_range_l; + const struct fxdiv_result_size_t tile_index_k_l = fxdiv_divide_size_t(tile_index_ij_kl.remainder, tile_range_l); + const size_t max_tile_k = context->tile_k; + const size_t max_tile_l = context->tile_l; + const size_t index_i = tile_index_i_j.quotient; + const size_t index_j = tile_index_i_j.remainder; + const size_t index_k = tile_index_k_l.quotient * max_tile_k; + const size_t index_l = tile_index_k_l.remainder * max_tile_l; + const size_t tile_k = min(max_tile_k, context->range_k - index_k); + const size_t tile_l = min(max_tile_l, context->range_l - index_l); + context->task(context->argument, uarch_index, index_i, index_j, index_k, index_l, tile_k, tile_l); +} + +void pthreadpool_parallelize_4d_tile_2d_with_uarch( + pthreadpool_t threadpool, + pthreadpool_task_4d_tile_2d_with_id_t task, + void* argument, + uint32_t default_uarch_index, + uint32_t max_uarch_index, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t tile_k, + size_t tile_l, + uint32_t flags) +{ + if (threadpool == NULL || threadpool->threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) { + /* No thread pool used: execute task sequentially on the calling thread */ + + uint32_t uarch_index = default_uarch_index; + #if PTHREADPOOL_USE_CPUINFO + if (threadpool && threadpool->cpuinfo_is_initialized) { + uarch_index = cpuinfo_get_current_uarch_index(); + if (uarch_index > max_uarch_index) { + uarch_index = default_uarch_index; + } + } + #endif + + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j++) { + for (size_t k = 0; k < range_k; k += tile_k) { + for (size_t l = 0; l < range_l; l += tile_l) { + task(argument, uarch_index, i, j, k, l, + min(range_k - k, tile_k), min(range_l - l, tile_l)); + } + } + } + } + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + } else { + /* Execute in parallel on the thread pool using linearized index */ + const size_t tile_range_k = divide_round_up(range_k, tile_k); + const size_t tile_range_l = divide_round_up(range_l, tile_l); + const struct pthreadpool_1d_with_uarch_params params = { + .default_uarch_index = default_uarch_index, + .max_uarch_index = max_uarch_index, + }; + struct compute_4d_tile_2d_with_uarch_context context = { + .task = task, + .argument = argument, + .tile_range_kl = fxdiv_init_size_t(tile_range_k * tile_range_l), + .range_j = fxdiv_init_size_t(range_j), + .tile_range_l = fxdiv_init_size_t(tile_range_l), + .range_k = range_k, + .range_l = range_l, + .tile_k = tile_k, + .tile_l = tile_l + }; + pthreadpool_parallelize( + threadpool, 
&thread_parallelize_1d_with_uarch, &params, sizeof(params),
+			(void*) compute_4d_tile_2d_with_uarch, &context, range_i * range_j * tile_range_k * tile_range_l, flags);
+	}
+}
+
 struct compute_5d_tile_2d_context {
 	pthreadpool_task_5d_tile_2d_t task;
 	void* argument;
@@ -1132,7 +1531,7 @@ void pthreadpool_parallelize_5d_tile_2d(
 			.tile_m = tile_m,
 		};
 		pthreadpool_parallelize(
-			threadpool, &thread_parallelize_1d,
+			threadpool, &thread_parallelize_1d, NULL, 0,
 			(void*) compute_5d_tile_2d, &context, range_i * range_j * range_k * tile_range_l * tile_range_m, flags);
 	}
 }
@@ -1232,7 +1631,7 @@ void pthreadpool_parallelize_6d_tile_2d(
 			.tile_n = tile_n,
 		};
 		pthreadpool_parallelize(
-			threadpool, &thread_parallelize_1d,
+			threadpool, &thread_parallelize_1d, NULL, 0,
 			(void*) compute_6d_tile_2d, &context, range_i * range_j * range_k * range_l * tile_range_m * tile_range_n, flags);
 	}
 }
@@ -1289,6 +1688,11 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) {
 			pthread_cond_destroy(&threadpool->command_condvar);
 		#endif
 	}
+	#if PTHREADPOOL_USE_CPUINFO
+		if (threadpool->cpuinfo_is_initialized) {
+			cpuinfo_deinitialize();
+		}
+	#endif
 	#ifdef _WIN32
 		_aligned_free(threadpool);
 	#else
diff --git a/src/threadpool-shim.c b/src/threadpool-shim.c
index c8ef51d..b5670ea 100644
--- a/src/threadpool-shim.c
+++ b/src/threadpool-shim.c
@@ -28,6 +28,20 @@ void pthreadpool_parallelize_1d(
 	}
 }
 
+void pthreadpool_parallelize_1d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_1d_with_id_t task,
+	void* argument,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range,
+	uint32_t flags)
+{
+	for (size_t i = 0; i < range; i++) {
+		task(argument, default_uarch_index, i);
+	}
+}
+
 void pthreadpool_parallelize_1d_tile_1d(
 	pthreadpool_t threadpool,
 	pthreadpool_task_1d_tile_1d_t task,
@@ -89,6 +103,26 @@ void pthreadpool_parallelize_2d_tile_2d(
 	}
 }
 
+void pthreadpool_parallelize_2d_tile_2d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_2d_tile_2d_with_id_t task,
+	void* argument,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range_i,
+	size_t range_j,
+	size_t tile_i,
+	size_t tile_j,
+	uint32_t flags)
+{
+	for (size_t i = 0; i < range_i; i += tile_i) {
+		for (size_t j = 0; j < range_j; j += tile_j) {
+			task(argument, default_uarch_index, i, j,
+				min(range_i - i, tile_i), min(range_j - j, tile_j));
+		}
+	}
+}
+
 void pthreadpool_parallelize_3d_tile_2d(
 	pthreadpool_t threadpool,
 	pthreadpool_task_3d_tile_2d_t task,
@@ -110,6 +144,29 @@ void pthreadpool_parallelize_3d_tile_2d(
 	}
 }
 
+void pthreadpool_parallelize_3d_tile_2d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_3d_tile_2d_with_id_t task,
+	void* argument,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range_i,
+	size_t range_j,
+	size_t range_k,
+	size_t tile_j,
+	size_t tile_k,
+	uint32_t flags)
+{
+	for (size_t i = 0; i < range_i; i++) {
+		for (size_t j = 0; j < range_j; j += tile_j) {
+			for (size_t k = 0; k < range_k; k += tile_k) {
+				task(argument, default_uarch_index, i, j, k,
+					min(range_j - j, tile_j), min(range_k - k, tile_k));
+			}
+		}
+	}
+}
+
 void pthreadpool_parallelize_4d_tile_2d(
 	pthreadpool_t threadpool,
 	pthreadpool_task_4d_tile_2d_t task,
@@ -134,6 +191,32 @@ void pthreadpool_parallelize_4d_tile_2d(
 	}
 }
 
+void pthreadpool_parallelize_4d_tile_2d_with_uarch(
+	pthreadpool_t threadpool,
+	pthreadpool_task_4d_tile_2d_with_id_t task,
+	void* argument,
+	uint32_t default_uarch_index,
+	uint32_t max_uarch_index,
+	size_t range_i,
+	size_t range_j,
+	size_t range_k,
+	size_t
range_l, + size_t tile_k, + size_t tile_l, + uint32_t flags) +{ + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j++) { + for (size_t k = 0; k < range_k; k += tile_k) { + for (size_t l = 0; l < range_l; l += tile_l) { + task(argument, default_uarch_index, i, j, k, l, + min(range_k - k, tile_k), min(range_l - l, tile_l)); + } + } + } + } +} + void pthreadpool_parallelize_5d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_5d_tile_2d_t task, diff --git a/test/pthreadpool.cc b/test/pthreadpool.cc index b80d98d..b8a6803 100644 --- a/test/pthreadpool.cc +++ b/test/pthreadpool.cc @@ -54,6 +54,9 @@ const size_t kIncrementIterations = 101; const size_t kIncrementIterations5D = 7; const size_t kIncrementIterations6D = 3; +const uint32_t kMaxUArchIndex = 0; +const uint32_t kDefaultUArchIndex = 42; + TEST(CreateAndDestroy, NullThreadPool) { pthreadpool* threadpool = nullptr; @@ -326,6 +329,321 @@ TEST(Parallelize1D, MultiThreadPoolWorkStealing) { EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize1DRange); } +static void ComputeNothing1DWithUArch(void*, uint32_t, size_t) { +} + +TEST(Parallelize1DWithUArch, SingleThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d_with_uarch(threadpool.get(), + ComputeNothing1DWithUArch, + nullptr, + kDefaultUArchIndex, + kMaxUArchIndex, + kParallelize1DRange, + 0 /* flags */); +} + +TEST(Parallelize1DWithUArch, MultiThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_with_uarch( + threadpool.get(), + ComputeNothing1DWithUArch, + nullptr, + kDefaultUArchIndex, + kMaxUArchIndex, + kParallelize1DRange, + 0 /* flags */); +} + +static void CheckUArch1DWithUArch(void*, uint32_t uarch_index, size_t) { + if (uarch_index != kDefaultUArchIndex) { + EXPECT_LE(uarch_index, kMaxUArchIndex); + } +} + +TEST(Parallelize1DWithUArch, SingleThreadPoolUArchInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d_with_uarch(threadpool.get(), + CheckUArch1DWithUArch, + nullptr, + kDefaultUArchIndex, + kMaxUArchIndex, + kParallelize1DRange, + 0 /* flags */); +} + +TEST(Parallelize1DWithUArch, MultiThreadPoolUArchInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_with_uarch( + threadpool.get(), + CheckUArch1DWithUArch, + nullptr, + kDefaultUArchIndex, + kMaxUArchIndex, + kParallelize1DRange, + 0 /* flags */); +} + +static void CheckBounds1DWithUArch(void*, uint32_t, size_t i) { + EXPECT_LT(i, kParallelize1DRange); +} + +TEST(Parallelize1DWithUArch, SingleThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d_with_uarch( + threadpool.get(), + CheckBounds1DWithUArch, + nullptr, + kDefaultUArchIndex, + kMaxUArchIndex, + kParallelize1DRange, + 0 /* flags */); +} + +TEST(Parallelize1DWithUArch, MultiThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if 
(pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_1d_with_uarch(
+		threadpool.get(),
+		CheckBounds1DWithUArch,
+		nullptr,
+		kDefaultUArchIndex,
+		kMaxUArchIndex,
+		kParallelize1DRange,
+		0 /* flags */);
+}
+
+static void SetTrue1DWithUArch(std::atomic_bool* processed_indicators, uint32_t, size_t i) {
+	processed_indicators[i].store(true, std::memory_order_relaxed);
+}
+
+TEST(Parallelize1DWithUArch, SingleThreadPoolAllItemsProcessed) {
+	std::vector<std::atomic_bool> indicators(kParallelize1DRange);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_1d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_1d_with_id_t>(SetTrue1DWithUArch),
+		static_cast<void*>(indicators.data()),
+		kDefaultUArchIndex,
+		kMaxUArchIndex,
+		kParallelize1DRange,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize1DRange; i++) {
+		EXPECT_TRUE(indicators[i].load(std::memory_order_relaxed))
+			<< "Element " << i << " not processed";
+	}
+}
+
+TEST(Parallelize1DWithUArch, MultiThreadPoolAllItemsProcessed) {
+	std::vector<std::atomic_bool> indicators(kParallelize1DRange);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_1d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_1d_with_id_t>(SetTrue1DWithUArch),
+		static_cast<void*>(indicators.data()),
+		kDefaultUArchIndex,
+		kMaxUArchIndex,
+		kParallelize1DRange,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize1DRange; i++) {
+		EXPECT_TRUE(indicators[i].load(std::memory_order_relaxed))
+			<< "Element " << i << " not processed";
+	}
+}
+
+static void Increment1DWithUArch(std::atomic_int* processed_counters, uint32_t, size_t i) {
+	processed_counters[i].fetch_add(1, std::memory_order_relaxed);
+}
+
+TEST(Parallelize1DWithUArch, SingleThreadPoolEachItemProcessedOnce) {
+	std::vector<std::atomic_int> counters(kParallelize1DRange);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_1d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_1d_with_id_t>(Increment1DWithUArch),
+		static_cast<void*>(counters.data()),
+		kDefaultUArchIndex,
+		kMaxUArchIndex,
+		kParallelize1DRange,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize1DRange; i++) {
+		EXPECT_EQ(counters[i].load(std::memory_order_relaxed), 1)
+			<< "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times (expected: 1)";
+	}
+}
+
+TEST(Parallelize1DWithUArch, MultiThreadPoolEachItemProcessedOnce) {
+	std::vector<std::atomic_int> counters(kParallelize1DRange);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_1d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_1d_with_id_t>(Increment1DWithUArch),
+		static_cast<void*>(counters.data()),
+		kDefaultUArchIndex,
+		kMaxUArchIndex,
+		kParallelize1DRange,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize1DRange; i++) {
+		EXPECT_EQ(counters[i].load(std::memory_order_relaxed), 1)
+			<< "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times (expected: 1)";
+	}
+}
+
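Editorial note on the constants these tests use: kMaxUArchIndex is 0 while kDefaultUArchIndex is 42, i.e. the default deliberately exceeds the maximum (the header explicitly allows this), so a task can tell the cpuinfo path apart from the fallback path. The rule the CheckUArch* tasks above assert, as a standalone C sketch (names hypothetical):

#include <stdint.h>

/* Any index a task observes is either a cpuinfo-reported index no greater
 * than max_uarch_index, or default_uarch_index verbatim. */
static uint32_t resolve_uarch_index(uint32_t cpuinfo_index,
	uint32_t default_uarch_index, uint32_t max_uarch_index)
{
	return cpuinfo_index > max_uarch_index ? default_uarch_index : cpuinfo_index;
}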
+TEST(Parallelize1DWithUArch, SingleThreadPoolEachItemProcessedMultipleTimes) {
+	std::vector<std::atomic_int> counters(kParallelize1DRange);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) {
+		pthreadpool_parallelize_1d_with_uarch(
+			threadpool.get(),
+			reinterpret_cast<pthreadpool_task_1d_with_id_t>(Increment1DWithUArch),
+			static_cast<void*>(counters.data()),
+			kDefaultUArchIndex,
+			kMaxUArchIndex,
+			kParallelize1DRange,
+			0 /* flags */);
+	}
+
+	for (size_t i = 0; i < kParallelize1DRange; i++) {
+		EXPECT_EQ(counters[i].load(std::memory_order_relaxed), kIncrementIterations)
+			<< "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times "
+			<< "(expected: " << kIncrementIterations << ")";
+	}
+}
+
+TEST(Parallelize1DWithUArch, MultiThreadPoolEachItemProcessedMultipleTimes) {
+	std::vector<std::atomic_int> counters(kParallelize1DRange);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) {
+		pthreadpool_parallelize_1d_with_uarch(
+			threadpool.get(),
+			reinterpret_cast<pthreadpool_task_1d_with_id_t>(Increment1DWithUArch),
+			static_cast<void*>(counters.data()),
+			kDefaultUArchIndex,
+			kMaxUArchIndex,
+			kParallelize1DRange,
+			0 /* flags */);
+	}
+
+	for (size_t i = 0; i < kParallelize1DRange; i++) {
+		EXPECT_EQ(counters[i].load(std::memory_order_relaxed), kIncrementIterations)
+			<< "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times "
+			<< "(expected: " << kIncrementIterations << ")";
+	}
+}
+
+static void IncrementSame1DWithUArch(std::atomic_int* num_processed_items, uint32_t, size_t i) {
+	num_processed_items->fetch_add(1, std::memory_order_relaxed);
+}
+
+TEST(Parallelize1DWithUArch, MultiThreadPoolHighContention) {
+	std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_1d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_1d_with_id_t>(IncrementSame1DWithUArch),
+		static_cast<void*>(&num_processed_items),
+		kDefaultUArchIndex,
+		kMaxUArchIndex,
+		kParallelize1DRange,
+		0 /* flags */);
+	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize1DRange);
+}
+
+static void WorkImbalance1DWithUArch(std::atomic_int* num_processed_items, uint32_t, size_t i) {
+	num_processed_items->fetch_add(1, std::memory_order_relaxed);
+	if (i == 0) {
+		/* Spin-wait until all items are computed */
+		while (num_processed_items->load(std::memory_order_relaxed) != kParallelize1DRange) {
+			std::atomic_thread_fence(std::memory_order_acquire);
+		}
+	}
+}
+
+TEST(Parallelize1DWithUArch, MultiThreadPoolWorkStealing) {
+	std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_1d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_1d_with_id_t>(WorkImbalance1DWithUArch),
+		static_cast<void*>(&num_processed_items),
+		kDefaultUArchIndex,
+		kMaxUArchIndex,
+		kParallelize1DRange,
+		0 /* flags */);
+	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize1DRange);
+}
+
 static void ComputeNothing1DTile1D(void*, size_t, size_t) {
 }
 
@@ -1569,22 +1887,23 @@
EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ); } -static void ComputeNothing3DTile2D(void*, size_t, size_t, size_t, size_t, size_t) { +static void ComputeNothing2DTile2DWithUArch(void*, uint32_t, size_t, size_t, size_t, size_t) { } -TEST(Parallelize3DTile2D, SingleThreadPoolCompletes) { +TEST(Parallelize2DTile2DWithUArch, SingleThreadPoolCompletes) { auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); ASSERT_TRUE(threadpool.get()); - pthreadpool_parallelize_3d_tile_2d(threadpool.get(), - ComputeNothing3DTile2D, + pthreadpool_parallelize_2d_tile_2d_with_uarch(threadpool.get(), + ComputeNothing2DTile2DWithUArch, nullptr, - kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, - kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + kDefaultUArchIndex, kMaxUArchIndex, + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, 0 /* flags */); } -TEST(Parallelize3DTile2D, MultiThreadPoolCompletes) { +TEST(Parallelize2DTile2DWithUArch, MultiThreadPoolCompletes) { auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); ASSERT_TRUE(threadpool.get()); @@ -1592,37 +1911,1135 @@ TEST(Parallelize3DTile2D, MultiThreadPoolCompletes) { GTEST_SKIP(); } - pthreadpool_parallelize_3d_tile_2d( + pthreadpool_parallelize_2d_tile_2d_with_uarch( threadpool.get(), - ComputeNothing3DTile2D, + ComputeNothing2DTile2DWithUArch, nullptr, - kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, - kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + kDefaultUArchIndex, kMaxUArchIndex, + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, 0 /* flags */); } -static void CheckBounds3DTile2D(void*, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) { - EXPECT_LT(i, kParallelize3DTile2DRangeI); - EXPECT_LT(start_j, kParallelize3DTile2DRangeJ); - EXPECT_LT(start_k, kParallelize3DTile2DRangeK); +static void CheckUArch2DTile2DWithUArch(void*, uint32_t uarch_index, size_t, size_t, size_t, size_t) { + if (uarch_index != kDefaultUArchIndex) { + EXPECT_LE(uarch_index, kMaxUArchIndex); + } +} + +TEST(Parallelize2DTile2DWithUArch, SingleThreadPoolUArchInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d_tile_2d_with_uarch( + threadpool.get(), + CheckUArch2DTile2DWithUArch, + nullptr, + kDefaultUArchIndex, kMaxUArchIndex, + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); +} + +TEST(Parallelize2DTile2DWithUArch, MultiThreadPoolUArchInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_2d_with_uarch( + threadpool.get(), + CheckUArch2DTile2DWithUArch, + nullptr, + kDefaultUArchIndex, kMaxUArchIndex, + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); +} + +static void CheckBounds2DTile2DWithUArch(void*, uint32_t, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) { + EXPECT_LT(start_i, kParallelize2DTile2DRangeI); + EXPECT_LT(start_j, kParallelize2DTile2DRangeJ); + 
EXPECT_LE(start_i + tile_i, kParallelize2DTile2DRangeI);
+	EXPECT_LE(start_j + tile_j, kParallelize2DTile2DRangeJ);
+}
+
+TEST(Parallelize2DTile2DWithUArch, SingleThreadPoolAllItemsInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_2d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckBounds2DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+		kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+		0 /* flags */);
+}
+
+TEST(Parallelize2DTile2DWithUArch, MultiThreadPoolAllItemsInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_2d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckBounds2DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+		kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+		0 /* flags */);
+}
+
+static void CheckTiling2DTile2DWithUArch(void*, uint32_t, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) {
+	EXPECT_GT(tile_i, 0);
+	EXPECT_LE(tile_i, kParallelize2DTile2DTileI);
+	EXPECT_EQ(start_i % kParallelize2DTile2DTileI, 0);
+	EXPECT_EQ(tile_i, std::min(kParallelize2DTile2DTileI, kParallelize2DTile2DRangeI - start_i));
+
+	EXPECT_GT(tile_j, 0);
+	EXPECT_LE(tile_j, kParallelize2DTile2DTileJ);
+	EXPECT_EQ(start_j % kParallelize2DTile2DTileJ, 0);
+	EXPECT_EQ(tile_j, std::min(kParallelize2DTile2DTileJ, kParallelize2DTile2DRangeJ - start_j));
+}
+
+TEST(Parallelize2DTile2DWithUArch, SingleThreadPoolUniformTiling) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_2d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckTiling2DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+		kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+		0 /* flags */);
+}
+
+TEST(Parallelize2DTile2DWithUArch, MultiThreadPoolUniformTiling) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_2d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckTiling2DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+		kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+		0 /* flags */);
+}
+
+static void SetTrue2DTile2DWithUArch(std::atomic_bool* processed_indicators, uint32_t, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) {
+	for (size_t i = start_i; i < start_i + tile_i; i++) {
+		for (size_t j = start_j; j < start_j + tile_j; j++) {
+			const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j;
+			processed_indicators[linear_idx].store(true, std::memory_order_relaxed);
+		}
+	}
+}
+
+TEST(Parallelize2DTile2DWithUArch, SingleThreadPoolAllItemsProcessed) {
+	std::vector<std::atomic_bool> indicators(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_2d_tile_2d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_2d_tile_2d_with_id_t>(SetTrue2DTile2DWithUArch),
+		static_cast<void*>(indicators.data()),
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+		kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) {
+			const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j;
+			EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed))
+				<< "Element (" << i << ", " << j << ") not processed";
+		}
+	}
+}
+
+TEST(Parallelize2DTile2DWithUArch, MultiThreadPoolAllItemsProcessed) {
+	std::vector<std::atomic_bool> indicators(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_2d_tile_2d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_2d_tile_2d_with_id_t>(SetTrue2DTile2DWithUArch),
+		static_cast<void*>(indicators.data()),
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+		kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) {
+			const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j;
+			EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed))
+				<< "Element (" << i << ", " << j << ") not processed";
+		}
+	}
+}
+
+static void Increment2DTile2DWithUArch(std::atomic_int* processed_counters, uint32_t, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) {
+	for (size_t i = start_i; i < start_i + tile_i; i++) {
+		for (size_t j = start_j; j < start_j + tile_j; j++) {
+			const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j;
+			processed_counters[linear_idx].fetch_add(1, std::memory_order_relaxed);
+		}
+	}
+}
+
+TEST(Parallelize2DTile2DWithUArch, SingleThreadPoolEachItemProcessedOnce) {
+	std::vector<std::atomic_int> counters(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_2d_tile_2d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_2d_tile_2d_with_id_t>(Increment2DTile2DWithUArch),
+		static_cast<void*>(counters.data()),
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+		kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) {
+			const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j;
+			EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1)
+				<< "Element (" << i << ", " << j << ") was processed "
+				<< counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)";
+		}
+	}
+}
+
+TEST(Parallelize2DTile2DWithUArch, MultiThreadPoolEachItemProcessedOnce) {
+	std::vector<std::atomic_int> counters(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_2d_tile_2d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_2d_tile_2d_with_id_t>(Increment2DTile2DWithUArch),
+		static_cast<void*>(counters.data()),
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+		kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) {
+			const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j;
+			EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1)
+				<< "Element (" << i << ", " << j << ") was processed "
+				<< counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)";
+		}
+	}
+}
+
+TEST(Parallelize2DTile2DWithUArch, SingleThreadPoolEachItemProcessedMultipleTimes) {
+	std::vector<std::atomic_int> counters(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) {
+		pthreadpool_parallelize_2d_tile_2d_with_uarch(
+			threadpool.get(),
+			reinterpret_cast<pthreadpool_task_2d_tile_2d_with_id_t>(Increment2DTile2DWithUArch),
+			static_cast<void*>(counters.data()),
+			kDefaultUArchIndex, kMaxUArchIndex,
+			kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+			kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+			0 /* flags */);
+	}
+
+	for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) {
+			const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j;
+			EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations)
+				<< "Element (" << i << ", " << j << ") was processed "
+				<< counters[linear_idx].load(std::memory_order_relaxed) << " times "
+				<< "(expected: " << kIncrementIterations << ")";
+		}
+	}
+}
+
+TEST(Parallelize2DTile2DWithUArch, MultiThreadPoolEachItemProcessedMultipleTimes) {
+	std::vector<std::atomic_int> counters(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) {
+		pthreadpool_parallelize_2d_tile_2d_with_uarch(
+			threadpool.get(),
+			reinterpret_cast<pthreadpool_task_2d_tile_2d_with_id_t>(Increment2DTile2DWithUArch),
+			static_cast<void*>(counters.data()),
+			kDefaultUArchIndex, kMaxUArchIndex,
+			kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+			kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+			0 /* flags */);
+	}
+
+	for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) {
+			const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j;
+			EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations)
+				<< "Element (" << i << ", " << j << ") was processed "
+				<< counters[linear_idx].load(std::memory_order_relaxed) << " times "
+				<< "(expected: " << kIncrementIterations << ")";
+		}
+	}
+}
+
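Editorial note: the WorkImbalance* tasks (one is defined just below) pin the very first tile in a spin-wait until every item in the range has been counted, so the test can only terminate if the remaining tiles are stolen and executed by other worker threads. The pattern in isolation, as a C11 sketch with a hypothetical fixed range:

#include <stdatomic.h>
#include <stddef.h>

#define RANGE ((size_t) 1024) /* hypothetical */

static atomic_size_t num_processed_items;

static void imbalanced_item(void* context, size_t i) {
	(void) context;
	atomic_fetch_add_explicit(&num_processed_items, (size_t) 1, memory_order_relaxed);
	if (i == 0) {
		/* Refuse to finish until every other item has been processed,
		 * which forces a multi-threaded pool to steal the rest of the
		 * range (a single-threaded pool would deadlock here). */
		while (atomic_load_explicit(&num_processed_items, memory_order_relaxed) != RANGE) {
		}
	}
}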
+static void IncrementSame2DTile2DWithUArch(std::atomic_int* num_processed_items, uint32_t, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) {
+	for (size_t i = start_i; i < start_i + tile_i; i++) {
+		for (size_t j = start_j; j < start_j + tile_j; j++) {
+			num_processed_items->fetch_add(1, std::memory_order_relaxed);
+		}
+	}
+}
+
+TEST(Parallelize2DTile2DWithUArch, MultiThreadPoolHighContention) {
+	std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_2d_tile_2d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_2d_tile_2d_with_id_t>(IncrementSame2DTile2DWithUArch),
+		static_cast<void*>(&num_processed_items),
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+		kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+		0 /* flags */);
+	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ);
+}
+
+static void WorkImbalance2DTile2DWithUArch(std::atomic_int* num_processed_items, uint32_t, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) {
+	num_processed_items->fetch_add(tile_i * tile_j, std::memory_order_relaxed);
+	if (start_i == 0 && start_j == 0) {
+		/* Spin-wait until all items are computed */
+		while (num_processed_items->load(std::memory_order_relaxed) != kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ) {
+			std::atomic_thread_fence(std::memory_order_acquire);
+		}
+	}
+}
+
+TEST(Parallelize2DTile2DWithUArch, MultiThreadPoolWorkStealing) {
+	std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_2d_tile_2d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_2d_tile_2d_with_id_t>(WorkImbalance2DTile2DWithUArch),
+		static_cast<void*>(&num_processed_items),
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ,
+		kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ,
+		0 /* flags */);
+	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ);
+}
+
+static void ComputeNothing3DTile2D(void*, size_t, size_t, size_t, size_t, size_t) {
+}
+
+TEST(Parallelize3DTile2D, SingleThreadPoolCompletes) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_3d_tile_2d(threadpool.get(),
+		ComputeNothing3DTile2D,
+		nullptr,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+TEST(Parallelize3DTile2D, MultiThreadPoolCompletes) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_3d_tile_2d(
+		threadpool.get(),
+		ComputeNothing3DTile2D,
+		nullptr,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+static void CheckBounds3DTile2D(void*, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
+	EXPECT_LT(i, kParallelize3DTile2DRangeI);
+	EXPECT_LT(start_j, kParallelize3DTile2DRangeJ);
+	EXPECT_LT(start_k, kParallelize3DTile2DRangeK);
+	EXPECT_LE(start_j + tile_j, kParallelize3DTile2DRangeJ);
+	EXPECT_LE(start_k + tile_k, kParallelize3DTile2DRangeK);
+}
+
+TEST(Parallelize3DTile2D, SingleThreadPoolAllItemsInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_3d_tile_2d(
+		threadpool.get(),
+		CheckBounds3DTile2D,
+		nullptr,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+TEST(Parallelize3DTile2D, MultiThreadPoolAllItemsInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_3d_tile_2d(
+		threadpool.get(),
+		CheckBounds3DTile2D,
+		nullptr,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+static void CheckTiling3DTile2D(void*, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
+	EXPECT_GT(tile_j, 0);
+	EXPECT_LE(tile_j, kParallelize3DTile2DTileJ);
+	EXPECT_EQ(start_j % kParallelize3DTile2DTileJ, 0);
+	EXPECT_EQ(tile_j, std::min(kParallelize3DTile2DTileJ, kParallelize3DTile2DRangeJ - start_j));
+
+	EXPECT_GT(tile_k, 0);
+	EXPECT_LE(tile_k, kParallelize3DTile2DTileK);
+	EXPECT_EQ(start_k % kParallelize3DTile2DTileK, 0);
+	EXPECT_EQ(tile_k, std::min(kParallelize3DTile2DTileK, kParallelize3DTile2DRangeK - start_k));
+}
+
+TEST(Parallelize3DTile2D, SingleThreadPoolUniformTiling) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_3d_tile_2d(
+		threadpool.get(),
+		CheckTiling3DTile2D,
+		nullptr,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+TEST(Parallelize3DTile2D, MultiThreadPoolUniformTiling) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_3d_tile_2d(
+		threadpool.get(),
+		CheckTiling3DTile2D,
+		nullptr,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+static void SetTrue3DTile2D(std::atomic_bool* processed_indicators, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
+	for (size_t j = start_j; j < start_j + tile_j; j++) {
+		for (size_t k = start_k; k < start_k + tile_k; k++) {
+			const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+			processed_indicators[linear_idx].store(true, std::memory_order_relaxed);
+		}
+	}
+}
+
+TEST(Parallelize3DTile2D, SingleThreadPoolAllItemsProcessed) {
+	std::vector<std::atomic_bool> indicators(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_3d_tile_2d(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(SetTrue3DTile2D),
+		static_cast<void*>(indicators.data()),
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
+				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+				EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed))
+					<< "Element (" << i << ", " << j << ", " << k << ") not processed";
+			}
+		}
+	}
+}
+
MultiThreadPoolAllItemsProcessed) { + std::vector indicators(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast(SetTrue3DTile2D), + static_cast(indicators.data()), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ", " << k << ") not processed"; + } + } + } +} + +static void Increment3DTile2D(std::atomic_int* processed_counters, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) { + for (size_t j = start_j; j < start_j + tile_j; j++) { + for (size_t k = start_k; k < start_k + tile_k; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + processed_counters[linear_idx].fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize3DTile2D, SingleThreadPoolEachItemProcessedOnce) { + std::vector counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast(Increment3DTile2D), + static_cast(counters.data()), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ", " << k << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } + } +} + +TEST(Parallelize3DTile2D, MultiThreadPoolEachItemProcessedOnce) { + std::vector counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast(Increment3DTile2D), + static_cast(counters.data()), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * 
kParallelize3DTile2DRangeK + k; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ", " << k << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } + } +} + +TEST(Parallelize3DTile2D, SingleThreadPoolEachItemProcessedMultipleTimes) { + std::vector counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast(Increment3DTile2D), + static_cast(counters.data()), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ", " << k << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } + } +} + +TEST(Parallelize3DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { + std::vector counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast(Increment3DTile2D), + static_cast(counters.data()), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ", " << k << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } + } +} + +static void IncrementSame3DTile2D(std::atomic_int* num_processed_items, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) { + for (size_t j = start_j; j < start_j + tile_j; j++) { + for (size_t k = start_k; k < start_k + tile_k; k++) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize3DTile2D, MultiThreadPoolHighContention) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_3d_tile_2d( + 
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(IncrementSame3DTile2D),
+		static_cast<void*>(&num_processed_items),
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+}
+
+static void WorkImbalance3DTile2D(std::atomic_int* num_processed_items, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
+	num_processed_items->fetch_add(tile_j * tile_k, std::memory_order_relaxed);
+	if (i == 0 && start_j == 0 && start_k == 0) {
+		/* Spin-wait until all items are computed */
+		while (num_processed_items->load(std::memory_order_relaxed) != kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK) {
+			std::atomic_thread_fence(std::memory_order_acquire);
+		}
+	}
+}
+
+TEST(Parallelize3DTile2D, MultiThreadPoolWorkStealing) {
+	std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_3d_tile_2d(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(WorkImbalance3DTile2D),
+		static_cast<void*>(&num_processed_items),
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+}
+
+static void ComputeNothing3DTile2DWithUArch(void*, uint32_t, size_t, size_t, size_t, size_t, size_t) {
+}
+
+TEST(Parallelize3DTile2DWithUArch, SingleThreadPoolCompletes) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(threadpool.get(),
+		ComputeNothing3DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+TEST(Parallelize3DTile2DWithUArch, MultiThreadPoolCompletes) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
+		threadpool.get(),
+		ComputeNothing3DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+static void CheckUArch3DTile2DWithUArch(void*, uint32_t uarch_index, size_t, size_t, size_t, size_t, size_t) {
+	if (uarch_index != kDefaultUArchIndex) {
+		EXPECT_LE(uarch_index, kMaxUArchIndex);
+	}
+}
+
+TEST(Parallelize3DTile2DWithUArch, SingleThreadPoolUArchInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckUArch3DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+TEST(Parallelize3DTile2DWithUArch, MultiThreadPoolUArchInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckUArch3DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+static void CheckBounds3DTile2DWithUArch(void*, uint32_t, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
+	EXPECT_LT(i, kParallelize3DTile2DRangeI);
+	EXPECT_LT(start_j, kParallelize3DTile2DRangeJ);
+	EXPECT_LT(start_k, kParallelize3DTile2DRangeK);
 	EXPECT_LE(start_j + tile_j, kParallelize3DTile2DRangeJ);
 	EXPECT_LE(start_k + tile_k, kParallelize3DTile2DRangeK);
 }
 
-TEST(Parallelize3DTile2D, SingleThreadPoolAllItemsInBounds) {
+TEST(Parallelize3DTile2DWithUArch, SingleThreadPoolAllItemsInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckBounds3DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+TEST(Parallelize3DTile2DWithUArch, MultiThreadPoolAllItemsInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckBounds3DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+static void CheckTiling3DTile2DWithUArch(void*, uint32_t, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
+	EXPECT_GT(tile_j, 0);
+	EXPECT_LE(tile_j, kParallelize3DTile2DTileJ);
+	EXPECT_EQ(start_j % kParallelize3DTile2DTileJ, 0);
+	EXPECT_EQ(tile_j, std::min<size_t>(kParallelize3DTile2DTileJ, kParallelize3DTile2DRangeJ - start_j));
+
+	EXPECT_GT(tile_k, 0);
+	EXPECT_LE(tile_k, kParallelize3DTile2DTileK);
+	EXPECT_EQ(start_k % kParallelize3DTile2DTileK, 0);
+	EXPECT_EQ(tile_k, std::min<size_t>(kParallelize3DTile2DTileK, kParallelize3DTile2DRangeK - start_k));
+}
+
+TEST(Parallelize3DTile2DWithUArch, SingleThreadPoolUniformTiling) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckTiling3DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+TEST(Parallelize3DTile2DWithUArch, MultiThreadPoolUniformTiling) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckTiling3DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+}
+
+static void SetTrue3DTile2DWithUArch(std::atomic_bool* processed_indicators, uint32_t, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
+	for (size_t j = start_j; j < start_j + tile_j; j++) {
+		for (size_t k = start_k; k < start_k + tile_k; k++) {
+			const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+			processed_indicators[linear_idx].store(true, std::memory_order_relaxed);
+		}
+	}
+}
+
+TEST(Parallelize3DTile2DWithUArch, SingleThreadPoolAllItemsProcessed) {
+	std::vector<std::atomic_bool> indicators(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_3d_tile_2d_with_id_t>(SetTrue3DTile2DWithUArch),
+		static_cast<void*>(indicators.data()),
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
+				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+				EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed))
+					<< "Element (" << i << ", " << j << ", " << k << ") not processed";
+			}
+		}
+	}
+}
+
+TEST(Parallelize3DTile2DWithUArch, MultiThreadPoolAllItemsProcessed) {
+	std::vector<std::atomic_bool> indicators(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_3d_tile_2d_with_id_t>(SetTrue3DTile2DWithUArch),
+		static_cast<void*>(indicators.data()),
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
+				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+				EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed))
+					<< "Element (" << i << ", " << j << ", " << k << ") not processed";
+			}
+		}
+	}
+}
+
+static void Increment3DTile2DWithUArch(std::atomic_int* processed_counters, uint32_t, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
+	for (size_t j = start_j; j < start_j + tile_j; j++) {
+		for (size_t k = start_k; k < start_k + tile_k; k++) {
+			const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+			processed_counters[linear_idx].fetch_add(1, std::memory_order_relaxed);
+		}
+	}
+}
+
+TEST(Parallelize3DTile2DWithUArch, SingleThreadPoolEachItemProcessedOnce) {
+	std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_3d_tile_2d_with_id_t>(Increment3DTile2DWithUArch),
+		static_cast<void*>(counters.data()),
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
+				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+				EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1)
+					<< "Element (" << i << ", " << j << ", " << k << ") was processed "
+					<< counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)";
+			}
+		}
+	}
+}
+
+TEST(Parallelize3DTile2DWithUArch, MultiThreadPoolEachItemProcessedOnce) {
+	std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
+		threadpool.get(),
+		reinterpret_cast<pthreadpool_task_3d_tile_2d_with_id_t>(Increment3DTile2DWithUArch),
+		static_cast<void*>(counters.data()),
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		0 /* flags */);
+
+	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
+				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+				EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1)
+					<< "Element (" << i << ", " << j << ", " << k << ") was processed "
+					<< counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)";
+			}
+		}
+	}
+}
+
+TEST(Parallelize3DTile2DWithUArch, SingleThreadPoolEachItemProcessedMultipleTimes) {
+	std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+
 	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
-	pthreadpool_parallelize_3d_tile_2d(
+	for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) {
+		pthreadpool_parallelize_3d_tile_2d_with_uarch(
+			threadpool.get(),
+			reinterpret_cast<pthreadpool_task_3d_tile_2d_with_id_t>(Increment3DTile2DWithUArch),
+			static_cast<void*>(counters.data()),
+			kDefaultUArchIndex, kMaxUArchIndex,
+			kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+			kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+			0 /* flags */);
+	}
+
+	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
+				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+				EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations)
+					<< "Element (" << i << ", " << j << ", " << k << ") was processed "
+					<< counters[linear_idx].load(std::memory_order_relaxed) << " times "
+					<< "(expected: " << kIncrementIterations << ")";
+			}
+		}
+	}
+}
+
+TEST(Parallelize3DTile2DWithUArch, MultiThreadPoolEachItemProcessedMultipleTimes) {
+	std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) {
+		pthreadpool_parallelize_3d_tile_2d_with_uarch(
+			threadpool.get(),
+			reinterpret_cast<pthreadpool_task_3d_tile_2d_with_id_t>(Increment3DTile2DWithUArch),
+			static_cast<void*>(counters.data()),
+			kDefaultUArchIndex, kMaxUArchIndex,
+			kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
+			kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+			0 /* flags */);
+	}
+
+	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
+				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+				EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations)
+					<< "Element (" << i << ", " << j << ", " << k << ") was processed "
+					<< counters[linear_idx].load(std::memory_order_relaxed) << " times "
+					<< "(expected: " << kIncrementIterations << ")";
+			}
+		}
+	}
+}
+
+static void IncrementSame3DTile2DWithUArch(std::atomic_int* num_processed_items, uint32_t, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
+	for (size_t j = start_j; j < start_j + tile_j; j++) {
+		for (size_t k = start_k; k < start_k + tile_k; k++) {
+			num_processed_items->fetch_add(1, std::memory_order_relaxed);
+		}
+	}
+}
+
+TEST(Parallelize3DTile2DWithUArch, MultiThreadPoolHighContention) {
+	std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0);
+
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
 		threadpool.get(),
-		CheckBounds3DTile2D,
-		nullptr,
+		reinterpret_cast<pthreadpool_task_3d_tile_2d_with_id_t>(IncrementSame3DTile2DWithUArch),
+		static_cast<void*>(&num_processed_items),
+		kDefaultUArchIndex, kMaxUArchIndex,
 		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
 		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
 		0 /* flags */);
+	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
 }
 
-TEST(Parallelize3DTile2D, MultiThreadPoolAllItemsInBounds) {
+static void WorkImbalance3DTile2DWithUArch(std::atomic_int* num_processed_items, uint32_t, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
+	num_processed_items->fetch_add(tile_j * tile_k, std::memory_order_relaxed);
+	if (i == 0 && start_j == 0 && start_k == 0) {
+		/* Spin-wait until all items are computed */
+		while (num_processed_items->load(std::memory_order_relaxed) != kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK) {
+			std::atomic_thread_fence(std::memory_order_acquire);
+		}
+	}
+}
+
+TEST(Parallelize3DTile2DWithUArch, MultiThreadPoolWorkStealing) {
+	std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0);
+
 	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
@@ -1630,41 +3047,114 @@ TEST(Parallelize3DTile2D, MultiThreadPoolAllItemsInBounds) {
 		GTEST_SKIP();
 	}
 
-	pthreadpool_parallelize_3d_tile_2d(
+	pthreadpool_parallelize_3d_tile_2d_with_uarch(
 		threadpool.get(),
-		CheckBounds3DTile2D,
-		nullptr,
+		reinterpret_cast<pthreadpool_task_3d_tile_2d_with_id_t>(WorkImbalance3DTile2DWithUArch),
+		static_cast<void*>(&num_processed_items),
+		kDefaultUArchIndex, kMaxUArchIndex,
 		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
 		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
 		0 /* flags */);
+	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
 }
 
-static void CheckTiling3DTile2D(void*, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
-	EXPECT_GT(tile_j, 0);
-	EXPECT_LE(tile_j, kParallelize3DTile2DTileJ);
-	EXPECT_EQ(start_j % kParallelize3DTile2DTileJ, 0);
-	EXPECT_EQ(tile_j, std::min<size_t>(kParallelize3DTile2DTileJ, kParallelize3DTile2DRangeJ - start_j));
+static void ComputeNothing4DTile2D(void*, size_t, size_t, size_t, size_t, size_t, size_t) {
+}
+
+TEST(Parallelize4DTile2D, SingleThreadPoolCompletes) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_4d_tile_2d(threadpool.get(),
+		ComputeNothing4DTile2D,
+		nullptr,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
+		0 /* flags */);
+}
+
+TEST(Parallelize4DTile2D, MultiThreadPoolCompletes) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_4d_tile_2d(
+		threadpool.get(),
+		ComputeNothing4DTile2D,
+		nullptr,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
+		0 /* flags */);
+}
+
+static void CheckBounds4DTile2D(void*, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
+	EXPECT_LT(i, kParallelize4DTile2DRangeI);
+	EXPECT_LT(j, kParallelize4DTile2DRangeJ);
+	EXPECT_LT(start_k, kParallelize4DTile2DRangeK);
+	EXPECT_LT(start_l, kParallelize4DTile2DRangeL);
+	EXPECT_LE(start_k + tile_k, kParallelize4DTile2DRangeK);
+	EXPECT_LE(start_l + tile_l, kParallelize4DTile2DRangeL);
+}
+
+TEST(Parallelize4DTile2D, SingleThreadPoolAllItemsInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_4d_tile_2d(
+		threadpool.get(),
+		CheckBounds4DTile2D,
+		nullptr,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
+		0 /* flags */);
+}
+
+TEST(Parallelize4DTile2D, MultiThreadPoolAllItemsInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_4d_tile_2d(
+		threadpool.get(),
+		CheckBounds4DTile2D,
+		nullptr,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
+		0 /* flags */);
+}
+
+static void CheckTiling4DTile2D(void*, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
 	EXPECT_GT(tile_k, 0);
-	EXPECT_LE(tile_k, kParallelize3DTile2DTileK);
-	EXPECT_EQ(start_k % kParallelize3DTile2DTileK, 0);
-	EXPECT_EQ(tile_k, std::min<size_t>(kParallelize3DTile2DTileK, kParallelize3DTile2DRangeK - start_k));
+	EXPECT_LE(tile_k, kParallelize4DTile2DTileK);
+	EXPECT_EQ(start_k % kParallelize4DTile2DTileK, 0);
+	EXPECT_EQ(tile_k, std::min<size_t>(kParallelize4DTile2DTileK, kParallelize4DTile2DRangeK - start_k));
+
+	EXPECT_GT(tile_l, 0);
+	EXPECT_LE(tile_l, kParallelize4DTile2DTileL);
+	EXPECT_EQ(start_l % kParallelize4DTile2DTileL, 0);
+	EXPECT_EQ(tile_l, std::min<size_t>(kParallelize4DTile2DTileL, kParallelize4DTile2DRangeL - start_l));
 }
 
-TEST(Parallelize3DTile2D, SingleThreadPoolUniformTiling) {
+TEST(Parallelize4DTile2D, SingleThreadPoolUniformTiling) {
 	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
-	pthreadpool_parallelize_3d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d(
 		threadpool.get(),
-		CheckTiling3DTile2D,
+		CheckTiling4DTile2D,
 		nullptr,
-		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
-		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 }
 
-TEST(Parallelize3DTile2D, MultiThreadPoolUniformTiling) {
+TEST(Parallelize4DTile2D, MultiThreadPoolUniformTiling) {
 	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
@@ -1672,51 +3162,53 @@ TEST(Parallelize3DTile2D, MultiThreadPoolUniformTiling) {
 		GTEST_SKIP();
 	}
 
-	pthreadpool_parallelize_3d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d(
 		threadpool.get(),
-		CheckTiling3DTile2D,
+		CheckTiling4DTile2D,
 		nullptr,
-		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
-		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 }
 
-static void SetTrue3DTile2D(std::atomic_bool* processed_indicators, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
-	for (size_t j = start_j; j < start_j + tile_j; j++) {
-		for (size_t k = start_k; k < start_k + tile_k; k++) {
-			const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+static void SetTrue4DTile2D(std::atomic_bool* processed_indicators, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
+	for (size_t k = start_k; k < start_k + tile_k; k++) {
+		for (size_t l = start_l; l < start_l + tile_l; l++) {
+			const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l;
 			processed_indicators[linear_idx].store(true, std::memory_order_relaxed);
 		}
 	}
 }
 
-TEST(Parallelize3DTile2D, SingleThreadPoolAllItemsProcessed) {
-	std::vector<std::atomic_bool> indicators(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+TEST(Parallelize4DTile2D, SingleThreadPoolAllItemsProcessed) {
+	std::vector<std::atomic_bool> indicators(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL);
 
 	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
-	pthreadpool_parallelize_3d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d(
 		threadpool.get(),
-		reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(SetTrue3DTile2D),
+		reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(SetTrue4DTile2D),
 		static_cast<void*>(indicators.data()),
-		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
-		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 
-	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
-		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
-			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
-				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
-				EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed))
-					<< "Element (" << i << ", " << j << ", " << k << ") not processed";
+	for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) {
+				for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) {
+					const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l;
+					EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed))
+						<< "Element (" << i << ", " << j << ", " << k << ", " << l << ") not processed";
+				}
 			}
 		}
 	}
 }
 
-TEST(Parallelize3DTile2D, MultiThreadPoolAllItemsProcessed) {
-	std::vector<std::atomic_bool> indicators(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+TEST(Parallelize4DTile2D, MultiThreadPoolAllItemsProcessed) {
+	std::vector<std::atomic_bool> indicators(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL);
 
 	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
@@ -1725,62 +3217,66 @@ TEST(Parallelize3DTile2D, MultiThreadPoolAllItemsProcessed) {
 		GTEST_SKIP();
 	}
 
-	pthreadpool_parallelize_3d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d(
 		threadpool.get(),
-		reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(SetTrue3DTile2D),
+		reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(SetTrue4DTile2D),
 		static_cast<void*>(indicators.data()),
-		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
-		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 
-	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
-		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
-			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
-				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
-				EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed))
-					<< "Element (" << i << ", " << j << ", " << k << ") not processed";
+	for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) {
+				for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) {
+					const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l;
+					EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed))
+						<< "Element (" << i << ", " << j << ", " << k << ", " << l << ") not processed";
+				}
 			}
 		}
 	}
 }
 
-static void Increment3DTile2D(std::atomic_int* processed_counters, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
-	for (size_t j = start_j; j < start_j + tile_j; j++) {
-		for (size_t k = start_k; k < start_k + tile_k; k++) {
-			const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
+static void Increment4DTile2D(std::atomic_int* processed_counters, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
+	for (size_t k = start_k; k < start_k + tile_k; k++) {
+		for (size_t l = start_l; l < start_l + tile_l; l++) {
+			const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l;
 			processed_counters[linear_idx].fetch_add(1, std::memory_order_relaxed);
 		}
 	}
 }
 
-TEST(Parallelize3DTile2D, SingleThreadPoolEachItemProcessedOnce) {
-	std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+TEST(Parallelize4DTile2D, SingleThreadPoolEachItemProcessedOnce) {
+	std::vector<std::atomic_int> counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL);
 
 	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
-	pthreadpool_parallelize_3d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d(
 		threadpool.get(),
-		reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(Increment3DTile2D),
+		reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(Increment4DTile2D),
 		static_cast<void*>(counters.data()),
-		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
-		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 
-	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
-		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
-			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
-				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
-				EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1)
-					<< "Element (" << i << ", " << j << ", " << k << ") was processed "
-					<< counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)";
+	for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) {
		for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) {
+				for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) {
+					const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l;
+					EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1)
+						<< "Element (" << i << ", " << j << ", " << k << ", " << l << ") was processed "
+						<< counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)";
+				}
 			}
 		}
 	}
 }
 
-TEST(Parallelize3DTile2D, MultiThreadPoolEachItemProcessedOnce) {
-	std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+TEST(Parallelize4DTile2D, MultiThreadPoolEachItemProcessedOnce) {
+	std::vector<std::atomic_int> counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL);
 
 	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
@@ -1789,57 +3285,61 @@ TEST(Parallelize3DTile2D, MultiThreadPoolEachItemProcessedOnce) {
 		GTEST_SKIP();
 	}
 
-	pthreadpool_parallelize_3d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d(
 		threadpool.get(),
-		reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(Increment3DTile2D),
+		reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(Increment4DTile2D),
 		static_cast<void*>(counters.data()),
-		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
-		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 
-	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
-		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
-			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
-				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
-				EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1)
-					<< "Element (" << i << ", " << j << ", " << k << ") was processed "
-					<< counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)";
+	for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) {
+				for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) {
+					const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l;
+					EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1)
+						<< "Element (" << i << ", " << j << ", " << k << ", " << l << ") was processed "
+						<< counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)";
+				}
 			}
 		}
 	}
 }
 
-TEST(Parallelize3DTile2D, SingleThreadPoolEachItemProcessedMultipleTimes) {
-	std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+TEST(Parallelize4DTile2D, SingleThreadPoolEachItemProcessedMultipleTimes) {
+	std::vector<std::atomic_int> counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL);
 
 	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
 	for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) {
-		pthreadpool_parallelize_3d_tile_2d(
+		pthreadpool_parallelize_4d_tile_2d(
 			threadpool.get(),
-			reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(Increment3DTile2D),
+			reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(Increment4DTile2D),
 			static_cast<void*>(counters.data()),
-			kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
-			kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+			kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+			kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 			0 /* flags */);
 	}
 
-	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
-		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
-			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
-				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
-				EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations)
-					<< "Element (" << i << ", " << j << ", " << k << ") was processed "
-					<< counters[linear_idx].load(std::memory_order_relaxed) << " times "
-					<< "(expected: " << kIncrementIterations << ")";
+	for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) {
+				for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) {
+					const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l;
+					EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations)
+						<< "Element (" << i << ", " << j << ", " << k << ", " << l << ") was processed "
+						<< counters[linear_idx].load(std::memory_order_relaxed) << " times "
+						<< "(expected: " << kIncrementIterations << ")";
+				}
 			}
 		}
 	}
 }
 
-TEST(Parallelize3DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) {
-	std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+TEST(Parallelize4DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) {
+	std::vector<std::atomic_int> counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL);
 
 	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
@@ -1849,37 +3349,39 @@ TEST(Parallelize3DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) {
 	}
 
 	for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) {
-		pthreadpool_parallelize_3d_tile_2d(
+		pthreadpool_parallelize_4d_tile_2d(
 			threadpool.get(),
-			reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(Increment3DTile2D),
+			reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(Increment4DTile2D),
 			static_cast<void*>(counters.data()),
-			kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
-			kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+			kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+			kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 			0 /* flags */);
 	}
 
-	for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) {
-		for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) {
-			for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) {
-				const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k;
-				EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations)
-					<< "Element (" << i << ", " << j << ", " << k << ") was processed "
-					<< counters[linear_idx].load(std::memory_order_relaxed) << " times "
-					<< "(expected: " << kIncrementIterations << ")";
+	for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) {
+		for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) {
+			for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) {
+				for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) {
+					const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l;
+					EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations)
+						<< "Element (" << i << ", " << j << ", " << k << ", " << l << ") was processed "
+						<< counters[linear_idx].load(std::memory_order_relaxed) << " times "
+						<< "(expected: " << kIncrementIterations << ")";
+				}
 			}
 		}
 	}
 }
 
-static void IncrementSame3DTile2D(std::atomic_int* num_processed_items, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
-	for (size_t j = start_j; j < start_j + tile_j; j++) {
-		for (size_t k = start_k; k < start_k + tile_k; k++) {
+static void IncrementSame4DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
+	for (size_t k = start_k; k < start_k + tile_k; k++) {
+		for (size_t l = start_l; l < start_l + tile_l; l++) {
 			num_processed_items->fetch_add(1, std::memory_order_relaxed);
 		}
 	}
 }
 
-TEST(Parallelize3DTile2D, MultiThreadPoolHighContention) {
+TEST(Parallelize4DTile2D, MultiThreadPoolHighContention) {
 	std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0);
 
 	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
@@ -1889,27 +3391,27 @@ TEST(Parallelize3DTile2D, MultiThreadPoolHighContention) {
 		GTEST_SKIP();
 	}
 
-	pthreadpool_parallelize_3d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d(
 		threadpool.get(),
-		reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(IncrementSame3DTile2D),
+		reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(IncrementSame4DTile2D),
 		static_cast<void*>(&num_processed_items),
-		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
-		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
-	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL);
 }
 
-static void WorkImbalance3DTile2D(std::atomic_int* num_processed_items, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) {
-	num_processed_items->fetch_add(tile_j * tile_k, std::memory_order_relaxed);
-	if (i == 0 && start_j == 0 && start_k == 0) {
+static void WorkImbalance4DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
+	num_processed_items->fetch_add(tile_k * tile_l, std::memory_order_relaxed);
+	if (i == 0 && j == 0 && start_k == 0 && start_l == 0) {
 		/* Spin-wait until all items are computed */
-		while (num_processed_items->load(std::memory_order_relaxed) != kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK) {
+		while (num_processed_items->load(std::memory_order_relaxed) != kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL) {
 			std::atomic_thread_fence(std::memory_order_acquire);
 		}
 	}
 }
 
-TEST(Parallelize3DTile2D, MultiThreadPoolWorkStealing) {
+TEST(Parallelize4DTile2D, MultiThreadPoolWorkStealing) {
 	std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0);
 
 	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
@@ -1919,32 +3421,33 @@ TEST(Parallelize3DTile2D, MultiThreadPoolWorkStealing) {
 		GTEST_SKIP();
 	}
 
-	pthreadpool_parallelize_3d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d(
 		threadpool.get(),
-		reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(WorkImbalance3DTile2D),
+		reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(WorkImbalance4DTile2D),
 		static_cast<void*>(&num_processed_items),
-		kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK,
-		kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
		0 /* flags */);
-	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK);
+	EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL);
 }
 
-static void ComputeNothing4DTile2D(void*, size_t, size_t, size_t, size_t, size_t, size_t) {
+static void ComputeNothing4DTile2DWithUArch(void*, uint32_t, size_t, size_t, size_t, size_t, size_t, size_t) {
 }
 
-TEST(Parallelize4DTile2D, SingleThreadPoolCompletes) {
+TEST(Parallelize4DTile2DWithUArch, SingleThreadPoolCompletes) {
 	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
-	pthreadpool_parallelize_4d_tile_2d(threadpool.get(),
-		ComputeNothing4DTile2D,
+	pthreadpool_parallelize_4d_tile_2d_with_uarch(threadpool.get(),
+		ComputeNothing4DTile2DWithUArch,
 		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
 		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
 		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 }
 
-TEST(Parallelize4DTile2D, MultiThreadPoolCompletes) {
+TEST(Parallelize4DTile2DWithUArch, MultiThreadPoolCompletes) {
 	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
@@ -1952,16 +3455,55 @@ TEST(Parallelize4DTile2D, MultiThreadPoolCompletes) {
 		GTEST_SKIP();
 	}
 
-	pthreadpool_parallelize_4d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d_with_uarch(
 		threadpool.get(),
-		ComputeNothing4DTile2D,
+		ComputeNothing4DTile2DWithUArch,
 		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
 		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
 		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 }
 
-static void CheckBounds4DTile2D(void*, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
+static void CheckUArch4DTile2DWithUArch(void*, uint32_t uarch_index, size_t, size_t, size_t, size_t, size_t, size_t) {
+	if (uarch_index != kDefaultUArchIndex) {
+		EXPECT_LE(uarch_index, kMaxUArchIndex);
+	}
+}
+
+TEST(Parallelize4DTile2DWithUArch, SingleThreadPoolUArchInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	pthreadpool_parallelize_4d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckUArch4DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
+		0 /* flags */);
+}
+
+TEST(Parallelize4DTile2DWithUArch, MultiThreadPoolUArchInBounds) {
+	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
+	ASSERT_TRUE(threadpool.get());
+
+	if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+		GTEST_SKIP();
+	}
+
+	pthreadpool_parallelize_4d_tile_2d_with_uarch(
+		threadpool.get(),
+		CheckUArch4DTile2DWithUArch,
+		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
+		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
+		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
+		0 /* flags */);
+}
+
+static void CheckBounds4DTile2DWithUArch(void*, uint32_t, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
 	EXPECT_LT(i, kParallelize4DTile2DRangeI);
 	EXPECT_LT(j, kParallelize4DTile2DRangeJ);
 	EXPECT_LT(start_k, kParallelize4DTile2DRangeK);
@@ -1970,20 +3512,21 @@ static void CheckBounds4DTile2D(void*, size_t i, size_t j, size_t start_k, size_
 	EXPECT_LE(start_l + tile_l, kParallelize4DTile2DRangeL);
 }
 
-TEST(Parallelize4DTile2D, SingleThreadPoolAllItemsInBounds) {
+TEST(Parallelize4DTile2DWithUArch, SingleThreadPoolAllItemsInBounds) {
 	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
-	pthreadpool_parallelize_4d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d_with_uarch(
 		threadpool.get(),
-		CheckBounds4DTile2D,
+		CheckBounds4DTile2DWithUArch,
 		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
 		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
 		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 }
 
-TEST(Parallelize4DTile2D, MultiThreadPoolAllItemsInBounds) {
+TEST(Parallelize4DTile2DWithUArch, MultiThreadPoolAllItemsInBounds) {
 	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
@@ -1991,16 +3534,17 @@ TEST(Parallelize4DTile2D, MultiThreadPoolAllItemsInBounds) {
 		GTEST_SKIP();
 	}
 
-	pthreadpool_parallelize_4d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d_with_uarch(
 		threadpool.get(),
-		CheckBounds4DTile2D,
+		CheckBounds4DTile2DWithUArch,
 		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
 		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
 		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 }
 
-static void CheckTiling4DTile2D(void*, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
+static void CheckTiling4DTile2DWithUArch(void*, uint32_t, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
 	EXPECT_GT(tile_k, 0);
 	EXPECT_LE(tile_k, kParallelize4DTile2DTileK);
 	EXPECT_EQ(start_k % kParallelize4DTile2DTileK, 0);
@@ -2012,20 +3556,21 @@ static void CheckTiling4DTile2D(void*, size_t i, size_t j, size_t start_k, size_
 	EXPECT_EQ(tile_l, std::min<size_t>(kParallelize4DTile2DTileL, kParallelize4DTile2DRangeL - start_l));
 }
 
-TEST(Parallelize4DTile2D, SingleThreadPoolUniformTiling) {
+TEST(Parallelize4DTile2DWithUArch, SingleThreadPoolUniformTiling) {
 	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
-	pthreadpool_parallelize_4d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d_with_uarch(
 		threadpool.get(),
-		CheckTiling4DTile2D,
+		CheckTiling4DTile2DWithUArch,
 		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
 		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
 		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 }
 
-TEST(Parallelize4DTile2D, MultiThreadPoolUniformTiling) {
+TEST(Parallelize4DTile2DWithUArch, MultiThreadPoolUniformTiling) {
 	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
@@ -2033,16 +3578,17 @@ TEST(Parallelize4DTile2D, MultiThreadPoolUniformTiling) {
 		GTEST_SKIP();
 	}
 
-	pthreadpool_parallelize_4d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d_with_uarch(
 		threadpool.get(),
-		CheckTiling4DTile2D,
+		CheckTiling4DTile2DWithUArch,
 		nullptr,
+		kDefaultUArchIndex, kMaxUArchIndex,
 		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
 		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
 }
 
-static void SetTrue4DTile2D(std::atomic_bool* processed_indicators, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
+static void SetTrue4DTile2DWithUArch(std::atomic_bool* processed_indicators, uint32_t, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
 	for (size_t k = start_k; k < start_k + tile_k; k++) {
 		for (size_t l = start_l; l < start_l + tile_l; l++) {
 			const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l;
@@ -2051,16 +3597,17 @@ static void SetTrue4DTile2D(std::atomic_bool* processed_indicators, size_t i, si
 		}
 	}
 }
 
-TEST(Parallelize4DTile2D, SingleThreadPoolAllItemsProcessed) {
+TEST(Parallelize4DTile2DWithUArch, SingleThreadPoolAllItemsProcessed) {
 	std::vector<std::atomic_bool> indicators(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL);
 
 	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
-	pthreadpool_parallelize_4d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d_with_uarch(
 		threadpool.get(),
-		reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(SetTrue4DTile2D),
+		reinterpret_cast<pthreadpool_task_4d_tile_2d_with_id_t>(SetTrue4DTile2DWithUArch),
 		static_cast<void*>(indicators.data()),
+		kDefaultUArchIndex, kMaxUArchIndex,
 		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
 		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
@@ -2078,7 +3625,7 @@ TEST(Parallelize4DTile2D, SingleThreadPoolAllItemsProcessed) {
 	}
 }
 
-TEST(Parallelize4DTile2D, MultiThreadPoolAllItemsProcessed) {
+TEST(Parallelize4DTile2DWithUArch, MultiThreadPoolAllItemsProcessed) {
 	std::vector<std::atomic_bool> indicators(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL);
 
 	auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
@@ -2088,10 +3635,11 @@ TEST(Parallelize4DTile2D, MultiThreadPoolAllItemsProcessed) {
 		GTEST_SKIP();
 	}
 
-	pthreadpool_parallelize_4d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d_with_uarch(
 		threadpool.get(),
-		reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(SetTrue4DTile2D),
+		reinterpret_cast<pthreadpool_task_4d_tile_2d_with_id_t>(SetTrue4DTile2DWithUArch),
 		static_cast<void*>(indicators.data()),
+		kDefaultUArchIndex, kMaxUArchIndex,
 		kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL,
 		kParallelize4DTile2DTileK, kParallelize4DTile2DTileL,
 		0 /* flags */);
@@ -2109,7 +3657,7 @@ TEST(Parallelize4DTile2D, MultiThreadPoolAllItemsProcessed) {
 	}
 }
 
-static void Increment4DTile2D(std::atomic_int* processed_counters, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
+static void Increment4DTile2DWithUArch(std::atomic_int* processed_counters, uint32_t, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) {
 	for (size_t k = start_k; k < start_k + tile_k; k++) {
 		for (size_t l = start_l; l < start_l + tile_l; l++) {
 			const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l;
@@ -2118,16 +3666,17 @@ static void Increment4DTile2D(std::atomic_int* processed_counters, size_t i, siz
 		}
 	}
 }
 
-TEST(Parallelize4DTile2D, SingleThreadPoolEachItemProcessedOnce) {
+TEST(Parallelize4DTile2DWithUArch, SingleThreadPoolEachItemProcessedOnce) {
 	std::vector<std::atomic_int> counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL);
 
 	auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy);
 	ASSERT_TRUE(threadpool.get());
 
-	pthreadpool_parallelize_4d_tile_2d(
+	pthreadpool_parallelize_4d_tile_2d_with_uarch(
 		threadpool.get(),
-		reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(Increment4DTile2D),
reinterpret_cast(Increment4DTile2DWithUArch), static_cast(counters.data()), + kDefaultUArchIndex, kMaxUArchIndex, kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, 0 /* flags */); @@ -2146,7 +3695,7 @@ TEST(Parallelize4DTile2D, SingleThreadPoolEachItemProcessedOnce) { } } -TEST(Parallelize4DTile2D, MultiThreadPoolEachItemProcessedOnce) { +TEST(Parallelize4DTile2DWithUArch, MultiThreadPoolEachItemProcessedOnce) { std::vector counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); @@ -2156,10 +3705,11 @@ TEST(Parallelize4DTile2D, MultiThreadPoolEachItemProcessedOnce) { GTEST_SKIP(); } - pthreadpool_parallelize_4d_tile_2d( + pthreadpool_parallelize_4d_tile_2d_with_uarch( threadpool.get(), - reinterpret_cast(Increment4DTile2D), + reinterpret_cast(Increment4DTile2DWithUArch), static_cast(counters.data()), + kDefaultUArchIndex, kMaxUArchIndex, kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, 0 /* flags */); @@ -2178,17 +3728,18 @@ TEST(Parallelize4DTile2D, MultiThreadPoolEachItemProcessedOnce) { } } -TEST(Parallelize4DTile2D, SingleThreadPoolEachItemProcessedMultipleTimes) { +TEST(Parallelize4DTile2DWithUArch, SingleThreadPoolEachItemProcessedMultipleTimes) { std::vector counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); ASSERT_TRUE(threadpool.get()); for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { - pthreadpool_parallelize_4d_tile_2d( + pthreadpool_parallelize_4d_tile_2d_with_uarch( threadpool.get(), - reinterpret_cast(Increment4DTile2D), + reinterpret_cast(Increment4DTile2DWithUArch), static_cast(counters.data()), + kDefaultUArchIndex, kMaxUArchIndex, kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, 0 /* flags */); @@ -2209,7 +3760,7 @@ TEST(Parallelize4DTile2D, SingleThreadPoolEachItemProcessedMultipleTimes) { } } -TEST(Parallelize4DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { +TEST(Parallelize4DTile2DWithUArch, MultiThreadPoolEachItemProcessedMultipleTimes) { std::vector counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); @@ -2220,10 +3771,11 @@ TEST(Parallelize4DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { } for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { - pthreadpool_parallelize_4d_tile_2d( + pthreadpool_parallelize_4d_tile_2d_with_uarch( threadpool.get(), - reinterpret_cast(Increment4DTile2D), + reinterpret_cast(Increment4DTile2DWithUArch), static_cast(counters.data()), + kDefaultUArchIndex, kMaxUArchIndex, kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, 0 /* flags */); @@ -2244,7 +3796,7 @@ TEST(Parallelize4DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { } } -static void 
IncrementSame4DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) { +static void IncrementSame4DTile2DWithUArch(std::atomic_int* num_processed_items, uint32_t, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) { for (size_t k = start_k; k < start_k + tile_k; k++) { for (size_t l = start_l; l < start_l + tile_l; l++) { num_processed_items->fetch_add(1, std::memory_order_relaxed); @@ -2252,7 +3804,7 @@ static void IncrementSame4DTile2D(std::atomic_int* num_processed_items, size_t i } } -TEST(Parallelize4DTile2D, MultiThreadPoolHighContention) { +TEST(Parallelize4DTile2DWithUArch, MultiThreadPoolHighContention) { std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); @@ -2262,17 +3814,18 @@ TEST(Parallelize4DTile2D, MultiThreadPoolHighContention) { GTEST_SKIP(); } - pthreadpool_parallelize_4d_tile_2d( + pthreadpool_parallelize_4d_tile_2d_with_uarch( threadpool.get(), - reinterpret_cast(IncrementSame4DTile2D), + reinterpret_cast(IncrementSame4DTile2DWithUArch), static_cast(&num_processed_items), + kDefaultUArchIndex, kMaxUArchIndex, kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, 0 /* flags */); EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); } -static void WorkImbalance4DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) { +static void WorkImbalance4DTile2DWithUArch(std::atomic_int* num_processed_items, uint32_t, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) { num_processed_items->fetch_add(tile_k * tile_l, std::memory_order_relaxed); if (i == 0 && j == 0 && start_k == 0 && start_l == 0) { /* Spin-wait until all items are computed */ @@ -2282,7 +3835,7 @@ static void WorkImbalance4DTile2D(std::atomic_int* num_processed_items, size_t i } } -TEST(Parallelize4DTile2D, MultiThreadPoolWorkStealing) { +TEST(Parallelize4DTile2DWithUArch, MultiThreadPoolWorkStealing) { std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); @@ -2292,10 +3845,11 @@ TEST(Parallelize4DTile2D, MultiThreadPoolWorkStealing) { GTEST_SKIP(); } - pthreadpool_parallelize_4d_tile_2d( + pthreadpool_parallelize_4d_tile_2d_with_uarch( threadpool.get(), - reinterpret_cast(WorkImbalance4DTile2D), + reinterpret_cast(WorkImbalance4DTile2DWithUArch), static_cast(&num_processed_items), + kDefaultUArchIndex, kMaxUArchIndex, kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, 0 /* flags */); -- cgit v1.2.3
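For context, a minimal standalone caller of the new pthreadpool_parallelize_4d_tile_2d_with_uarch entry point might look as follows. This sketch is not part of the patch series: my_task and the numeric ranges, tiles, and uarch bounds (0 and 1) are placeholder values; a real caller would typically derive the uarch indices from cpuinfo.

#include <cstddef>
#include <cstdint>

#include <pthreadpool.h>

/* Task signature for the _with_uarch variants: the uint32_t microarchitecture
 * index is passed right after the context pointer, before the loop indices. */
static void my_task(void* context, uint32_t uarch_index,
                    size_t i, size_t j, size_t start_k, size_t start_l,
                    size_t tile_k, size_t tile_l) {
	/* Dispatch to a kernel specialized for uarch_index here. */
	(void) context;
	(void) uarch_index;
	(void) i; (void) j; (void) start_k; (void) start_l;
	(void) tile_k; (void) tile_l;
}

int main() {
	pthreadpool_t threadpool = pthreadpool_create(0 /* one thread per core */);
	pthreadpool_parallelize_4d_tile_2d_with_uarch(
		threadpool,
		my_task,
		nullptr /* context */,
		0 /* default uarch index, used when detection is unavailable */,
		1 /* maximum valid uarch index */,
		2, 3, 8, 8 /* ranges i, j, k, l */,
		4, 4 /* tiles k, l */,
		0 /* flags */);
	pthreadpool_destroy(threadpool);
	return 0;
}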