diff options
author | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2020-02-12 02:43:35 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2020-02-12 02:43:35 +0000 |
commit | cf46291aa8a9d2d970d1d80d1c8705659e47aec1 (patch) | |
tree | 14f32522dabce86e2fa22fcc5dc309b1cb642818 | |
parent | 899dbeadc1331b4f724899ce8908823bcc4fb3e4 (diff) | |
parent | 1a8016e4ac75320f4de5d477d3f2324f95048ecc (diff) | |
download | platform_external_pthreadpool-cf46291aa8a9d2d970d1d80d1c8705659e47aec1.tar.gz platform_external_pthreadpool-cf46291aa8a9d2d970d1d80d1c8705659e47aec1.tar.bz2 platform_external_pthreadpool-cf46291aa8a9d2d970d1d80d1c8705659e47aec1.zip |
Merge remote-tracking branch 'aosp/upstream-master' into import am: d8871eeb9d am: 999ffef6a3 am: 1a8016e4ac
Change-Id: If87d483d75485f260beac54f1f37dcd9f0203a12
-rw-r--r-- | .gitignore | 21 | ||||
-rw-r--r-- | .travis.yml | 12 | ||||
-rw-r--r-- | CMakeLists.txt | 166 | ||||
-rw-r--r-- | LICENSE | 26 | ||||
-rw-r--r-- | README.md | 61 | ||||
-rw-r--r-- | bench/latency.cc | 93 | ||||
-rw-r--r-- | bench/throughput.cc | 99 | ||||
-rw-r--r-- | cmake/DownloadFXdiv.cmake | 15 | ||||
-rw-r--r-- | cmake/DownloadGoogleBenchmark.cmake | 15 | ||||
-rw-r--r-- | cmake/DownloadGoogleTest.cmake | 15 | ||||
-rwxr-xr-x | configure.py | 34 | ||||
-rw-r--r-- | confu.yaml | 8 | ||||
-rw-r--r-- | examples/addition.c | 48 | ||||
-rw-r--r-- | include/pthreadpool.h | 240 | ||||
-rw-r--r-- | jni/Android.mk | 17 | ||||
-rw-r--r-- | jni/Application.mk | 4 | ||||
-rw-r--r-- | src/threadpool-legacy.c | 235 | ||||
-rw-r--r-- | src/threadpool-pthreads.c | 1209 | ||||
-rw-r--r-- | src/threadpool-shim.c | 195 | ||||
-rw-r--r-- | src/threadpool-utils.h | 62 | ||||
-rw-r--r-- | test/pthreadpool.cc | 2852 |
21 files changed, 5427 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8a4bd32 --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +# Ninja files +build.ninja + +# Build objects and artifacts +deps/ +build/ +bin/ +obj/ +lib/ +libs/ +*.pyc +*.pyo + +# System files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..faff03c --- /dev/null +++ b/.travis.yml @@ -0,0 +1,12 @@ +dist: xenial +language: c +script: + - mkdir build + - cd build + - cmake .. -G Ninja + - ninja + - ctest --verbose +addons: + apt: + packages: + - ninja-build diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..714325a --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,166 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 3.5 FATAL_ERROR) + +INCLUDE(GNUInstallDirs) + +# ---[ Project +PROJECT(pthreadpool C CXX) + +# ---[ Options. +SET(PTHREADPOOL_LIBRARY_TYPE "default" CACHE STRING "Type of library (shared, static, or default) to build") +SET_PROPERTY(CACHE PTHREADPOOL_LIBRARY_TYPE PROPERTY STRINGS default static shared) +OPTION(PTHREADPOOL_ALLOW_DEPRECATED_API "Enable deprecated API functions" ON) +IF("${CMAKE_SOURCE_DIR}" STREQUAL "${PROJECT_SOURCE_DIR}") + OPTION(PTHREADPOOL_BUILD_TESTS "Build pthreadpool unit tests" ON) + OPTION(PTHREADPOOL_BUILD_BENCHMARKS "Build pthreadpool micro-benchmarks" ON) +ELSE() + SET(PTHREADPOOL_BUILD_TESTS OFF CACHE BOOL "Build pthreadpool unit tests") + SET(PTHREADPOOL_BUILD_BENCHMARKS OFF CACHE BOOL "Build pthreadpool micro-benchmarks") +ENDIF() + +# ---[ CMake options +IF(PTHREADPOOL_BUILD_TESTS) + ENABLE_TESTING() +ENDIF() + +MACRO(PTHREADPOOL_TARGET_ENABLE_CXX11 target) + SET_TARGET_PROPERTIES(${target} PROPERTIES + CXX_STANDARD 11 + CXX_EXTENSIONS NO) +ENDMACRO() + +# ---[ Download deps +IF(NOT DEFINED FXDIV_SOURCE_DIR) + MESSAGE(STATUS "Downloading FXdiv to ${CMAKE_BINARY_DIR}/FXdiv-source (define FXDIV_SOURCE_DIR to avoid it)") + CONFIGURE_FILE(cmake/DownloadFXdiv.cmake "${CMAKE_BINARY_DIR}/FXdiv-download/CMakeLists.txt") + EXECUTE_PROCESS(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . + WORKING_DIRECTORY "${CMAKE_BINARY_DIR}/FXdiv-download") + EXECUTE_PROCESS(COMMAND "${CMAKE_COMMAND}" --build . + WORKING_DIRECTORY "${CMAKE_BINARY_DIR}/FXdiv-download") + SET(FXDIV_SOURCE_DIR "${CMAKE_BINARY_DIR}/FXdiv-source" CACHE STRING "FXdiv source directory") +ENDIF() + +IF(PTHREADPOOL_BUILD_TESTS AND NOT DEFINED GOOGLETEST_SOURCE_DIR) + MESSAGE(STATUS "Downloading Google Test to ${CMAKE_BINARY_DIR}/googletest-source (define GOOGLETEST_SOURCE_DIR to avoid it)") + CONFIGURE_FILE(cmake/DownloadGoogleTest.cmake "${CMAKE_BINARY_DIR}/googletest-download/CMakeLists.txt") + EXECUTE_PROCESS(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . + WORKING_DIRECTORY "${CMAKE_BINARY_DIR}/googletest-download") + EXECUTE_PROCESS(COMMAND "${CMAKE_COMMAND}" --build . + WORKING_DIRECTORY "${CMAKE_BINARY_DIR}/googletest-download") + SET(GOOGLETEST_SOURCE_DIR "${CMAKE_BINARY_DIR}/googletest-source" CACHE STRING "Google Test source directory") +ENDIF() + +IF(PTHREADPOOL_BUILD_BENCHMARKS AND NOT DEFINED GOOGLEBENCHMARK_SOURCE_DIR) + MESSAGE(STATUS "Downloading Google Benchmark to ${CMAKE_BINARY_DIR}/googlebenchmark-source (define GOOGLEBENCHMARK_SOURCE_DIR to avoid it)") + CONFIGURE_FILE(cmake/DownloadGoogleBenchmark.cmake "${CMAKE_BINARY_DIR}/googlebenchmark-download/CMakeLists.txt") + EXECUTE_PROCESS(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . + WORKING_DIRECTORY "${CMAKE_BINARY_DIR}/googlebenchmark-download") + EXECUTE_PROCESS(COMMAND "${CMAKE_COMMAND}" --build . + WORKING_DIRECTORY "${CMAKE_BINARY_DIR}/googlebenchmark-download") + SET(GOOGLEBENCHMARK_SOURCE_DIR "${CMAKE_BINARY_DIR}/googlebenchmark-source" CACHE STRING "Google Benchmark source directory") +ENDIF() + +# ---[ pthreadpool library +IF(PTHREADPOOL_ALLOW_DEPRECATED_API) + SET(PTHREADPOOL_SRCS src/threadpool-legacy.c) +ENDIF() +IF(CMAKE_SYSTEM_NAME STREQUAL "Emscripten") + LIST(APPEND PTHREADPOOL_SRCS src/threadpool-shim.c) +ELSE() + LIST(APPEND PTHREADPOOL_SRCS src/threadpool-pthreads.c) +ENDIF() + +IF(${CMAKE_VERSION} VERSION_LESS "3.0") + ADD_LIBRARY(pthreadpool_interface STATIC include/pthreadpool.h) + SET_TARGET_PROPERTIES(pthreadpool_interface PROPERTIES LINKER_LANGUAGE C) +ELSE() + ADD_LIBRARY(pthreadpool_interface INTERFACE) +ENDIF() +TARGET_INCLUDE_DIRECTORIES(pthreadpool_interface INTERFACE include) +IF(NOT PTHREADPOOL_ALLOW_DEPRECATED_API) + TARGET_COMPILE_DEFINITIONS(pthreadpool_interface INTERFACE PTHREADPOOL_NO_DEPRECATED_API=1) +ENDIF() +INSTALL(FILES include/pthreadpool.h DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}) + +IF(PTHREADPOOL_LIBRARY_TYPE STREQUAL "default") + ADD_LIBRARY(pthreadpool ${PTHREADPOOL_SRCS}) +ELSEIF(PTHREADPOOL_LIBRARY_TYPE STREQUAL "shared") + ADD_LIBRARY(pthreadpool SHARED ${PTHREADPOOL_SRCS}) +ELSEIF(PTHREADPOOL_LIBRARY_TYPE STREQUAL "static") + ADD_LIBRARY(pthreadpool STATIC ${PTHREADPOOL_SRCS}) +ELSE() + MESSAGE(FATAL_ERROR "Unsupported library type ${PTHREADPOOL_LIBRARY_TYPE}") +ENDIF() + +SET_TARGET_PROPERTIES(pthreadpool PROPERTIES + C_STANDARD 11 + C_EXTENSIONS NO) +TARGET_LINK_LIBRARIES(pthreadpool PUBLIC pthreadpool_interface) +TARGET_INCLUDE_DIRECTORIES(pthreadpool PRIVATE src) +IF(NOT CMAKE_SYSTEM_NAME STREQUAL "Emscripten") + SET(CMAKE_THREAD_PREFER_PTHREAD TRUE) + IF(NOT CMAKE_GENERATOR STREQUAL "Xcode") + FIND_PACKAGE(Threads REQUIRED) + ENDIF() + IF(CMAKE_USE_PTHREADS_INIT) + IF(CMAKE_SYSTEM_NAME STREQUAL "Linux" OR CMAKE_SYSTEM_NAME STREQUAL "Android") + TARGET_COMPILE_OPTIONS(pthreadpool PUBLIC -pthread) + ENDIF() + ENDIF() + TARGET_LINK_LIBRARIES(pthreadpool PUBLIC ${CMAKE_THREAD_LIBS_INIT}) +ENDIF() +IF(CMAKE_SYSTEM_NAME STREQUAL "Linux") + TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE _GNU_SOURCE=1) +ENDIF() + +# ---[ Configure FXdiv +IF(NOT TARGET fxdiv) + SET(FXDIV_BUILD_TESTS OFF CACHE BOOL "") + SET(FXDIV_BUILD_BENCHMARKS OFF CACHE BOOL "") + ADD_SUBDIRECTORY( + "${FXDIV_SOURCE_DIR}" + "${CMAKE_BINARY_DIR}/FXdiv") +ENDIF() +TARGET_LINK_LIBRARIES(pthreadpool PRIVATE fxdiv) + +INSTALL(TARGETS pthreadpool + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) + +IF(PTHREADPOOL_BUILD_TESTS) + # ---[ Build google test + IF(NOT TARGET gtest) + SET(gtest_force_shared_crt ON CACHE BOOL "" FORCE) + ADD_SUBDIRECTORY( + "${GOOGLETEST_SOURCE_DIR}" + "${CMAKE_BINARY_DIR}/googletest") + ENDIF() + + ADD_EXECUTABLE(pthreadpool-test test/pthreadpool.cc) + SET_TARGET_PROPERTIES(pthreadpool-test PROPERTIES + CXX_STANDARD 11 + CXX_EXTENSIONS NO) + TARGET_LINK_LIBRARIES(pthreadpool-test pthreadpool gtest gtest_main) + ADD_TEST(pthreadpool pthreadpool-test) +ENDIF() + +IF(PTHREADPOOL_BUILD_BENCHMARKS) + # ---[ Build google benchmark + IF(NOT TARGET benchmark) + SET(BENCHMARK_ENABLE_TESTING OFF CACHE BOOL "") + ADD_SUBDIRECTORY( + "${GOOGLEBENCHMARK_SOURCE_DIR}" + "${CMAKE_BINARY_DIR}/googlebenchmark") + ENDIF() + + ADD_EXECUTABLE(latency-bench bench/latency.cc) + SET_TARGET_PROPERTIES(latency-bench PROPERTIES + CXX_STANDARD 11 + CXX_EXTENSIONS NO) + TARGET_LINK_LIBRARIES(latency-bench pthreadpool benchmark) + + ADD_EXECUTABLE(throughput-bench bench/throughput.cc) + SET_TARGET_PROPERTIES(throughput-bench PROPERTIES + CXX_STANDARD 11 + CXX_EXTENSIONS NO) + TARGET_LINK_LIBRARIES(throughput-bench pthreadpool benchmark) +ENDIF() @@ -0,0 +1,26 @@ +Copyright 2019 Google LLC +Copyright (c) 2017 Facebook Inc. +Copyright (c) 2015-2017 Georgia Institute of Technology +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..3faafaa --- /dev/null +++ b/README.md @@ -0,0 +1,61 @@ +# pthreadpool + +[![BSD (2 clause) License](https://img.shields.io/badge/License-BSD%202--Clause%20%22Simplified%22%20License-blue.svg)](https://github.com/Maratyszcza/pthreadpool/blob/master/LICENSE) +[![Build Status](https://img.shields.io/travis/Maratyszcza/pthreadpool.svg)](https://travis-ci.org/Maratyszcza/pthreadpool) + +**pthreadpool** is a portable and efficient thread pool implementation. +It provides similar functionality to `#pragma omp parallel for`, but with additional features. + +## Features: + +* C interface (C++-compatible). +* 1D-6D loops with step parameters. +* Run on user-specified or auto-detected number of threads. +* Work-stealing scheduling for efficient work balancing. +* Wait-free synchronization of work items. +* Compatible with Linux (including Android), macOS, iOS, Emscripten, Native Client environments. +* 100% unit tests coverage. +* Throughput and latency microbenchmarks. + +## Example + + The following example demonstates using the thread pool for parallel addition of two arrays: + +```c +static void add_arrays(struct array_addition_context* context, size_t i) { + context->sum[i] = context->augend[i] + context->addend[i]; +} + +#define ARRAY_SIZE 4 + +int main() { + double augend[ARRAY_SIZE] = { 1.0, 2.0, 4.0, -5.0 }; + double addend[ARRAY_SIZE] = { 0.25, -1.75, 0.0, 0.5 }; + double sum[ARRAY_SIZE]; + + pthreadpool_t threadpool = pthreadpool_create(0); + assert(threadpool != NULL); + + const size_t threads_count = pthreadpool_get_threads_count(threadpool); + printf("Created thread pool with %zu threads\n", threads_count); + + struct array_addition_context context = { augend, addend, sum }; + pthreadpool_parallelize_1d(threadpool, + (pthreadpool_task_1d_t) add_arrays, + (void**) &context, + ARRAY_SIZE, + PTHREADPOOL_FLAG_DISABLE_DENORMALS /* flags */); + + pthreadpool_destroy(threadpool); + threadpool = NULL; + + printf("%8s\t%.2lf\t%.2lf\t%.2lf\t%.2lf\n", "Augend", + augend[0], augend[1], augend[2], augend[3]); + printf("%8s\t%.2lf\t%.2lf\t%.2lf\t%.2lf\n", "Addend", + addend[0], addend[1], addend[2], addend[3]); + printf("%8s\t%.2lf\t%.2lf\t%.2lf\t%.2lf\n", "Sum", + sum[0], sum[1], sum[2], sum[3]); + + return 0; +} +``` diff --git a/bench/latency.cc b/bench/latency.cc new file mode 100644 index 0000000..f500cdf --- /dev/null +++ b/bench/latency.cc @@ -0,0 +1,93 @@ +#include <benchmark/benchmark.h> + +#include <unistd.h> + +#include <pthreadpool.h> + + +static void SetNumberOfThreads(benchmark::internal::Benchmark* benchmark) { + const int max_threads = sysconf(_SC_NPROCESSORS_ONLN); + for (int t = 1; t <= max_threads; t++) { + benchmark->Arg(t); + } +} + + +static void compute_1d(void*, size_t x) { +} + +static void pthreadpool_parallelize_1d(benchmark::State& state) { + const uint32_t threads = static_cast<uint32_t>(state.range(0)); + pthreadpool_t threadpool = pthreadpool_create(threads); + while (state.KeepRunning()) { + pthreadpool_parallelize_1d( + threadpool, + compute_1d, + nullptr /* context */, + threads, + 0 /* flags */); + } + pthreadpool_destroy(threadpool); +} +BENCHMARK(pthreadpool_parallelize_1d)->UseRealTime()->Apply(SetNumberOfThreads); + + +static void compute_1d_tile_1d(void*, size_t, size_t) { +} + +static void pthreadpool_parallelize_1d_tile_1d(benchmark::State& state) { + const uint32_t threads = static_cast<uint32_t>(state.range(0)); + pthreadpool_t threadpool = pthreadpool_create(threads); + while (state.KeepRunning()) { + pthreadpool_parallelize_1d_tile_1d( + threadpool, + compute_1d_tile_1d, + nullptr /* context */, + threads, 1, + 0 /* flags */); + } + pthreadpool_destroy(threadpool); +} +BENCHMARK(pthreadpool_parallelize_1d_tile_1d)->UseRealTime()->Apply(SetNumberOfThreads); + + +static void compute_2d(void*, size_t, size_t) { +} + +static void pthreadpool_parallelize_2d(benchmark::State& state) { + const uint32_t threads = static_cast<uint32_t>(state.range(0)); + pthreadpool_t threadpool = pthreadpool_create(threads); + while (state.KeepRunning()) { + pthreadpool_parallelize_2d( + threadpool, + compute_2d, + nullptr /* context */, + 1, threads, + 0 /* flags */); + } + pthreadpool_destroy(threadpool); +} +BENCHMARK(pthreadpool_parallelize_2d)->UseRealTime()->Apply(SetNumberOfThreads); + + +static void compute_2d_tile_2d(void*, size_t, size_t, size_t, size_t) { +} + +static void pthreadpool_parallelize_2d_tile_2d(benchmark::State& state) { + const uint32_t threads = static_cast<uint32_t>(state.range(0)); + pthreadpool_t threadpool = pthreadpool_create(threads); + while (state.KeepRunning()) { + pthreadpool_parallelize_2d_tile_2d( + threadpool, + compute_2d_tile_2d, + nullptr /* context */, + 1, threads, + 1, 1, + 0 /* flags */); + } + pthreadpool_destroy(threadpool); +} +BENCHMARK(pthreadpool_parallelize_2d_tile_2d)->UseRealTime()->Apply(SetNumberOfThreads); + + +BENCHMARK_MAIN(); diff --git a/bench/throughput.cc b/bench/throughput.cc new file mode 100644 index 0000000..2242ccb --- /dev/null +++ b/bench/throughput.cc @@ -0,0 +1,99 @@ +#include <benchmark/benchmark.h> + +#include <pthreadpool.h> + + +static void compute_1d(void*, size_t) { +} + +static void pthreadpool_parallelize_1d(benchmark::State& state) { + pthreadpool_t threadpool = pthreadpool_create(0); + const size_t threads = pthreadpool_get_threads_count(threadpool); + const size_t items = static_cast<size_t>(state.range(0)); + while (state.KeepRunning()) { + pthreadpool_parallelize_1d( + threadpool, + compute_1d, + nullptr /* context */, + items * threads, + 0 /* flags */); + } + pthreadpool_destroy(threadpool); + + /* Do not normalize by thread */ + state.SetItemsProcessed(int64_t(state.iterations()) * items); +} +BENCHMARK(pthreadpool_parallelize_1d)->UseRealTime()->RangeMultiplier(10)->Range(10, 1000000); + + +static void compute_1d_tile_1d(void*, size_t, size_t) { +} + +static void pthreadpool_parallelize_1d_tile_1d(benchmark::State& state) { + pthreadpool_t threadpool = pthreadpool_create(0); + const size_t threads = pthreadpool_get_threads_count(threadpool); + const size_t items = static_cast<size_t>(state.range(0)); + while (state.KeepRunning()) { + pthreadpool_parallelize_1d_tile_1d( + threadpool, + compute_1d_tile_1d, + nullptr /* context */, + items * threads, 1, + 0 /* flags */); + } + pthreadpool_destroy(threadpool); + + /* Do not normalize by thread */ + state.SetItemsProcessed(int64_t(state.iterations()) * items); +} +BENCHMARK(pthreadpool_parallelize_1d_tile_1d)->UseRealTime()->RangeMultiplier(10)->Range(10, 1000000); + + +static void compute_2d(void* context, size_t x, size_t y) { +} + +static void pthreadpool_parallelize_2d(benchmark::State& state) { + pthreadpool_t threadpool = pthreadpool_create(0); + const size_t threads = pthreadpool_get_threads_count(threadpool); + const size_t items = static_cast<size_t>(state.range(0)); + while (state.KeepRunning()) { + pthreadpool_parallelize_2d( + threadpool, + compute_2d, + nullptr /* context */, + threads, items, + 0 /* flags */); + } + pthreadpool_destroy(threadpool); + + /* Do not normalize by thread */ + state.SetItemsProcessed(int64_t(state.iterations()) * items); +} +BENCHMARK(pthreadpool_parallelize_2d)->UseRealTime()->RangeMultiplier(10)->Range(10, 1000000); + + +static void compute_2d_tiled(void* context, size_t x0, size_t y0, size_t xn, size_t yn) { +} + +static void pthreadpool_parallelize_2d_tile_2d(benchmark::State& state) { + pthreadpool_t threadpool = pthreadpool_create(0); + const size_t threads = pthreadpool_get_threads_count(threadpool); + const size_t items = static_cast<size_t>(state.range(0)); + while (state.KeepRunning()) { + pthreadpool_parallelize_2d_tile_2d( + threadpool, + compute_2d_tiled, + nullptr /* context */, + threads, items, + 1, 1, + 0 /* flags */); + } + pthreadpool_destroy(threadpool); + + /* Do not normalize by thread */ + state.SetItemsProcessed(int64_t(state.iterations()) * items); +} +BENCHMARK(pthreadpool_parallelize_2d_tile_2d)->UseRealTime()->RangeMultiplier(10)->Range(10, 1000000); + + +BENCHMARK_MAIN(); diff --git a/cmake/DownloadFXdiv.cmake b/cmake/DownloadFXdiv.cmake new file mode 100644 index 0000000..cbda7d0 --- /dev/null +++ b/cmake/DownloadFXdiv.cmake @@ -0,0 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 3.5 FATAL_ERROR) + +PROJECT(fxdiv-download NONE) + +INCLUDE(ExternalProject) +ExternalProject_Add(fxdiv + GIT_REPOSITORY https://github.com/Maratyszcza/FXdiv.git + GIT_TAG master + SOURCE_DIR "${CMAKE_BINARY_DIR}/FXdiv-source" + BINARY_DIR "${CMAKE_BINARY_DIR}/FXdiv" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + TEST_COMMAND "" +) diff --git a/cmake/DownloadGoogleBenchmark.cmake b/cmake/DownloadGoogleBenchmark.cmake new file mode 100644 index 0000000..d042e07 --- /dev/null +++ b/cmake/DownloadGoogleBenchmark.cmake @@ -0,0 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 3.5 FATAL_ERROR) + +PROJECT(googlebenchmark-download NONE) + +INCLUDE(ExternalProject) +ExternalProject_Add(googlebenchmark + URL https://github.com/google/benchmark/archive/v1.5.0.zip + URL_HASH SHA256=2d22dd3758afee43842bb504af1a8385cccb3ee1f164824e4837c1c1b04d92a0 + SOURCE_DIR "${CMAKE_BINARY_DIR}/googlebenchmark-source" + BINARY_DIR "${CMAKE_BINARY_DIR}/googlebenchmark" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + TEST_COMMAND "" +) diff --git a/cmake/DownloadGoogleTest.cmake b/cmake/DownloadGoogleTest.cmake new file mode 100644 index 0000000..2231ff7 --- /dev/null +++ b/cmake/DownloadGoogleTest.cmake @@ -0,0 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 3.5 FATAL_ERROR) + +PROJECT(googletest-download NONE) + +INCLUDE(ExternalProject) +ExternalProject_Add(googletest + URL https://github.com/google/googletest/archive/release-1.10.0.zip + URL_HASH SHA256=94c634d499558a76fa649edb13721dce6e98fb1e7018dfaeba3cd7a083945e91 + SOURCE_DIR "${CMAKE_BINARY_DIR}/googletest-source" + BINARY_DIR "${CMAKE_BINARY_DIR}/googletest" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + TEST_COMMAND "" +) diff --git a/configure.py b/configure.py new file mode 100755 index 0000000..fd4ce92 --- /dev/null +++ b/configure.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python + + +import confu +parser = confu.standard_parser("pthreadpool configuration script") + + +def main(args): + options = parser.parse_args(args) + build = confu.Build.from_options(options) + + build.export_cpath("include", ["pthreadpool.h"]) + + with build.options(source_dir="src", extra_include_dirs="src", deps=build.deps.fxdiv): + sources = ["threadpool-legacy.c"] + if build.target.is_emscripten: + sources.append("threadpool-shim.c") + else: + sources.append("threadpool-pthreads.c") + build.static_library("pthreadpool", [build.cc(src) for src in sources]) + + with build.options(source_dir="test", deps=[build, build.deps.googletest]): + build.unittest("pthreadpool-test", build.cxx("pthreadpool.cc")) + + with build.options(source_dir="bench", deps=[build, build.deps.googlebenchmark]): + build.benchmark("latency-bench", build.cxx("latency.cc")) + build.benchmark("throughput-bench", build.cxx("throughput.cc")) + + return build + + +if __name__ == "__main__": + import sys + main(sys.argv[1:]).generate() diff --git a/confu.yaml b/confu.yaml new file mode 100644 index 0000000..fc54f60 --- /dev/null +++ b/confu.yaml @@ -0,0 +1,8 @@ +name: pthreadpool +title: pthread-based thread pool +license: Simplified BSD +deps: + - name: fxdiv + url: https://github.com/Maratyszcza/FXdiv.git + - name: googletest + - name: googlebenchmark diff --git a/examples/addition.c b/examples/addition.c new file mode 100644 index 0000000..de806df --- /dev/null +++ b/examples/addition.c @@ -0,0 +1,48 @@ +#include <stdio.h> +#include <stdint.h> +#include <assert.h> + +#include <pthreadpool.h> + +struct array_addition_context { + double *augend; + double *addend; + double *sum; +}; + +static void add_arrays(struct array_addition_context* context, size_t i) { + context->sum[i] = context->augend[i] + context->addend[i]; +} + +#define ARRAY_SIZE 4 + +int main() { + double augend[ARRAY_SIZE] = { 1.0, 2.0, 4.0, -5.0 }; + double addend[ARRAY_SIZE] = { 0.25, -1.75, 0.0, 0.5 }; + double sum[ARRAY_SIZE]; + + pthreadpool_t threadpool = pthreadpool_create(0); + assert(threadpool != NULL); + + const size_t threads_count = pthreadpool_get_threads_count(threadpool); + printf("Created thread pool with %zu threads\n", threads_count); + + struct array_addition_context context = { augend, addend, sum }; + pthreadpool_parallelize_1d(threadpool, + (pthreadpool_task_1d_t) add_arrays, + (void**) &context, + ARRAY_SIZE, + PTHREADPOOL_FLAG_DISABLE_DENORMALS /* flags */); + + pthreadpool_destroy(threadpool); + threadpool = NULL; + + printf("%8s\t%.2lf\t%.2lf\t%.2lf\t%.2lf\n", "Augend", + augend[0], augend[1], augend[2], augend[3]); + printf("%8s\t%.2lf\t%.2lf\t%.2lf\t%.2lf\n", "Addend", + addend[0], addend[1], addend[2], addend[3]); + printf("%8s\t%.2lf\t%.2lf\t%.2lf\t%.2lf\n", "Sum", + sum[0], sum[1], sum[2], sum[3]); + + return 0; +} diff --git a/include/pthreadpool.h b/include/pthreadpool.h new file mode 100644 index 0000000..2443285 --- /dev/null +++ b/include/pthreadpool.h @@ -0,0 +1,240 @@ +#ifndef PTHREADPOOL_H_ +#define PTHREADPOOL_H_ + +#include <stddef.h> +#include <stdint.h> + +typedef struct pthreadpool* pthreadpool_t; + +typedef void (*pthreadpool_task_1d_t)(void*, size_t); +typedef void (*pthreadpool_task_1d_tile_1d_t)(void*, size_t, size_t); +typedef void (*pthreadpool_task_2d_t)(void*, size_t, size_t); +typedef void (*pthreadpool_task_2d_tile_1d_t)(void*, size_t, size_t, size_t); +typedef void (*pthreadpool_task_2d_tile_2d_t)(void*, size_t, size_t, size_t, size_t); +typedef void (*pthreadpool_task_3d_tile_2d_t)(void*, size_t, size_t, size_t, size_t, size_t); +typedef void (*pthreadpool_task_4d_tile_2d_t)(void*, size_t, size_t, size_t, size_t, size_t, size_t); +typedef void (*pthreadpool_task_5d_tile_2d_t)(void*, size_t, size_t, size_t, size_t, size_t, size_t, size_t); +typedef void (*pthreadpool_task_6d_tile_2d_t)(void*, size_t, size_t, size_t, size_t, size_t, size_t, size_t, size_t); + + +#define PTHREADPOOL_FLAG_DISABLE_DENORMALS 0x00000001 + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Creates a thread pool with the specified number of threads. + * + * @param[in] threads_count The number of threads in the thread pool. + * A value of 0 has special interpretation: it creates a thread for each + * processor core available in the system. + * + * @returns A pointer to an opaque thread pool object. + * On error the function returns NULL and sets errno accordingly. + */ +pthreadpool_t pthreadpool_create(size_t threads_count); + +/** + * Queries the number of threads in a thread pool. + * + * @param[in] threadpool The thread pool to query. + * + * @returns The number of threads in the thread pool. + */ +size_t pthreadpool_get_threads_count(pthreadpool_t threadpool); + +/** + * Processes items in parallel using threads from a thread pool. + * + * When the call returns, all items have been processed and the thread pool is + * ready for a new task. + * + * @note If multiple threads call this function with the same thread pool, the + * calls are serialized. + * + * @param[in] threadpool The thread pool to use for parallelisation. + * @param[in] function The function to call for each item. + * @param[in] argument The first argument passed to the @a function. + * @param[in] items The number of items to process. The @a function + * will be called once for each item. + */ +void pthreadpool_parallelize_1d( + pthreadpool_t threadpool, + pthreadpool_task_1d_t function, + void* argument, + size_t range, + uint32_t flags); + +void pthreadpool_parallelize_1d_tile_1d( + pthreadpool_t threadpool, + pthreadpool_task_1d_tile_1d_t function, + void* argument, + size_t range, + size_t tile, + uint32_t flags); + +void pthreadpool_parallelize_2d( + pthreadpool_t threadpool, + pthreadpool_task_2d_t function, + void* argument, + size_t range_i, + size_t range_j, + uint32_t flags); + +void pthreadpool_parallelize_2d_tile_1d( + pthreadpool_t threadpool, + pthreadpool_task_2d_tile_1d_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t tile_j, + uint32_t flags); + +void pthreadpool_parallelize_2d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_2d_tile_2d_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t tile_i, + size_t tile_j, + uint32_t flags); + +void pthreadpool_parallelize_3d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_3d_tile_2d_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t tile_j, + size_t tile_k, + uint32_t flags); + +void pthreadpool_parallelize_4d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_4d_tile_2d_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t tile_k, + size_t tile_l, + uint32_t flags); + +void pthreadpool_parallelize_5d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_5d_tile_2d_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t range_m, + size_t tile_l, + size_t tile_m, + uint32_t flags); + +void pthreadpool_parallelize_6d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_6d_tile_2d_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t range_m, + size_t range_n, + size_t tile_m, + size_t tile_n, + uint32_t flags); + +/** + * Terminates threads in the thread pool and releases associated resources. + * + * @warning Accessing the thread pool after a call to this function constitutes + * undefined behaviour and may cause data corruption. + * + * @param[in,out] threadpool The thread pool to destroy. + */ +void pthreadpool_destroy(pthreadpool_t threadpool); + + +#ifndef PTHREADPOOL_NO_DEPRECATED_API + +/* Legacy API for compatibility with pre-existing users (e.g. NNPACK) */ +#if defined(__GNUC__) + #define PTHREADPOOL_DEPRECATED __attribute__((__deprecated__)) +#else + #define PTHREADPOOL_DEPRECATED +#endif + +typedef void (*pthreadpool_function_1d_t)(void*, size_t) PTHREADPOOL_DEPRECATED; +typedef void (*pthreadpool_function_1d_tiled_t)(void*, size_t, size_t) PTHREADPOOL_DEPRECATED; +typedef void (*pthreadpool_function_2d_t)(void*, size_t, size_t) PTHREADPOOL_DEPRECATED; +typedef void (*pthreadpool_function_2d_tiled_t)(void*, size_t, size_t, size_t, size_t) PTHREADPOOL_DEPRECATED; +typedef void (*pthreadpool_function_3d_tiled_t)(void*, size_t, size_t, size_t, size_t, size_t, size_t) PTHREADPOOL_DEPRECATED; +typedef void (*pthreadpool_function_4d_tiled_t)(void*, size_t, size_t, size_t, size_t, size_t, size_t, size_t, size_t) PTHREADPOOL_DEPRECATED; + +void pthreadpool_compute_1d( + pthreadpool_t threadpool, + pthreadpool_function_1d_t function, + void* argument, + size_t range) PTHREADPOOL_DEPRECATED; + +void pthreadpool_compute_1d_tiled( + pthreadpool_t threadpool, + pthreadpool_function_1d_tiled_t function, + void* argument, + size_t range, + size_t tile) PTHREADPOOL_DEPRECATED; + +void pthreadpool_compute_2d( + pthreadpool_t threadpool, + pthreadpool_function_2d_t function, + void* argument, + size_t range_i, + size_t range_j) PTHREADPOOL_DEPRECATED; + +void pthreadpool_compute_2d_tiled( + pthreadpool_t threadpool, + pthreadpool_function_2d_tiled_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t tile_i, + size_t tile_j) PTHREADPOOL_DEPRECATED; + +void pthreadpool_compute_3d_tiled( + pthreadpool_t threadpool, + pthreadpool_function_3d_tiled_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t tile_i, + size_t tile_j, + size_t tile_k) PTHREADPOOL_DEPRECATED; + +void pthreadpool_compute_4d_tiled( + pthreadpool_t threadpool, + pthreadpool_function_4d_tiled_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t tile_i, + size_t tile_j, + size_t tile_k, + size_t tile_l) PTHREADPOOL_DEPRECATED; + +#endif /* PTHREADPOOL_NO_DEPRECATED_API */ + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* PTHREADPOOL_H_ */ diff --git a/jni/Android.mk b/jni/Android.mk new file mode 100644 index 0000000..32d1f77 --- /dev/null +++ b/jni/Android.mk @@ -0,0 +1,17 @@ +LOCAL_PATH := $(call my-dir)/.. + +include $(CLEAR_VARS) +LOCAL_MODULE := pthreadpool_interface +LOCAL_EXPORT_C_INCLUDES := $(LOCAL_PATH)/include +include $(BUILD_STATIC_LIBRARY) + +include $(CLEAR_VARS) +LOCAL_MODULE := pthreadpool +LOCAL_SRC_FILES := src/threadpool-pthreads.c +LOCAL_CFLAGS := -std=c99 -Wall +LOCAL_STATIC_LIBRARIES := pthreadpool_interface fxdiv +include $(BUILD_STATIC_LIBRARY) + +$(call import-add-path,$(LOCAL_PATH)/deps) + +$(call import-module,fxdiv/jni) diff --git a/jni/Application.mk b/jni/Application.mk new file mode 100644 index 0000000..0f660e3 --- /dev/null +++ b/jni/Application.mk @@ -0,0 +1,4 @@ +APP_PLATFORM := android-15 +APP_PIE := true +APP_STL := c++_static +APP_ABI := all diff --git a/src/threadpool-legacy.c b/src/threadpool-legacy.c new file mode 100644 index 0000000..43fb798 --- /dev/null +++ b/src/threadpool-legacy.c @@ -0,0 +1,235 @@ +/* Standard C headers */ +#include <stddef.h> + +/* Dependencies */ +#include <fxdiv.h> + +/* Library header */ +#include <pthreadpool.h> + + +static inline size_t divide_round_up(size_t dividend, size_t divisor) { + if (dividend % divisor == 0) { + return dividend / divisor; + } else { + return dividend / divisor + 1; + } +} + +static inline size_t min(size_t a, size_t b) { + return a < b ? a : b; +} + +void pthreadpool_compute_1d( + pthreadpool_t threadpool, + pthreadpool_function_1d_t function, + void* argument, + size_t range) +{ + pthreadpool_parallelize_1d(threadpool, + (pthreadpool_task_1d_t) function, argument, + range, 0 /* flags */); +} + +void pthreadpool_compute_1d_tiled( + pthreadpool_t threadpool, + pthreadpool_function_1d_tiled_t function, + void* argument, + size_t range, + size_t tile) +{ + pthreadpool_parallelize_1d_tile_1d(threadpool, + (pthreadpool_task_1d_tile_1d_t) function, argument, + range, tile, 0 /* flags */); +} + +void pthreadpool_compute_2d( + pthreadpool_t threadpool, + pthreadpool_function_2d_t function, + void* argument, + size_t range_i, + size_t range_j) +{ + pthreadpool_parallelize_2d(threadpool, + (pthreadpool_task_2d_t) function, argument, + range_i, range_j, 0 /* flags */); +} + +void pthreadpool_compute_2d_tiled( + pthreadpool_t threadpool, + pthreadpool_function_2d_tiled_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t tile_i, + size_t tile_j) +{ + pthreadpool_parallelize_2d_tile_2d(threadpool, + (pthreadpool_task_2d_tile_2d_t) function, argument, + range_i, range_j, tile_i, tile_j, 0 /* flags */); +} + +struct compute_3d_tiled_context { + pthreadpool_function_3d_tiled_t function; + void* argument; + struct fxdiv_divisor_size_t tile_range_j; + struct fxdiv_divisor_size_t tile_range_k; + size_t range_i; + size_t range_j; + size_t range_k; + size_t tile_i; + size_t tile_j; + size_t tile_k; +}; + +static void compute_3d_tiled(const struct compute_3d_tiled_context* context, size_t linear_index) { + const struct fxdiv_divisor_size_t tile_range_k = context->tile_range_k; + const struct fxdiv_result_size_t tile_index_ij_k = fxdiv_divide_size_t(linear_index, tile_range_k); + const struct fxdiv_divisor_size_t tile_range_j = context->tile_range_j; + const struct fxdiv_result_size_t tile_index_i_j = fxdiv_divide_size_t(tile_index_ij_k.quotient, tile_range_j); + const size_t max_tile_i = context->tile_i; + const size_t max_tile_j = context->tile_j; + const size_t max_tile_k = context->tile_k; + const size_t index_i = tile_index_i_j.quotient * max_tile_i; + const size_t index_j = tile_index_i_j.remainder * max_tile_j; + const size_t index_k = tile_index_ij_k.remainder * max_tile_k; + const size_t tile_i = min(max_tile_i, context->range_i - index_i); + const size_t tile_j = min(max_tile_j, context->range_j - index_j); + const size_t tile_k = min(max_tile_k, context->range_k - index_k); + context->function(context->argument, index_i, index_j, index_k, tile_i, tile_j, tile_k); +} + +void pthreadpool_compute_3d_tiled( + pthreadpool_t threadpool, + pthreadpool_function_3d_tiled_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t tile_i, + size_t tile_j, + size_t tile_k) +{ + if (pthreadpool_get_threads_count(threadpool) <= 1) { + /* No thread pool used: execute function sequentially on the calling thread */ + for (size_t i = 0; i < range_i; i += tile_i) { + for (size_t j = 0; j < range_j; j += tile_j) { + for (size_t k = 0; k < range_k; k += tile_k) { + function(argument, i, j, k, min(range_i - i, tile_i), min(range_j - j, tile_j), min(range_k - k, tile_k)); + } + } + } + } else { + /* Execute in parallel on the thread pool using linearized index */ + const size_t tile_range_i = divide_round_up(range_i, tile_i); + const size_t tile_range_j = divide_round_up(range_j, tile_j); + const size_t tile_range_k = divide_round_up(range_k, tile_k); + struct compute_3d_tiled_context context = { + .function = function, + .argument = argument, + .tile_range_j = fxdiv_init_size_t(tile_range_j), + .tile_range_k = fxdiv_init_size_t(tile_range_k), + .range_i = range_i, + .range_j = range_j, + .range_k = range_k, + .tile_i = tile_i, + .tile_j = tile_j, + .tile_k = tile_k + }; + pthreadpool_parallelize_1d(threadpool, + (pthreadpool_task_1d_t) compute_3d_tiled, &context, + tile_range_i * tile_range_j * tile_range_k, + 0 /* flags */); + } +} + +struct compute_4d_tiled_context { + pthreadpool_function_4d_tiled_t function; + void* argument; + struct fxdiv_divisor_size_t tile_range_kl; + struct fxdiv_divisor_size_t tile_range_j; + struct fxdiv_divisor_size_t tile_range_l; + size_t range_i; + size_t range_j; + size_t range_k; + size_t range_l; + size_t tile_i; + size_t tile_j; + size_t tile_k; + size_t tile_l; +}; + +static void compute_4d_tiled(const struct compute_4d_tiled_context* context, size_t linear_index) { + const struct fxdiv_divisor_size_t tile_range_kl = context->tile_range_kl; + const struct fxdiv_result_size_t tile_index_ij_kl = fxdiv_divide_size_t(linear_index, tile_range_kl); + const struct fxdiv_divisor_size_t tile_range_j = context->tile_range_j; + const struct fxdiv_result_size_t tile_index_i_j = fxdiv_divide_size_t(tile_index_ij_kl.quotient, tile_range_j); + const struct fxdiv_divisor_size_t tile_range_l = context->tile_range_l; + const struct fxdiv_result_size_t tile_index_k_l = fxdiv_divide_size_t(tile_index_ij_kl.remainder, tile_range_l); + const size_t max_tile_i = context->tile_i; + const size_t max_tile_j = context->tile_j; + const size_t max_tile_k = context->tile_k; + const size_t max_tile_l = context->tile_l; + const size_t index_i = tile_index_i_j.quotient * max_tile_i; + const size_t index_j = tile_index_i_j.remainder * max_tile_j; + const size_t index_k = tile_index_k_l.quotient * max_tile_k; + const size_t index_l = tile_index_k_l.remainder * max_tile_l; + const size_t tile_i = min(max_tile_i, context->range_i - index_i); + const size_t tile_j = min(max_tile_j, context->range_j - index_j); + const size_t tile_k = min(max_tile_k, context->range_k - index_k); + const size_t tile_l = min(max_tile_l, context->range_l - index_l); + context->function(context->argument, index_i, index_j, index_k, index_l, tile_i, tile_j, tile_k, tile_l); +} + +void pthreadpool_compute_4d_tiled( + pthreadpool_t threadpool, + pthreadpool_function_4d_tiled_t function, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t tile_i, + size_t tile_j, + size_t tile_k, + size_t tile_l) +{ + if (pthreadpool_get_threads_count(threadpool) <= 1) { + /* No thread pool used: execute function sequentially on the calling thread */ + for (size_t i = 0; i < range_i; i += tile_i) { + for (size_t j = 0; j < range_j; j += tile_j) { + for (size_t k = 0; k < range_k; k += tile_k) { + for (size_t l = 0; l < range_l; l += tile_l) { + function(argument, i, j, k, l, + min(range_i - i, tile_i), min(range_j - j, tile_j), min(range_k - k, tile_k), min(range_l - l, tile_l)); + } + } + } + } + } else { + /* Execute in parallel on the thread pool using linearized index */ + const size_t tile_range_i = divide_round_up(range_i, tile_i); + const size_t tile_range_j = divide_round_up(range_j, tile_j); + const size_t tile_range_k = divide_round_up(range_k, tile_k); + const size_t tile_range_l = divide_round_up(range_l, tile_l); + struct compute_4d_tiled_context context = { + .function = function, + .argument = argument, + .tile_range_kl = fxdiv_init_size_t(tile_range_k * tile_range_l), + .tile_range_j = fxdiv_init_size_t(tile_range_j), + .tile_range_l = fxdiv_init_size_t(tile_range_l), + .range_i = range_i, + .range_j = range_j, + .range_k = range_k, + .range_l = range_l, + .tile_i = tile_i, + .tile_j = tile_j, + .tile_k = tile_k, + .tile_l = tile_l + }; + pthreadpool_parallelize_1d(threadpool, + (pthreadpool_task_1d_t) compute_4d_tiled, &context, + tile_range_i * tile_range_j * tile_range_k * tile_range_l, + 0 /* flags */); + } +} diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c new file mode 100644 index 0000000..6c6a6d4 --- /dev/null +++ b/src/threadpool-pthreads.c @@ -0,0 +1,1209 @@ +/* Standard C headers */ +#include <stdatomic.h> +#include <stdbool.h> +#include <stdint.h> +#include <stdlib.h> +#include <string.h> + +/* POSIX headers */ +#include <pthread.h> +#include <unistd.h> + +/* Futex-specific headers */ +#ifndef PTHREADPOOL_USE_FUTEX + #if defined(__linux__) + #define PTHREADPOOL_USE_FUTEX 1 + #include <sys/syscall.h> + #include <linux/futex.h> + + /* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */ + #ifndef SYS_futex + #define SYS_futex __NR_futex + #endif + #ifndef FUTEX_PRIVATE_FLAG + #define FUTEX_PRIVATE_FLAG 128 + #endif + #elif defined(__native_client__) + #define PTHREADPOOL_USE_FUTEX 1 + #include <irt.h> + #else + #define PTHREADPOOL_USE_FUTEX 0 + #endif +#endif + +/* Dependencies */ +#include <fxdiv.h> + +/* Library header */ +#include <pthreadpool.h> + +/* Internal headers */ +#include "threadpool-utils.h" + +/* Number of iterations in spin-wait loop before going into futex/mutex wait */ +#define PTHREADPOOL_SPIN_WAIT_ITERATIONS 1000000 + +#define PTHREADPOOL_CACHELINE_SIZE 64 +#define PTHREADPOOL_CACHELINE_ALIGNED __attribute__((__aligned__(PTHREADPOOL_CACHELINE_SIZE))) + +#if defined(__clang__) + #if __has_extension(c_static_assert) || __has_feature(c_static_assert) + #define PTHREADPOOL_STATIC_ASSERT(predicate, message) _Static_assert((predicate), message) + #else + #define PTHREADPOOL_STATIC_ASSERT(predicate, message) + #endif +#elif defined(__GNUC__) && ((__GNUC__ > 4) || (__GNUC__ == 4) && (__GNUC_MINOR__ >= 6)) + /* Static assert is supported by gcc >= 4.6 */ + #define PTHREADPOOL_STATIC_ASSERT(predicate, message) _Static_assert((predicate), message) +#else + #define PTHREADPOOL_STATIC_ASSERT(predicate, message) +#endif + +static inline size_t multiply_divide(size_t a, size_t b, size_t d) { + #if defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 4) + return (size_t) (((uint64_t) a) * ((uint64_t) b)) / ((uint64_t) d); + #elif defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 8) + return (size_t) (((__uint128_t) a) * ((__uint128_t) b)) / ((__uint128_t) d); + #else + #error "Unsupported platform" + #endif +} + +static inline size_t divide_round_up(size_t dividend, size_t divisor) { + if (dividend % divisor == 0) { + return dividend / divisor; + } else { + return dividend / divisor + 1; + } +} + +static inline size_t min(size_t a, size_t b) { + return a < b ? a : b; +} + +#if PTHREADPOOL_USE_FUTEX + #if defined(__linux__) + static int futex_wait(_Atomic uint32_t* address, uint32_t value) { + return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL); + } + + static int futex_wake_all(_Atomic uint32_t* address) { + return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX); + } + #elif defined(__native_client__) + static struct nacl_irt_futex nacl_irt_futex = { 0 }; + static pthread_once_t nacl_init_guard = PTHREAD_ONCE_INIT; + static void nacl_init(void) { + nacl_interface_query(NACL_IRT_FUTEX_v0_1, &nacl_irt_futex, sizeof(nacl_irt_futex)); + } + + static int futex_wait(_Atomic uint32_t* address, uint32_t value) { + return nacl_irt_futex.futex_wait_abs((_Atomic int*) address, (int) value, NULL); + } + + static int futex_wake_all(_Atomic uint32_t* address) { + int count; + return nacl_irt_futex.futex_wake((_Atomic int*) address, INT_MAX, &count); + } + #else + #error "Platform-specific implementation of futex_wait and futex_wake_all required" + #endif +#endif + +#define THREADPOOL_COMMAND_MASK UINT32_C(0x7FFFFFFF) + +enum threadpool_command { + threadpool_command_init, + threadpool_command_compute_1d, + threadpool_command_shutdown, +}; + +struct PTHREADPOOL_CACHELINE_ALIGNED thread_info { + /** + * Index of the first element in the work range. + * Before processing a new element the owning worker thread increments this value. + */ + atomic_size_t range_start; + /** + * Index of the element after the last element of the work range. + * Before processing a new element the stealing worker thread decrements this value. + */ + atomic_size_t range_end; + /** + * The number of elements in the work range. + * Due to race conditions range_length <= range_end - range_start. + * The owning worker thread must decrement this value before incrementing @a range_start. + * The stealing worker thread must decrement this value before decrementing @a range_end. + */ + atomic_size_t range_length; + /** + * Thread number in the 0..threads_count-1 range. + */ + size_t thread_number; + /** + * The pthread object corresponding to the thread. + */ + pthread_t thread_object; + /** + * Condition variable used to wake up the thread. + * When the thread is idle, it waits on this condition variable. + */ + pthread_cond_t wakeup_condvar; +}; + +PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0, "thread_info structure must occupy an integer number of cache lines (64 bytes)"); + +struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { + /** + * The number of threads that are processing an operation. + */ + atomic_size_t active_threads; +#if PTHREADPOOL_USE_FUTEX + /** + * Indicates if there are active threads. + * Only two values are possible: + * - has_active_threads == 0 if active_threads == 0 + * - has_active_threads == 1 if active_threads != 0 + */ + _Atomic uint32_t has_active_threads; +#endif + /** + * The last command submitted to the thread pool. + */ + _Atomic uint32_t command; + /** + * The function to call for each item. + */ + void *_Atomic task; + /** + * The first argument to the item processing function. + */ + void *_Atomic argument; + /** + * Copy of the flags passed to parallelization function. + */ + _Atomic uint32_t flags; + /** + * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads. + */ + pthread_mutex_t execution_mutex; +#if !PTHREADPOOL_USE_FUTEX + /** + * Guards access to the @a active_threads variable. + */ + pthread_mutex_t completion_mutex; + /** + * Condition variable to wait until all threads complete an operation (until @a active_threads is zero). + */ + pthread_cond_t completion_condvar; + /** + * Guards access to the @a command variable. + */ + pthread_mutex_t command_mutex; + /** + * Condition variable to wait for change of the @a command variable. + */ + pthread_cond_t command_condvar; +#endif + /** + * The number of threads in the thread pool. Never changes after initialization. + */ + size_t threads_count; + /** + * Thread information structures that immediately follow this structure. + */ + struct thread_info threads[]; +}; + +PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZE == 0, "pthreadpool structure must occupy an integer number of cache lines (64 bytes)"); + +static void checkin_worker_thread(struct pthreadpool* threadpool) { + #if PTHREADPOOL_USE_FUTEX + if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) { + atomic_store_explicit(&threadpool->has_active_threads, 0, memory_order_release); + futex_wake_all(&threadpool->has_active_threads); + } + #else + pthread_mutex_lock(&threadpool->completion_mutex); + if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) { + pthread_cond_signal(&threadpool->completion_condvar); + } + pthread_mutex_unlock(&threadpool->completion_mutex); + #endif +} + +static void wait_worker_threads(struct pthreadpool* threadpool) { + /* Initial check */ + #if PTHREADPOOL_USE_FUTEX + uint32_t has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed); + if (has_active_threads == 0) { + return; + } + #else + size_t active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed); + if (active_threads == 0) { + return; + } + #endif + + /* Spin-wait */ + for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { + /* This fence serves as a sleep instruction */ + atomic_thread_fence(memory_order_acquire); + + #if PTHREADPOOL_USE_FUTEX + has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed); + if (has_active_threads == 0) { + return; + } + #else + active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed); + if (active_threads == 0) { + return; + } + #endif + } + + /* Fall-back to mutex/futex wait */ + #if PTHREADPOOL_USE_FUTEX + while ((has_active_threads = atomic_load(&threadpool->has_active_threads)) != 0) { + futex_wait(&threadpool->has_active_threads, 1); + } + #else + pthread_mutex_lock(&threadpool->completion_mutex); + while (atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed) != 0) { + pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex); + }; + pthread_mutex_unlock(&threadpool->completion_mutex); + #endif +} + +inline static bool atomic_decrement(atomic_size_t* value) { + size_t actual_value = atomic_load_explicit(value, memory_order_relaxed); + if (actual_value == 0) { + return false; + } + while (!atomic_compare_exchange_weak_explicit( + value, &actual_value, actual_value - 1, memory_order_relaxed, memory_order_relaxed)) + { + if (actual_value == 0) { + return false; + } + } + return true; +} + +inline static size_t modulo_decrement(uint32_t i, uint32_t n) { + /* Wrap modulo n, if needed */ + if (i == 0) { + i = n; + } + /* Decrement input variable */ + return i - 1; +} + +static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) { + const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) atomic_load_explicit(&threadpool->task, memory_order_relaxed); + void *const argument = atomic_load_explicit(&threadpool->argument, memory_order_relaxed); + /* Process thread's own range of items */ + size_t range_start = atomic_load_explicit(&thread->range_start, memory_order_relaxed); + while (atomic_decrement(&thread->range_length)) { + task(argument, range_start++); + } + + /* There still may be other threads with work */ + const size_t thread_number = thread->thread_number; + const size_t threads_count = threadpool->threads_count; + for (size_t tid = modulo_decrement(thread_number, threads_count); + tid != thread_number; + tid = modulo_decrement(tid, threads_count)) + { + struct thread_info* other_thread = &threadpool->threads[tid]; + while (atomic_decrement(&other_thread->range_length)) { + const size_t item_id = atomic_fetch_sub_explicit(&other_thread->range_end, 1, memory_order_relaxed) - 1; + task(argument, item_id); + } + } + atomic_thread_fence(memory_order_release); +} + +static uint32_t wait_for_new_command( + struct pthreadpool* threadpool, + uint32_t last_command) +{ + uint32_t command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + if (command != last_command) { + atomic_thread_fence(memory_order_acquire); + return command; + } + + /* Spin-wait loop */ + for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { + /* This fence serves as a sleep instruction */ + atomic_thread_fence(memory_order_acquire); + + command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + if (command != last_command) { + atomic_thread_fence(memory_order_acquire); + return command; + } + } + + /* Spin-wait timed out, fall back to mutex/futex wait */ + #if PTHREADPOOL_USE_FUTEX + do { + futex_wait(&threadpool->command, last_command); + command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + } while (command == last_command); + #else + /* Lock the command mutex */ + pthread_mutex_lock(&threadpool->command_mutex); + /* Read the command */ + while ((command = atomic_load_explicit(&threadpool->command, memory_order_relaxed)) == last_command) { + /* Wait for new command */ + pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); + } + /* Read a new command */ + pthread_mutex_unlock(&threadpool->command_mutex); + #endif + atomic_thread_fence(memory_order_acquire); + return command; +} + +static void* thread_main(void* arg) { + struct thread_info* thread = (struct thread_info*) arg; + struct pthreadpool* threadpool = ((struct pthreadpool*) (thread - thread->thread_number)) - 1; + uint32_t last_command = threadpool_command_init; + struct fpu_state saved_fpu_state = { 0 }; + + /* Check in */ + checkin_worker_thread(threadpool); + + /* Monitor new commands and act accordingly */ + for (;;) { + uint32_t command = wait_for_new_command(threadpool, last_command); + const uint32_t flags = atomic_load_explicit(&threadpool->flags, memory_order_relaxed); + + /* Process command */ + switch (command & THREADPOOL_COMMAND_MASK) { + case threadpool_command_compute_1d: + { + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + thread_parallelize_1d(threadpool, thread); + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + break; + } + case threadpool_command_shutdown: + /* Exit immediately: the master thread is waiting on pthread_join */ + return NULL; + case threadpool_command_init: + /* To inhibit compiler warning */ + break; + } + /* Notify the master thread that we finished processing */ + checkin_worker_thread(threadpool); + /* Update last command */ + last_command = command; + }; +} + +static struct pthreadpool* pthreadpool_allocate(size_t threads_count) { + const size_t threadpool_size = sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info); + struct pthreadpool* threadpool = NULL; + #if defined(__ANDROID__) + /* + * Android didn't get posix_memalign until API level 17 (Android 4.2). + * Use (otherwise obsolete) memalign function on Android platform. + */ + threadpool = memalign(PTHREADPOOL_CACHELINE_SIZE, threadpool_size); + if (threadpool == NULL) { + return NULL; + } + #else + if (posix_memalign((void**) &threadpool, PTHREADPOOL_CACHELINE_SIZE, threadpool_size) != 0) { + return NULL; + } + #endif + memset(threadpool, 0, threadpool_size); + return threadpool; +} + +struct pthreadpool* pthreadpool_create(size_t threads_count) { +#if defined(__native_client__) + pthread_once(&nacl_init_guard, nacl_init); +#endif + + if (threads_count == 0) { + threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); + } + struct pthreadpool* threadpool = pthreadpool_allocate(threads_count); + if (threadpool == NULL) { + return NULL; + } + threadpool->threads_count = threads_count; + for (size_t tid = 0; tid < threads_count; tid++) { + threadpool->threads[tid].thread_number = tid; + } + + /* Thread pool with a single thread computes everything on the caller thread. */ + if (threads_count > 1) { + pthread_mutex_init(&threadpool->execution_mutex, NULL); + #if !PTHREADPOOL_USE_FUTEX + pthread_mutex_init(&threadpool->completion_mutex, NULL); + pthread_cond_init(&threadpool->completion_condvar, NULL); + pthread_mutex_init(&threadpool->command_mutex, NULL); + pthread_cond_init(&threadpool->command_condvar, NULL); + #endif + + #if PTHREADPOOL_USE_FUTEX + atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); + #endif + atomic_store_explicit( + &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release); + + /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ + for (size_t tid = 1; tid < threads_count; tid++) { + pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]); + } + + /* Wait until all threads initialize */ + wait_worker_threads(threadpool); + } + return threadpool; +} + +size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) { + if (threadpool == NULL) { + return 1; + } else { + return threadpool->threads_count; + } +} + +void pthreadpool_parallelize_1d( + struct pthreadpool* threadpool, + pthreadpool_task_1d_t task, + void* argument, + size_t range, + uint32_t flags) +{ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute task sequentially on the calling thread */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + for (size_t i = 0; i < range; i++) { + task(argument, i); + } + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + } else { + /* Protect the global threadpool structures */ + pthread_mutex_lock(&threadpool->execution_mutex); + + #if !PTHREADPOOL_USE_FUTEX + /* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */ + pthread_mutex_lock(&threadpool->command_mutex); + #endif + + /* Setup global arguments */ + atomic_store_explicit(&threadpool->task, task, memory_order_relaxed); + atomic_store_explicit(&threadpool->argument, argument, memory_order_relaxed); + atomic_store_explicit(&threadpool->flags, flags, memory_order_relaxed); + + /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ + atomic_store_explicit( + &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed); + #if PTHREADPOOL_USE_FUTEX + atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); + #endif + + /* Spread the work between threads */ + for (size_t tid = 0; tid < threadpool->threads_count; tid++) { + struct thread_info* thread = &threadpool->threads[tid]; + const size_t range_start = multiply_divide(range, tid, threadpool->threads_count); + const size_t range_end = multiply_divide(range, tid + 1, threadpool->threads_count); + atomic_store_explicit(&thread->range_start, range_start, memory_order_relaxed); + atomic_store_explicit(&thread->range_end, range_end, memory_order_relaxed); + atomic_store_explicit(&thread->range_length, range_end - range_start, memory_order_relaxed); + } + + #if PTHREADPOOL_USE_FUTEX + /* + * Make new command parameters globally visible. Having this fence before updating the command is imporatnt: it + * guarantees that if a worker thread observes new command value, it also observes the updated command parameters. + */ + atomic_thread_fence(memory_order_release); + #endif + + /* + * Update the threadpool command. + * Imporantly, do it after initializing command parameters (range, task, argument) + * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask + * to ensure the unmasked command is different then the last command, because worker threads + * monitor for change in the unmasked command. + */ + const uint32_t old_command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d; + + #if PTHREADPOOL_USE_FUTEX + atomic_store_explicit(&threadpool->command, new_command, memory_order_release); + + /* Wake up the threads */ + futex_wake_all(&threadpool->command); + #else + atomic_store_explicit(&threadpool->command, new_command, memory_order_relaxed); + + /* Unlock the command variables before waking up the threads for better performance */ + pthread_mutex_unlock(&threadpool->command_mutex); + + /* Wake up the threads */ + pthread_cond_broadcast(&threadpool->command_condvar); + #endif + + /* Save and modify FPU denormals control, if needed */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + + /* Do computations as worker #0 */ + thread_parallelize_1d(threadpool, &threadpool->threads[0]); + + /* Restore FPU denormals control, if needed */ + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + + /* Wait until the threads finish computation */ + wait_worker_threads(threadpool); + + /* Make changes by other threads visible to this thread */ + atomic_thread_fence(memory_order_acquire); + + /* Unprotect the global threadpool structures */ + pthread_mutex_unlock(&threadpool->execution_mutex); + } +} + +struct compute_1d_tile_1d_context { + pthreadpool_task_1d_tile_1d_t task; + void* argument; + size_t range; + size_t tile; +}; + +static void compute_1d_tile_1d(const struct compute_1d_tile_1d_context* context, size_t linear_index) { + const size_t tile_index = linear_index; + const size_t index = tile_index * context->tile; + const size_t tile = min(context->tile, context->range - index); + context->task(context->argument, index, tile); +} + +void pthreadpool_parallelize_1d_tile_1d( + pthreadpool_t threadpool, + pthreadpool_task_1d_tile_1d_t task, + void* argument, + size_t range, + size_t tile, + uint32_t flags) +{ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute task sequentially on the calling thread */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + for (size_t i = 0; i < range; i += tile) { + task(argument, i, min(range - i, tile)); + } + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + } else { + /* Execute in parallel on the thread pool using linearized index */ + const size_t tile_range = divide_round_up(range, tile); + struct compute_1d_tile_1d_context context = { + .task = task, + .argument = argument, + .range = range, + .tile = tile + }; + pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_1d_tile_1d, &context, tile_range, flags); + } +} + +struct compute_2d_context { + pthreadpool_task_2d_t task; + void* argument; + struct fxdiv_divisor_size_t range_j; +}; + +static void compute_2d(const struct compute_2d_context* context, size_t linear_index) { + const struct fxdiv_divisor_size_t range_j = context->range_j; + const struct fxdiv_result_size_t index = fxdiv_divide_size_t(linear_index, range_j); + context->task(context->argument, index.quotient, index.remainder); +} + +void pthreadpool_parallelize_2d( + struct pthreadpool* threadpool, + pthreadpool_task_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + uint32_t flags) +{ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute task sequentially on the calling thread */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j++) { + task(argument, i, j); + } + } + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + } else { + /* Execute in parallel on the thread pool using linearized index */ + struct compute_2d_context context = { + .task = task, + .argument = argument, + .range_j = fxdiv_init_size_t(range_j) + }; + pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d, &context, range_i * range_j, flags); + } +} + +struct compute_2d_tile_1d_context { + pthreadpool_task_2d_tile_1d_t task; + void* argument; + struct fxdiv_divisor_size_t tile_range_j; + size_t range_i; + size_t range_j; + size_t tile_j; +}; + +static void compute_2d_tile_1d(const struct compute_2d_tile_1d_context* context, size_t linear_index) { + const struct fxdiv_divisor_size_t tile_range_j = context->tile_range_j; + const struct fxdiv_result_size_t tile_index = fxdiv_divide_size_t(linear_index, tile_range_j); + const size_t max_tile_j = context->tile_j; + const size_t index_i = tile_index.quotient; + const size_t index_j = tile_index.remainder * max_tile_j; + const size_t tile_j = min(max_tile_j, context->range_j - index_j); + context->task(context->argument, index_i, index_j, tile_j); +} + +void pthreadpool_parallelize_2d_tile_1d( + pthreadpool_t threadpool, + pthreadpool_task_2d_tile_1d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t tile_j, + uint32_t flags) +{ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute task sequentially on the calling thread */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j += tile_j) { + task(argument, i, j, min(range_j - j, tile_j)); + } + } + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + } else { + /* Execute in parallel on the thread pool using linearized index */ + const size_t tile_range_j = divide_round_up(range_j, tile_j); + struct compute_2d_tile_1d_context context = { + .task = task, + .argument = argument, + .tile_range_j = fxdiv_init_size_t(tile_range_j), + .range_i = range_i, + .range_j = range_j, + .tile_j = tile_j + }; + pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d_tile_1d, &context, range_i * tile_range_j, flags); + } +} + +struct compute_2d_tile_2d_context { + pthreadpool_task_2d_tile_2d_t task; + void* argument; + struct fxdiv_divisor_size_t tile_range_j; + size_t range_i; + size_t range_j; + size_t tile_i; + size_t tile_j; +}; + +static void compute_2d_tile_2d(const struct compute_2d_tile_2d_context* context, size_t linear_index) { + const struct fxdiv_divisor_size_t tile_range_j = context->tile_range_j; + const struct fxdiv_result_size_t tile_index = fxdiv_divide_size_t(linear_index, tile_range_j); + const size_t max_tile_i = context->tile_i; + const size_t max_tile_j = context->tile_j; + const size_t index_i = tile_index.quotient * max_tile_i; + const size_t index_j = tile_index.remainder * max_tile_j; + const size_t tile_i = min(max_tile_i, context->range_i - index_i); + const size_t tile_j = min(max_tile_j, context->range_j - index_j); + context->task(context->argument, index_i, index_j, tile_i, tile_j); +} + +void pthreadpool_parallelize_2d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_2d_tile_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t tile_i, + size_t tile_j, + uint32_t flags) +{ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute task sequentially on the calling thread */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + for (size_t i = 0; i < range_i; i += tile_i) { + for (size_t j = 0; j < range_j; j += tile_j) { + task(argument, i, j, min(range_i - i, tile_i), min(range_j - j, tile_j)); + } + } + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + } else { + /* Execute in parallel on the thread pool using linearized index */ + const size_t tile_range_i = divide_round_up(range_i, tile_i); + const size_t tile_range_j = divide_round_up(range_j, tile_j); + struct compute_2d_tile_2d_context context = { + .task = task, + .argument = argument, + .tile_range_j = fxdiv_init_size_t(tile_range_j), + .range_i = range_i, + .range_j = range_j, + .tile_i = tile_i, + .tile_j = tile_j + }; + pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d_tile_2d, &context, tile_range_i * tile_range_j, flags); + } +} + +struct compute_3d_tile_2d_context { + pthreadpool_task_3d_tile_2d_t task; + void* argument; + struct fxdiv_divisor_size_t tile_range_j; + struct fxdiv_divisor_size_t tile_range_k; + size_t range_j; + size_t range_k; + size_t tile_j; + size_t tile_k; +}; + +static void compute_3d_tile_2d(const struct compute_3d_tile_2d_context* context, size_t linear_index) { + const struct fxdiv_divisor_size_t tile_range_k = context->tile_range_k; + const struct fxdiv_result_size_t tile_index_ij_k = fxdiv_divide_size_t(linear_index, tile_range_k); + const struct fxdiv_divisor_size_t tile_range_j = context->tile_range_j; + const struct fxdiv_result_size_t tile_index_i_j = fxdiv_divide_size_t(tile_index_ij_k.quotient, tile_range_j); + const size_t max_tile_j = context->tile_j; + const size_t max_tile_k = context->tile_k; + const size_t index_i = tile_index_i_j.quotient; + const size_t index_j = tile_index_i_j.remainder * max_tile_j; + const size_t index_k = tile_index_ij_k.remainder * max_tile_k; + const size_t tile_j = min(max_tile_j, context->range_j - index_j); + const size_t tile_k = min(max_tile_k, context->range_k - index_k); + context->task(context->argument, index_i, index_j, index_k, tile_j, tile_k); +} + +void pthreadpool_parallelize_3d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_3d_tile_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t tile_j, + size_t tile_k, + uint32_t flags) +{ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute task sequentially on the calling thread */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j += tile_j) { + for (size_t k = 0; k < range_k; k += tile_k) { + task(argument, i, j, k, min(range_j - j, tile_j), min(range_k - k, tile_k)); + } + } + } + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + } else { + /* Execute in parallel on the thread pool using linearized index */ + const size_t tile_range_j = divide_round_up(range_j, tile_j); + const size_t tile_range_k = divide_round_up(range_k, tile_k); + struct compute_3d_tile_2d_context context = { + .task = task, + .argument = argument, + .tile_range_j = fxdiv_init_size_t(tile_range_j), + .tile_range_k = fxdiv_init_size_t(tile_range_k), + .range_j = range_j, + .range_k = range_k, + .tile_j = tile_j, + .tile_k = tile_k + }; + pthreadpool_parallelize_1d(threadpool, + (pthreadpool_task_1d_t) compute_3d_tile_2d, &context, + range_i * tile_range_j * tile_range_k, flags); + } +} + +struct compute_4d_tile_2d_context { + pthreadpool_task_4d_tile_2d_t task; + void* argument; + struct fxdiv_divisor_size_t tile_range_kl; + struct fxdiv_divisor_size_t range_j; + struct fxdiv_divisor_size_t tile_range_l; + size_t range_k; + size_t range_l; + size_t tile_k; + size_t tile_l; +}; + +static void compute_4d_tile_2d(const struct compute_4d_tile_2d_context* context, size_t linear_index) { + const struct fxdiv_divisor_size_t tile_range_kl = context->tile_range_kl; + const struct fxdiv_result_size_t tile_index_ij_kl = fxdiv_divide_size_t(linear_index, tile_range_kl); + const struct fxdiv_divisor_size_t range_j = context->range_j; + const struct fxdiv_result_size_t tile_index_i_j = fxdiv_divide_size_t(tile_index_ij_kl.quotient, range_j); + const struct fxdiv_divisor_size_t tile_range_l = context->tile_range_l; + const struct fxdiv_result_size_t tile_index_k_l = fxdiv_divide_size_t(tile_index_ij_kl.remainder, tile_range_l); + const size_t max_tile_k = context->tile_k; + const size_t max_tile_l = context->tile_l; + const size_t index_i = tile_index_i_j.quotient; + const size_t index_j = tile_index_i_j.remainder; + const size_t index_k = tile_index_k_l.quotient * max_tile_k; + const size_t index_l = tile_index_k_l.remainder * max_tile_l; + const size_t tile_k = min(max_tile_k, context->range_k - index_k); + const size_t tile_l = min(max_tile_l, context->range_l - index_l); + context->task(context->argument, index_i, index_j, index_k, index_l, tile_k, tile_l); +} + +void pthreadpool_parallelize_4d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_4d_tile_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t tile_k, + size_t tile_l, + uint32_t flags) +{ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute task sequentially on the calling thread */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j++) { + for (size_t k = 0; k < range_k; k += tile_k) { + for (size_t l = 0; l < range_l; l += tile_l) { + task(argument, i, j, k, l, + min(range_k - k, tile_k), min(range_l - l, tile_l)); + } + } + } + } + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + } else { + /* Execute in parallel on the thread pool using linearized index */ + const size_t tile_range_k = divide_round_up(range_k, tile_k); + const size_t tile_range_l = divide_round_up(range_l, tile_l); + struct compute_4d_tile_2d_context context = { + .task = task, + .argument = argument, + .tile_range_kl = fxdiv_init_size_t(tile_range_k * tile_range_l), + .range_j = fxdiv_init_size_t(range_j), + .tile_range_l = fxdiv_init_size_t(tile_range_l), + .range_k = range_k, + .range_l = range_l, + .tile_k = tile_k, + .tile_l = tile_l + }; + pthreadpool_parallelize_1d(threadpool, + (pthreadpool_task_1d_t) compute_4d_tile_2d, &context, + range_i * range_j * tile_range_k * tile_range_l, flags); + } +} + +struct compute_5d_tile_2d_context { + pthreadpool_task_5d_tile_2d_t task; + void* argument; + struct fxdiv_divisor_size_t tile_range_lm; + struct fxdiv_divisor_size_t range_k; + struct fxdiv_divisor_size_t tile_range_m; + struct fxdiv_divisor_size_t range_j; + size_t range_l; + size_t range_m; + size_t tile_l; + size_t tile_m; +}; + +static void compute_5d_tile_2d(const struct compute_5d_tile_2d_context* context, size_t linear_index) { + const struct fxdiv_divisor_size_t tile_range_lm = context->tile_range_lm; + const struct fxdiv_result_size_t tile_index_ijk_lm = fxdiv_divide_size_t(linear_index, tile_range_lm); + const struct fxdiv_divisor_size_t range_k = context->range_k; + const struct fxdiv_result_size_t tile_index_ij_k = fxdiv_divide_size_t(tile_index_ijk_lm.quotient, range_k); + const struct fxdiv_divisor_size_t tile_range_m = context->tile_range_m; + const struct fxdiv_result_size_t tile_index_l_m = fxdiv_divide_size_t(tile_index_ijk_lm.remainder, tile_range_m); + const struct fxdiv_divisor_size_t range_j = context->range_j; + const struct fxdiv_result_size_t tile_index_i_j = fxdiv_divide_size_t(tile_index_ij_k.quotient, range_j); + + const size_t max_tile_l = context->tile_l; + const size_t max_tile_m = context->tile_m; + const size_t index_i = tile_index_i_j.quotient; + const size_t index_j = tile_index_i_j.remainder; + const size_t index_k = tile_index_ij_k.remainder; + const size_t index_l = tile_index_l_m.quotient * max_tile_l; + const size_t index_m = tile_index_l_m.remainder * max_tile_m; + const size_t tile_l = min(max_tile_l, context->range_l - index_l); + const size_t tile_m = min(max_tile_m, context->range_m - index_m); + context->task(context->argument, index_i, index_j, index_k, index_l, index_m, tile_l, tile_m); +} + +void pthreadpool_parallelize_5d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_5d_tile_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t range_m, + size_t tile_l, + size_t tile_m, + uint32_t flags) +{ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute task sequentially on the calling thread */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j++) { + for (size_t k = 0; k < range_k; k++) { + for (size_t l = 0; l < range_l; l += tile_l) { + for (size_t m = 0; m < range_m; m += tile_m) { + task(argument, i, j, k, l, m, + min(range_l - l, tile_l), min(range_m - m, tile_m)); + } + } + } + } + } + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + } else { + /* Execute in parallel on the thread pool using linearized index */ + const size_t tile_range_l = divide_round_up(range_l, tile_l); + const size_t tile_range_m = divide_round_up(range_m, tile_m); + struct compute_5d_tile_2d_context context = { + .task = task, + .argument = argument, + .tile_range_lm = fxdiv_init_size_t(tile_range_l * tile_range_m), + .range_k = fxdiv_init_size_t(range_k), + .tile_range_m = fxdiv_init_size_t(tile_range_m), + .range_j = fxdiv_init_size_t(range_j), + .range_l = range_l, + .range_m = range_m, + .tile_l = tile_l, + .tile_m = tile_m, + }; + pthreadpool_parallelize_1d(threadpool, + (pthreadpool_task_1d_t) compute_5d_tile_2d, &context, + range_i * range_j * range_k * tile_range_l * tile_range_m, flags); + } +} + +struct compute_6d_tile_2d_context { + pthreadpool_task_6d_tile_2d_t task; + void* argument; + struct fxdiv_divisor_size_t tile_range_lmn; + struct fxdiv_divisor_size_t range_k; + struct fxdiv_divisor_size_t tile_range_n; + struct fxdiv_divisor_size_t range_j; + struct fxdiv_divisor_size_t tile_range_m; + size_t range_m; + size_t range_n; + size_t tile_m; + size_t tile_n; +}; + +static void compute_6d_tile_2d(const struct compute_6d_tile_2d_context* context, size_t linear_index) { + const struct fxdiv_divisor_size_t tile_range_lmn = context->tile_range_lmn; + const struct fxdiv_result_size_t tile_index_ijk_lmn = fxdiv_divide_size_t(linear_index, tile_range_lmn); + const struct fxdiv_divisor_size_t range_k = context->range_k; + const struct fxdiv_result_size_t tile_index_ij_k = fxdiv_divide_size_t(tile_index_ijk_lmn.quotient, range_k); + const struct fxdiv_divisor_size_t tile_range_n = context->tile_range_n; + const struct fxdiv_result_size_t tile_index_lm_n = fxdiv_divide_size_t(tile_index_ijk_lmn.remainder, tile_range_n); + const struct fxdiv_divisor_size_t range_j = context->range_j; + const struct fxdiv_result_size_t tile_index_i_j = fxdiv_divide_size_t(tile_index_ij_k.quotient, range_j); + const struct fxdiv_divisor_size_t tile_range_m = context->tile_range_m; + const struct fxdiv_result_size_t tile_index_l_m = fxdiv_divide_size_t(tile_index_lm_n.quotient, tile_range_m); + + const size_t max_tile_m = context->tile_m; + const size_t max_tile_n = context->tile_n; + const size_t index_i = tile_index_i_j.quotient; + const size_t index_j = tile_index_i_j.remainder; + const size_t index_k = tile_index_ij_k.remainder; + const size_t index_l = tile_index_l_m.quotient; + const size_t index_m = tile_index_l_m.remainder * max_tile_m; + const size_t index_n = tile_index_lm_n.remainder * max_tile_n; + const size_t tile_m = min(max_tile_m, context->range_m - index_m); + const size_t tile_n = min(max_tile_n, context->range_n - index_n); + context->task(context->argument, index_i, index_j, index_k, index_l, index_m, index_n, tile_m, tile_n); +} + +void pthreadpool_parallelize_6d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_6d_tile_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t range_m, + size_t range_n, + size_t tile_m, + size_t tile_n, + uint32_t flags) +{ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute task sequentially on the calling thread */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j++) { + for (size_t k = 0; k < range_k; k++) { + for (size_t l = 0; l < range_l; l++) { + for (size_t m = 0; m < range_m; m += tile_m) { + for (size_t n = 0; n < range_n; n += tile_n) { + task(argument, i, j, k, l, m, n, + min(range_m - m, tile_m), min(range_n - n, tile_n)); + } + } + } + } + } + } + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + } else { + /* Execute in parallel on the thread pool using linearized index */ + const size_t tile_range_m = divide_round_up(range_m, tile_m); + const size_t tile_range_n = divide_round_up(range_n, tile_n); + struct compute_6d_tile_2d_context context = { + .task = task, + .argument = argument, + .tile_range_lmn = fxdiv_init_size_t(range_l * tile_range_m * tile_range_n), + .range_k = fxdiv_init_size_t(range_k), + .tile_range_n = fxdiv_init_size_t(tile_range_n), + .range_j = fxdiv_init_size_t(range_j), + .tile_range_m = fxdiv_init_size_t(tile_range_m), + .range_m = range_m, + .range_n = range_n, + .tile_m = tile_m, + .tile_n = tile_n, + }; + pthreadpool_parallelize_1d(threadpool, + (pthreadpool_task_1d_t) compute_6d_tile_2d, &context, + range_i * range_j * range_k * range_l * tile_range_m * tile_range_n, flags); + } +} + +void pthreadpool_destroy(struct pthreadpool* threadpool) { + if (threadpool != NULL) { + if (threadpool->threads_count > 1) { + #if PTHREADPOOL_USE_FUTEX + atomic_store_explicit( + &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed); + atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_release); + + atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release); + + /* Wake up worker threads */ + futex_wake_all(&threadpool->command); + #else + /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */ + pthread_mutex_lock(&threadpool->command_mutex); + + /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ + atomic_store_explicit( + &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release); + + /* Update the threadpool command. */ + atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release); + + /* Wake up worker threads */ + pthread_cond_broadcast(&threadpool->command_condvar); + + /* Commit the state changes and let workers start processing */ + pthread_mutex_unlock(&threadpool->command_mutex); + #endif + + /* Wait until all threads return */ + for (size_t thread = 1; thread < threadpool->threads_count; thread++) { + pthread_join(threadpool->threads[thread].thread_object, NULL); + } + + /* Release resources */ + pthread_mutex_destroy(&threadpool->execution_mutex); + #if !PTHREADPOOL_USE_FUTEX + pthread_mutex_destroy(&threadpool->completion_mutex); + pthread_cond_destroy(&threadpool->completion_condvar); + pthread_mutex_destroy(&threadpool->command_mutex); + pthread_cond_destroy(&threadpool->command_condvar); + #endif + } + free(threadpool); + } +} diff --git a/src/threadpool-shim.c b/src/threadpool-shim.c new file mode 100644 index 0000000..c8ef51d --- /dev/null +++ b/src/threadpool-shim.c @@ -0,0 +1,195 @@ +/* Standard C headers */ +#include <stddef.h> + +/* Library header */ +#include <pthreadpool.h> + +static inline size_t min(size_t a, size_t b) { + return a < b ? a : b; +} + +struct pthreadpool* pthreadpool_create(size_t threads_count) { + return NULL; +} + +size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) { + return 1; +} + +void pthreadpool_parallelize_1d( + struct pthreadpool* threadpool, + pthreadpool_task_1d_t task, + void* argument, + size_t range, + uint32_t flags) +{ + for (size_t i = 0; i < range; i++) { + task(argument, i); + } +} + +void pthreadpool_parallelize_1d_tile_1d( + pthreadpool_t threadpool, + pthreadpool_task_1d_tile_1d_t task, + void* argument, + size_t range, + size_t tile, + uint32_t flags) +{ + for (size_t i = 0; i < range; i += tile) { + task(argument, i, min(range - i, tile)); + } +} + +void pthreadpool_parallelize_2d( + struct pthreadpool* threadpool, + pthreadpool_task_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + uint32_t flags) +{ + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j++) { + task(argument, i, j); + } + } +} + +void pthreadpool_parallelize_2d_tile_1d( + pthreadpool_t threadpool, + pthreadpool_task_2d_tile_1d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t tile_j, + uint32_t flags) +{ + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j += tile_j) { + task(argument, i, j, min(range_j - j, tile_j)); + } + } +} + +void pthreadpool_parallelize_2d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_2d_tile_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t tile_i, + size_t tile_j, + uint32_t flags) +{ + for (size_t i = 0; i < range_i; i += tile_i) { + for (size_t j = 0; j < range_j; j += tile_j) { + task(argument, i, j, min(range_i - i, tile_i), min(range_j - j, tile_j)); + } + } +} + +void pthreadpool_parallelize_3d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_3d_tile_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t tile_j, + size_t tile_k, + uint32_t flags) +{ + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j += tile_j) { + for (size_t k = 0; k < range_k; k += tile_k) { + task(argument, i, j, k, + min(range_j - j, tile_j), min(range_k - k, tile_k)); + } + } + } +} + +void pthreadpool_parallelize_4d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_4d_tile_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t tile_k, + size_t tile_l, + uint32_t flags) +{ + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j++) { + for (size_t k = 0; k < range_k; k += tile_k) { + for (size_t l = 0; l < range_l; l += tile_l) { + task(argument, i, j, k, l, + min(range_k - k, tile_k), min(range_l - l, tile_l)); + } + } + } + } +} + +void pthreadpool_parallelize_5d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_5d_tile_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t range_m, + size_t tile_l, + size_t tile_m, + uint32_t flags) +{ + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j++) { + for (size_t k = 0; k < range_k; k++) { + for (size_t l = 0; l < range_l; l += tile_l) { + for (size_t m = 0; m < range_m; m += tile_m) { + task(argument, i, j, k, l, m, + min(range_l - l, tile_l), min(range_m - m, tile_m)); + } + } + } + } + } +} + +void pthreadpool_parallelize_6d_tile_2d( + pthreadpool_t threadpool, + pthreadpool_task_6d_tile_2d_t task, + void* argument, + size_t range_i, + size_t range_j, + size_t range_k, + size_t range_l, + size_t range_m, + size_t range_n, + size_t tile_m, + size_t tile_n, + uint32_t flags) +{ + for (size_t i = 0; i < range_i; i++) { + for (size_t j = 0; j < range_j; j++) { + for (size_t k = 0; k < range_k; k++) { + for (size_t l = 0; l < range_l; l++) { + for (size_t m = 0; m < range_m; m += tile_m) { + for (size_t n = 0; n < range_n; n += tile_n) { + task(argument, i, j, k, l, m, n, + min(range_m - m, tile_m), min(range_n - n, tile_n)); + } + } + } + } + } + } +} + +void pthreadpool_destroy(struct pthreadpool* threadpool) { +} diff --git a/src/threadpool-utils.h b/src/threadpool-utils.h new file mode 100644 index 0000000..65c7fb0 --- /dev/null +++ b/src/threadpool-utils.h @@ -0,0 +1,62 @@ +#pragma once + +#include <stdint.h> + +#if defined(__SSE__) || defined(__x86_64__) +#include <xmmintrin.h> +#endif + +struct fpu_state { +#if defined(__SSE__) || defined(__x86_64__) + uint32_t mxcsr; +#elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0) + uint32_t fpscr; +#elif defined(__aarch64__) + uint64_t fpcr; +#else + char unused; +#endif +}; + +static inline struct fpu_state get_fpu_state() { + struct fpu_state state = { 0 }; +#if defined(__SSE__) || defined(__x86_64__) + state.mxcsr = (uint32_t) _mm_getcsr(); +#elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0) + __asm__ __volatile__("VMRS %[fpscr], fpscr" : [fpscr] "=r" (state.fpscr)); +#elif defined(__aarch64__) + __asm__ __volatile__("MRS %[fpcr], fpcr" : [fpcr] "=r" (state.fpcr)); +#endif + return state; +} + +static inline void set_fpu_state(const struct fpu_state state) { +#if defined(__SSE__) || defined(__x86_64__) + _mm_setcsr((unsigned int) state.mxcsr); +#elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0) + __asm__ __volatile__("VMSR fpscr, %[fpscr]" : : [fpscr] "r" (state.fpscr)); +#elif defined(__aarch64__) + __asm__ __volatile__("MSR fpcr, %[fpcr]" : : [fpcr] "r" (state.fpcr)); +#endif +} + +static inline void disable_fpu_denormals() { +#if defined(__SSE__) || defined(__x86_64__) + _mm_setcsr(_mm_getcsr() | 0x8040); +#elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0) + uint32_t fpscr; + __asm__ __volatile__( + "VMRS %[fpscr], fpscr\n" + "ORR %[fpscr], #0x1000000\n" + "VMSR fpscr, %[fpscr]\n" + : [fpscr] "=r" (fpscr)); +#elif defined(__aarch64__) + uint64_t fpcr; + __asm__ __volatile__( + "MRS %[fpcr], fpcr\n" + "ORR %w[fpcr], %w[fpcr], 0x1000000\n" + "ORR %w[fpcr], %w[fpcr], 0x80000\n" + "MSR fpcr, %[fpcr]\n" + : [fpcr] "=r" (fpcr)); +#endif +} diff --git a/test/pthreadpool.cc b/test/pthreadpool.cc new file mode 100644 index 0000000..4faf3be --- /dev/null +++ b/test/pthreadpool.cc @@ -0,0 +1,2852 @@ +#include <gtest/gtest.h> + +#include <pthreadpool.h> + +#include <algorithm> +#include <atomic> +#include <cstddef> +#include <memory> + + +typedef std::unique_ptr<pthreadpool, decltype(&pthreadpool_destroy)> auto_pthreadpool_t; + + +const size_t kParallelize1DRange = 1223; +const size_t kParallelize1DTile1DRange = 1303; +const size_t kParallelize1DTile1DTile = 11; +const size_t kParallelize2DRangeI = 41; +const size_t kParallelize2DRangeJ = 43; +const size_t kParallelize2DTile1DRangeI = 43; +const size_t kParallelize2DTile1DRangeJ = 53; +const size_t kParallelize2DTile1DTileJ = 5; +const size_t kParallelize2DTile2DRangeI = 53; +const size_t kParallelize2DTile2DRangeJ = 59; +const size_t kParallelize2DTile2DTileI = 5; +const size_t kParallelize2DTile2DTileJ = 7; +const size_t kParallelize3DTile2DRangeI = 19; +const size_t kParallelize3DTile2DRangeJ = 23; +const size_t kParallelize3DTile2DRangeK = 29; +const size_t kParallelize3DTile2DTileJ = 2; +const size_t kParallelize3DTile2DTileK = 3; +const size_t kParallelize4DTile2DRangeI = 17; +const size_t kParallelize4DTile2DRangeJ = 19; +const size_t kParallelize4DTile2DRangeK = 23; +const size_t kParallelize4DTile2DRangeL = 29; +const size_t kParallelize4DTile2DTileK = 2; +const size_t kParallelize4DTile2DTileL = 3; +const size_t kParallelize5DTile2DRangeI = 13; +const size_t kParallelize5DTile2DRangeJ = 17; +const size_t kParallelize5DTile2DRangeK = 19; +const size_t kParallelize5DTile2DRangeL = 23; +const size_t kParallelize5DTile2DRangeM = 29; +const size_t kParallelize5DTile2DTileL = 3; +const size_t kParallelize5DTile2DTileM = 2; +const size_t kParallelize6DTile2DRangeI = 7; +const size_t kParallelize6DTile2DRangeJ = 11; +const size_t kParallelize6DTile2DRangeK = 13; +const size_t kParallelize6DTile2DRangeL = 17; +const size_t kParallelize6DTile2DRangeM = 19; +const size_t kParallelize6DTile2DRangeN = 23; +const size_t kParallelize6DTile2DTileM = 3; +const size_t kParallelize6DTile2DTileN = 2; + +const size_t kIncrementIterations = 101; +const size_t kIncrementIterations5D = 7; +const size_t kIncrementIterations6D = 3; + + +TEST(CreateAndDestroy, NullThreadPool) { + pthreadpool* threadpool = nullptr; + pthreadpool_destroy(threadpool); +} + +TEST(CreateAndDestroy, SingleThreadPool) { + pthreadpool* threadpool = pthreadpool_create(1); + ASSERT_TRUE(threadpool); + pthreadpool_destroy(threadpool); +} + +TEST(CreateAndDestroy, MultiThreadPool) { + pthreadpool* threadpool = pthreadpool_create(0); + ASSERT_TRUE(threadpool); + pthreadpool_destroy(threadpool); +} + +static void ComputeNothing1D(void*, size_t) { +} + +TEST(Parallelize1D, SingleThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d(threadpool.get(), + ComputeNothing1D, + nullptr, + kParallelize1DRange, + 0 /* flags */); +} + +TEST(Parallelize1D, MultiThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d( + threadpool.get(), + ComputeNothing1D, + nullptr, + kParallelize1DRange, + 0 /* flags */); +} + +static void CheckBounds1D(void*, size_t i) { + EXPECT_LT(i, kParallelize1DRange); +} + +TEST(Parallelize1D, SingleThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d( + threadpool.get(), + CheckBounds1D, + nullptr, + kParallelize1DRange, + 0 /* flags */); +} + +TEST(Parallelize1D, MultiThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d( + threadpool.get(), + CheckBounds1D, + nullptr, + kParallelize1DRange, + 0 /* flags */); +} + +static void SetTrue1D(std::atomic_bool* processed_indicators, size_t i) { + processed_indicators[i].store(true, std::memory_order_relaxed); +} + +TEST(Parallelize1D, SingleThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_t>(SetTrue1D), + static_cast<void*>(indicators.data()), + kParallelize1DRange, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DRange; i++) { + EXPECT_TRUE(indicators[i].load(std::memory_order_relaxed)) + << "Element " << i << " not processed"; + } +} + +TEST(Parallelize1D, MultiThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_t>(SetTrue1D), + static_cast<void*>(indicators.data()), + kParallelize1DRange, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DRange; i++) { + EXPECT_TRUE(indicators[i].load(std::memory_order_relaxed)) + << "Element " << i << " not processed"; + } +} + +static void Increment1D(std::atomic_int* processed_counters, size_t i) { + processed_counters[i].fetch_add(1, std::memory_order_relaxed); +} + +TEST(Parallelize1D, SingleThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_t>(Increment1D), + static_cast<void*>(counters.data()), + kParallelize1DRange, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DRange; i++) { + EXPECT_EQ(counters[i].load(std::memory_order_relaxed), 1) + << "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times (expected: 1)"; + } +} + +TEST(Parallelize1D, MultiThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_t>(Increment1D), + static_cast<void*>(counters.data()), + kParallelize1DRange, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DRange; i++) { + EXPECT_EQ(counters[i].load(std::memory_order_relaxed), 1) + << "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times (expected: 1)"; + } +} + +TEST(Parallelize1D, SingleThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_t>(Increment1D), + static_cast<void*>(counters.data()), + kParallelize1DRange, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize1DRange; i++) { + EXPECT_EQ(counters[i].load(std::memory_order_relaxed), kIncrementIterations) + << "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } +} + +TEST(Parallelize1D, MultiThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_t>(Increment1D), + static_cast<void*>(counters.data()), + kParallelize1DRange, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize1DRange; i++) { + EXPECT_EQ(counters[i].load(std::memory_order_relaxed), kIncrementIterations) + << "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } +} + +static void WorkImbalance1D(std::atomic_int* num_processed_items, size_t i) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); + if (i == 0) { + /* Spin-wait until all items are computed */ + while (num_processed_items->load(std::memory_order_relaxed) != kParallelize1DRange) { + std::atomic_thread_fence(std::memory_order_acquire); + } + } +} + +TEST(Parallelize1D, MultiThreadPoolWorkStealing) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_t>(WorkImbalance1D), + static_cast<void*>(&num_processed_items), + kParallelize1DRange, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize1DRange); +} + +static void ComputeNothing1DTile1D(void*, size_t, size_t) { +} + +TEST(Parallelize1DTile1D, SingleThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d_tile_1d(threadpool.get(), + ComputeNothing1DTile1D, + nullptr, + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); +} + +TEST(Parallelize1DTile1D, MultiThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + ComputeNothing1DTile1D, + nullptr, + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); +} + +static void CheckBounds1DTile1D(void*, size_t start_i, size_t tile_i) { + EXPECT_LT(start_i, kParallelize1DTile1DRange); + EXPECT_LE(start_i + tile_i, kParallelize1DTile1DRange); +} + +TEST(Parallelize1DTile1D, SingleThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + CheckBounds1DTile1D, + nullptr, + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); +} + +TEST(Parallelize1DTile1D, MultiThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + CheckBounds1DTile1D, + nullptr, + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); +} + +static void CheckTiling1DTile1D(void*, size_t start_i, size_t tile_i) { + EXPECT_GT(tile_i, 0); + EXPECT_LE(tile_i, kParallelize1DTile1DTile); + EXPECT_EQ(start_i % kParallelize1DTile1DTile, 0); + EXPECT_EQ(tile_i, std::min<size_t>(kParallelize1DTile1DTile, kParallelize1DTile1DRange - start_i)); +} + +TEST(Parallelize1DTile1D, SingleThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + CheckTiling1DTile1D, + nullptr, + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); +} + +TEST(Parallelize1DTile1D, MultiThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + CheckTiling1DTile1D, + nullptr, + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); +} + +static void SetTrue1DTile1D(std::atomic_bool* processed_indicators, size_t start_i, size_t tile_i) { + for (size_t i = start_i; i < start_i + tile_i; i++) { + processed_indicators[i].store(true, std::memory_order_relaxed); + } +} + +TEST(Parallelize1DTile1D, SingleThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize1DTile1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_tile_1d_t>(SetTrue1DTile1D), + static_cast<void*>(indicators.data()), + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; i++) { + EXPECT_TRUE(indicators[i].load(std::memory_order_relaxed)) + << "Element " << i << " not processed"; + } +} + +TEST(Parallelize1DTile1D, MultiThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize1DTile1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_tile_1d_t>(SetTrue1DTile1D), + static_cast<void*>(indicators.data()), + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; i++) { + EXPECT_TRUE(indicators[i].load(std::memory_order_relaxed)) + << "Element " << i << " not processed"; + } +} + +static void Increment1DTile1D(std::atomic_int* processed_counters, size_t start_i, size_t tile_i) { + for (size_t i = start_i; i < start_i + tile_i; i++) { + processed_counters[i].fetch_add(1, std::memory_order_relaxed); + } +} + +TEST(Parallelize1DTile1D, SingleThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize1DTile1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_tile_1d_t>(Increment1DTile1D), + static_cast<void*>(counters.data()), + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; i++) { + EXPECT_EQ(counters[i].load(std::memory_order_relaxed), 1) + << "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times (expected: 1)"; + } +} + +TEST(Parallelize1DTile1D, MultiThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize1DTile1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_tile_1d_t>(Increment1DTile1D), + static_cast<void*>(counters.data()), + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; i++) { + EXPECT_EQ(counters[i].load(std::memory_order_relaxed), 1) + << "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times (expected: 1)"; + } +} + +TEST(Parallelize1DTile1D, SingleThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize1DTile1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_tile_1d_t>(Increment1DTile1D), + static_cast<void*>(counters.data()), + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize1DTile1DRange; i++) { + EXPECT_EQ(counters[i].load(std::memory_order_relaxed), kIncrementIterations) + << "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } +} + +TEST(Parallelize1DTile1D, MultiThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize1DTile1DRange); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_tile_1d_t>(Increment1DTile1D), + static_cast<void*>(counters.data()), + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize1DTile1DRange; i++) { + EXPECT_EQ(counters[i].load(std::memory_order_relaxed), kIncrementIterations) + << "Element " << i << " was processed " << counters[i].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } +} + +static void WorkImbalance1DTile1D(std::atomic_int* num_processed_items, size_t start_i, size_t tile_i) { + num_processed_items->fetch_add(tile_i, std::memory_order_relaxed); + if (start_i == 0) { + /* Spin-wait until all items are computed */ + while (num_processed_items->load(std::memory_order_relaxed) != kParallelize1DTile1DRange) { + std::atomic_thread_fence(std::memory_order_acquire); + } + } +} + +TEST(Parallelize1DTile1D, MultiThreadPoolWorkStealing) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_1d_tile_1d_t>(WorkImbalance1DTile1D), + static_cast<void*>(&num_processed_items), + kParallelize1DTile1DRange, kParallelize1DTile1DTile, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize1DTile1DRange); +} + +static void ComputeNothing2D(void*, size_t, size_t) { +} + +TEST(Parallelize2D, SingleThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d(threadpool.get(), + ComputeNothing2D, + nullptr, + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); +} + +TEST(Parallelize2D, MultiThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d( + threadpool.get(), + ComputeNothing2D, + nullptr, + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); +} + +static void CheckBounds2D(void*, size_t i, size_t j) { + EXPECT_LT(i, kParallelize2DRangeI); + EXPECT_LT(j, kParallelize2DRangeJ); +} + +TEST(Parallelize2D, SingleThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d( + threadpool.get(), + CheckBounds2D, + nullptr, + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); +} + +TEST(Parallelize2D, MultiThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d( + threadpool.get(), + CheckBounds2D, + nullptr, + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); +} + +static void SetTrue2D(std::atomic_bool* processed_indicators, size_t i, size_t j) { + const size_t linear_idx = i * kParallelize2DRangeJ + j; + processed_indicators[linear_idx].store(true, std::memory_order_relaxed); +} + +TEST(Parallelize2D, SingleThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize2DRangeI * kParallelize2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_t>(SetTrue2D), + static_cast<void*>(indicators.data()), + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DRangeJ + j; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ") not processed"; + } + } +} + +TEST(Parallelize2D, MultiThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize2DRangeI * kParallelize2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_t>(SetTrue2D), + static_cast<void*>(indicators.data()), + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DRangeJ + j; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ") not processed"; + } + } +} + +static void Increment2D(std::atomic_int* processed_counters, size_t i, size_t j) { + const size_t linear_idx = i * kParallelize2DRangeJ + j; + processed_counters[linear_idx].fetch_add(1, std::memory_order_relaxed); +} + +TEST(Parallelize2D, SingleThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize2DRangeI * kParallelize2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_t>(Increment2D), + static_cast<void*>(counters.data()), + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } +} + +TEST(Parallelize2D, MultiThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize2DRangeI * kParallelize2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_t>(Increment2D), + static_cast<void*>(counters.data()), + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } +} + +TEST(Parallelize2D, SingleThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize2DRangeI * kParallelize2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_t>(Increment2D), + static_cast<void*>(counters.data()), + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } +} + +TEST(Parallelize2D, MultiThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize2DRangeI * kParallelize2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_t>(Increment2D), + static_cast<void*>(counters.data()), + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } +} + +static void WorkImbalance2D(std::atomic_int* num_processed_items, size_t i, size_t j) { + num_processed_items->fetch_add(1, std::memory_order_relaxed); + if (i == 0 && j == 0) { + /* Spin-wait until all items are computed */ + while (num_processed_items->load(std::memory_order_relaxed) != kParallelize2DRangeI * kParallelize2DRangeJ) { + std::atomic_thread_fence(std::memory_order_acquire); + } + } +} + +TEST(Parallelize2D, MultiThreadPoolWorkStealing) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_t>(WorkImbalance2D), + static_cast<void*>(&num_processed_items), + kParallelize2DRangeI, kParallelize2DRangeJ, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize2DRangeI * kParallelize2DRangeJ); +} + +static void ComputeNothing2DTile1D(void*, size_t, size_t, size_t) { +} + +TEST(Parallelize2DTile1D, SingleThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d_tile_1d(threadpool.get(), + ComputeNothing2DTile1D, + nullptr, + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); +} + +TEST(Parallelize2DTile1D, MultiThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + ComputeNothing2DTile1D, + nullptr, + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); +} + +static void CheckBounds2DTile1D(void*, size_t i, size_t start_j, size_t tile_j) { + EXPECT_LT(i, kParallelize2DTile1DRangeI); + EXPECT_LT(start_j, kParallelize2DTile1DRangeJ); + EXPECT_LE(start_j + tile_j, kParallelize2DTile1DRangeJ); +} + +TEST(Parallelize2DTile1D, SingleThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + CheckBounds2DTile1D, + nullptr, + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); +} + +TEST(Parallelize2DTile1D, MultiThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + CheckBounds2DTile1D, + nullptr, + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); +} + +static void CheckTiling2DTile1D(void*, size_t i, size_t start_j, size_t tile_j) { + EXPECT_GT(tile_j, 0); + EXPECT_LE(tile_j, kParallelize2DTile1DTileJ); + EXPECT_EQ(start_j % kParallelize2DTile1DTileJ, 0); + EXPECT_EQ(tile_j, std::min<size_t>(kParallelize2DTile1DTileJ, kParallelize2DTile1DRangeJ - start_j)); +} + +TEST(Parallelize2DTile1D, SingleThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + CheckTiling2DTile1D, + nullptr, + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); +} + +TEST(Parallelize2DTile1D, MultiThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + CheckTiling2DTile1D, + nullptr, + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); +} + +static void SetTrue2DTile1D(std::atomic_bool* processed_indicators, size_t i, size_t start_j, size_t tile_j) { + for (size_t j = start_j; j < start_j + tile_j; j++) { + const size_t linear_idx = i * kParallelize2DTile1DRangeJ + j; + processed_indicators[linear_idx].store(true, std::memory_order_relaxed); + } +} + +TEST(Parallelize2DTile1D, SingleThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize2DTile1DRangeI * kParallelize2DTile1DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_1d_t>(SetTrue2DTile1D), + static_cast<void*>(indicators.data()), + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DTile1DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile1DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile1DRangeJ + j; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ") not processed"; + } + } +} + +TEST(Parallelize2DTile1D, MultiThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize2DTile1DRangeI * kParallelize2DTile1DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_1d_t>(SetTrue2DTile1D), + static_cast<void*>(indicators.data()), + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DTile1DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile1DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile1DRangeJ + j; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ") not processed"; + } + } +} + +static void Increment2DTile1D(std::atomic_int* processed_counters, size_t i, size_t start_j, size_t tile_j) { + for (size_t j = start_j; j < start_j + tile_j; j++) { + const size_t linear_idx = i * kParallelize2DTile1DRangeJ + j; + processed_counters[linear_idx].fetch_add(1, std::memory_order_relaxed); + } +} + +TEST(Parallelize2DTile1D, SingleThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize2DTile1DRangeI * kParallelize2DTile1DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_1d_t>(Increment2DTile1D), + static_cast<void*>(counters.data()), + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DTile1DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile1DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile1DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } +} + +TEST(Parallelize2DTile1D, MultiThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize2DTile1DRangeI * kParallelize2DTile1DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_1d_t>(Increment2DTile1D), + static_cast<void*>(counters.data()), + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DTile1DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile1DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile1DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } +} + +TEST(Parallelize2DTile1D, SingleThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize2DTile1DRangeI * kParallelize2DTile1DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_1d_t>(Increment2DTile1D), + static_cast<void*>(counters.data()), + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize2DTile1DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile1DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile1DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } +} + +TEST(Parallelize2DTile1D, MultiThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize2DTile1DRangeI * kParallelize2DTile1DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_1d_t>(Increment2DTile1D), + static_cast<void*>(counters.data()), + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize2DTile1DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile1DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile1DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } +} + +static void WorkImbalance2DTile1D(std::atomic_int* num_processed_items, size_t i, size_t start_j, size_t tile_j) { + num_processed_items->fetch_add(tile_j, std::memory_order_relaxed); + if (i == 0 && start_j == 0) { + /* Spin-wait until all items are computed */ + while (num_processed_items->load(std::memory_order_relaxed) != kParallelize2DTile1DRangeI * kParallelize2DTile1DRangeJ) { + std::atomic_thread_fence(std::memory_order_acquire); + } + } +} + +TEST(Parallelize2DTile1D, MultiThreadPoolWorkStealing) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_1d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_1d_t>(WorkImbalance2DTile1D), + static_cast<void*>(&num_processed_items), + kParallelize2DTile1DRangeI, kParallelize2DTile1DRangeJ, kParallelize2DTile1DTileJ, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize2DTile1DRangeI * kParallelize2DTile1DRangeJ); +} + +static void ComputeNothing2DTile2D(void*, size_t, size_t, size_t, size_t) { +} + +TEST(Parallelize2DTile2D, SingleThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d_tile_2d(threadpool.get(), + ComputeNothing2DTile2D, + nullptr, + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); +} + +TEST(Parallelize2DTile2D, MultiThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + ComputeNothing2DTile2D, + nullptr, + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); +} + +static void CheckBounds2DTile2D(void*, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) { + EXPECT_LT(start_i, kParallelize2DTile2DRangeI); + EXPECT_LT(start_j, kParallelize2DTile2DRangeJ); + EXPECT_LE(start_i + tile_i, kParallelize2DTile2DRangeI); + EXPECT_LE(start_j + tile_j, kParallelize2DTile2DRangeJ); +} + +TEST(Parallelize2DTile2D, SingleThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + CheckBounds2DTile2D, + nullptr, + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); +} + +TEST(Parallelize2DTile2D, MultiThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + CheckBounds2DTile2D, + nullptr, + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); +} + +static void CheckTiling2DTile2D(void*, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) { + EXPECT_GT(tile_i, 0); + EXPECT_LE(tile_i, kParallelize2DTile2DTileI); + EXPECT_EQ(start_i % kParallelize2DTile2DTileI, 0); + EXPECT_EQ(tile_i, std::min<size_t>(kParallelize2DTile2DTileI, kParallelize2DTile2DRangeI - start_i)); + + EXPECT_GT(tile_j, 0); + EXPECT_LE(tile_j, kParallelize2DTile2DTileJ); + EXPECT_EQ(start_j % kParallelize2DTile2DTileJ, 0); + EXPECT_EQ(tile_j, std::min<size_t>(kParallelize2DTile2DTileJ, kParallelize2DTile2DRangeJ - start_j)); +} + +TEST(Parallelize2DTile2D, SingleThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + CheckTiling2DTile2D, + nullptr, + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); +} + +TEST(Parallelize2DTile2D, MultiThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + CheckTiling2DTile2D, + nullptr, + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); +} + +static void SetTrue2DTile2D(std::atomic_bool* processed_indicators, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) { + for (size_t i = start_i; i < start_i + tile_i; i++) { + for (size_t j = start_j; j < start_j + tile_j; j++) { + const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j; + processed_indicators[linear_idx].store(true, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize2DTile2D, SingleThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_2d_t>(SetTrue2DTile2D), + static_cast<void*>(indicators.data()), + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ") not processed"; + } + } +} + +TEST(Parallelize2DTile2D, MultiThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_2d_t>(SetTrue2DTile2D), + static_cast<void*>(indicators.data()), + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ") not processed"; + } + } +} + +static void Increment2DTile2D(std::atomic_int* processed_counters, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) { + for (size_t i = start_i; i < start_i + tile_i; i++) { + for (size_t j = start_j; j < start_j + tile_j; j++) { + const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j; + processed_counters[linear_idx].fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize2DTile2D, SingleThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_2d_t>(Increment2DTile2D), + static_cast<void*>(counters.data()), + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } +} + +TEST(Parallelize2DTile2D, MultiThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_2d_t>(Increment2DTile2D), + static_cast<void*>(counters.data()), + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } +} + +TEST(Parallelize2DTile2D, SingleThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_2d_t>(Increment2DTile2D), + static_cast<void*>(counters.data()), + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } +} + +TEST(Parallelize2DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_2d_t>(Increment2DTile2D), + static_cast<void*>(counters.data()), + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize2DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize2DTile2DRangeJ; j++) { + const size_t linear_idx = i * kParallelize2DTile2DRangeJ + j; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } +} + +static void WorkImbalance2DTile2D(std::atomic_int* num_processed_items, size_t start_i, size_t start_j, size_t tile_i, size_t tile_j) { + num_processed_items->fetch_add(tile_i * tile_j, std::memory_order_relaxed); + if (start_i == 0 && start_j == 0) { + /* Spin-wait until all items are computed */ + while (num_processed_items->load(std::memory_order_relaxed) != kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ) { + std::atomic_thread_fence(std::memory_order_acquire); + } + } +} + +TEST(Parallelize2DTile2D, MultiThreadPoolWorkStealing) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_2d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_2d_tile_2d_t>(WorkImbalance2DTile2D), + static_cast<void*>(&num_processed_items), + kParallelize2DTile2DRangeI, kParallelize2DTile2DRangeJ, + kParallelize2DTile2DTileI, kParallelize2DTile2DTileJ, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize2DTile2DRangeI * kParallelize2DTile2DRangeJ); +} + +static void ComputeNothing3DTile2D(void*, size_t, size_t, size_t, size_t, size_t) { +} + +TEST(Parallelize3DTile2D, SingleThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_3d_tile_2d(threadpool.get(), + ComputeNothing3DTile2D, + nullptr, + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); +} + +TEST(Parallelize3DTile2D, MultiThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + ComputeNothing3DTile2D, + nullptr, + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); +} + +static void CheckBounds3DTile2D(void*, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) { + EXPECT_LT(i, kParallelize3DTile2DRangeI); + EXPECT_LT(start_j, kParallelize3DTile2DRangeJ); + EXPECT_LT(start_k, kParallelize3DTile2DRangeK); + EXPECT_LE(start_j + tile_j, kParallelize3DTile2DRangeJ); + EXPECT_LE(start_k + tile_k, kParallelize3DTile2DRangeK); +} + +TEST(Parallelize3DTile2D, SingleThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + CheckBounds3DTile2D, + nullptr, + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); +} + +TEST(Parallelize3DTile2D, MultiThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + CheckBounds3DTile2D, + nullptr, + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); +} + +static void CheckTiling3DTile2D(void*, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) { + EXPECT_GT(tile_j, 0); + EXPECT_LE(tile_j, kParallelize3DTile2DTileJ); + EXPECT_EQ(start_j % kParallelize3DTile2DTileJ, 0); + EXPECT_EQ(tile_j, std::min<size_t>(kParallelize3DTile2DTileJ, kParallelize3DTile2DRangeJ - start_j)); + + EXPECT_GT(tile_k, 0); + EXPECT_LE(tile_k, kParallelize3DTile2DTileK); + EXPECT_EQ(start_k % kParallelize3DTile2DTileK, 0); + EXPECT_EQ(tile_k, std::min<size_t>(kParallelize3DTile2DTileK, kParallelize3DTile2DRangeK - start_k)); +} + +TEST(Parallelize3DTile2D, SingleThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + CheckTiling3DTile2D, + nullptr, + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); +} + +TEST(Parallelize3DTile2D, MultiThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + CheckTiling3DTile2D, + nullptr, + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); +} + +static void SetTrue3DTile2D(std::atomic_bool* processed_indicators, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) { + for (size_t j = start_j; j < start_j + tile_j; j++) { + for (size_t k = start_k; k < start_k + tile_k; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + processed_indicators[linear_idx].store(true, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize3DTile2D, SingleThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(SetTrue3DTile2D), + static_cast<void*>(indicators.data()), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ", " << k << ") not processed"; + } + } + } +} + +TEST(Parallelize3DTile2D, MultiThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(SetTrue3DTile2D), + static_cast<void*>(indicators.data()), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ", " << k << ") not processed"; + } + } + } +} + +static void Increment3DTile2D(std::atomic_int* processed_counters, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) { + for (size_t j = start_j; j < start_j + tile_j; j++) { + for (size_t k = start_k; k < start_k + tile_k; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + processed_counters[linear_idx].fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize3DTile2D, SingleThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(Increment3DTile2D), + static_cast<void*>(counters.data()), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ", " << k << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } + } +} + +TEST(Parallelize3DTile2D, MultiThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(Increment3DTile2D), + static_cast<void*>(counters.data()), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ", " << k << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } + } +} + +TEST(Parallelize3DTile2D, SingleThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(Increment3DTile2D), + static_cast<void*>(counters.data()), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ", " << k << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } + } +} + +TEST(Parallelize3DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(Increment3DTile2D), + static_cast<void*>(counters.data()), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize3DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize3DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize3DTile2DRangeK; k++) { + const size_t linear_idx = (i * kParallelize3DTile2DRangeJ + j) * kParallelize3DTile2DRangeK + k; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ", " << k << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } + } +} + +static void WorkImbalance3DTile2D(std::atomic_int* num_processed_items, size_t i, size_t start_j, size_t start_k, size_t tile_j, size_t tile_k) { + num_processed_items->fetch_add(tile_j * tile_k, std::memory_order_relaxed); + if (i == 0 && start_j == 0 && start_k == 0) { + /* Spin-wait until all items are computed */ + while (num_processed_items->load(std::memory_order_relaxed) != kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK) { + std::atomic_thread_fence(std::memory_order_acquire); + } + } +} + +TEST(Parallelize3DTile2D, MultiThreadPoolWorkStealing) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_3d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_3d_tile_2d_t>(WorkImbalance3DTile2D), + static_cast<void*>(&num_processed_items), + kParallelize3DTile2DRangeI, kParallelize3DTile2DRangeJ, kParallelize3DTile2DRangeK, + kParallelize3DTile2DTileJ, kParallelize3DTile2DTileK, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize3DTile2DRangeI * kParallelize3DTile2DRangeJ * kParallelize3DTile2DRangeK); +} + +static void ComputeNothing4DTile2D(void*, size_t, size_t, size_t, size_t, size_t, size_t) { +} + +TEST(Parallelize4DTile2D, SingleThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_4d_tile_2d(threadpool.get(), + ComputeNothing4DTile2D, + nullptr, + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); +} + +TEST(Parallelize4DTile2D, MultiThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + ComputeNothing4DTile2D, + nullptr, + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); +} + +static void CheckBounds4DTile2D(void*, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) { + EXPECT_LT(i, kParallelize4DTile2DRangeI); + EXPECT_LT(j, kParallelize4DTile2DRangeJ); + EXPECT_LT(start_k, kParallelize4DTile2DRangeK); + EXPECT_LT(start_l, kParallelize4DTile2DRangeL); + EXPECT_LE(start_k + tile_k, kParallelize4DTile2DRangeK); + EXPECT_LE(start_l + tile_l, kParallelize4DTile2DRangeL); +} + +TEST(Parallelize4DTile2D, SingleThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + CheckBounds4DTile2D, + nullptr, + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); +} + +TEST(Parallelize4DTile2D, MultiThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + CheckBounds4DTile2D, + nullptr, + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); +} + +static void CheckTiling4DTile2D(void*, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) { + EXPECT_GT(tile_k, 0); + EXPECT_LE(tile_k, kParallelize4DTile2DTileK); + EXPECT_EQ(start_k % kParallelize4DTile2DTileK, 0); + EXPECT_EQ(tile_k, std::min<size_t>(kParallelize4DTile2DTileK, kParallelize4DTile2DRangeK - start_k)); + + EXPECT_GT(tile_l, 0); + EXPECT_LE(tile_l, kParallelize4DTile2DTileL); + EXPECT_EQ(start_l % kParallelize4DTile2DTileL, 0); + EXPECT_EQ(tile_l, std::min<size_t>(kParallelize4DTile2DTileL, kParallelize4DTile2DRangeL - start_l)); +} + +TEST(Parallelize4DTile2D, SingleThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + CheckTiling4DTile2D, + nullptr, + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); +} + +TEST(Parallelize4DTile2D, MultiThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + CheckTiling4DTile2D, + nullptr, + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); +} + +static void SetTrue4DTile2D(std::atomic_bool* processed_indicators, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) { + for (size_t k = start_k; k < start_k + tile_k; k++) { + for (size_t l = start_l; l < start_l + tile_l; l++) { + const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l; + processed_indicators[linear_idx].store(true, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize4DTile2D, SingleThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(SetTrue4DTile2D), + static_cast<void*>(indicators.data()), + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) { + const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ") not processed"; + } + } + } + } +} + +TEST(Parallelize4DTile2D, MultiThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(SetTrue4DTile2D), + static_cast<void*>(indicators.data()), + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) { + const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ") not processed"; + } + } + } + } +} + +static void Increment4DTile2D(std::atomic_int* processed_counters, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) { + for (size_t k = start_k; k < start_k + tile_k; k++) { + for (size_t l = start_l; l < start_l + tile_l; l++) { + const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l; + processed_counters[linear_idx].fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize4DTile2D, SingleThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(Increment4DTile2D), + static_cast<void*>(counters.data()), + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) { + const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } + } + } +} + +TEST(Parallelize4DTile2D, MultiThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(Increment4DTile2D), + static_cast<void*>(counters.data()), + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) { + const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } + } + } +} + +TEST(Parallelize4DTile2D, SingleThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(Increment4DTile2D), + static_cast<void*>(counters.data()), + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) { + const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } + } + } +} + +TEST(Parallelize4DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + for (size_t iteration = 0; iteration < kIncrementIterations; iteration++) { + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(Increment4DTile2D), + static_cast<void*>(counters.data()), + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize4DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize4DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize4DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize4DTile2DRangeL; l++) { + const size_t linear_idx = ((i * kParallelize4DTile2DRangeJ + j) * kParallelize4DTile2DRangeK + k) * kParallelize4DTile2DRangeL + l; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations << ")"; + } + } + } + } +} + +static void WorkImbalance4DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t start_k, size_t start_l, size_t tile_k, size_t tile_l) { + num_processed_items->fetch_add(tile_k * tile_l, std::memory_order_relaxed); + if (i == 0 && j == 0 && start_k == 0 && start_l == 0) { + /* Spin-wait until all items are computed */ + while (num_processed_items->load(std::memory_order_relaxed) != kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL) { + std::atomic_thread_fence(std::memory_order_acquire); + } + } +} + +TEST(Parallelize4DTile2D, MultiThreadPoolWorkStealing) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_4d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_4d_tile_2d_t>(WorkImbalance4DTile2D), + static_cast<void*>(&num_processed_items), + kParallelize4DTile2DRangeI, kParallelize4DTile2DRangeJ, kParallelize4DTile2DRangeK, kParallelize4DTile2DRangeL, + kParallelize4DTile2DTileK, kParallelize4DTile2DTileL, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize4DTile2DRangeI * kParallelize4DTile2DRangeJ * kParallelize4DTile2DRangeK * kParallelize4DTile2DRangeL); +} + +static void ComputeNothing5DTile2D(void*, size_t, size_t, size_t, size_t, size_t, size_t, size_t) { +} + +TEST(Parallelize5DTile2D, SingleThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_5d_tile_2d(threadpool.get(), + ComputeNothing5DTile2D, + nullptr, + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); +} + +TEST(Parallelize5DTile2D, MultiThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + ComputeNothing5DTile2D, + nullptr, + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); +} + +static void CheckBounds5DTile2D(void*, size_t i, size_t j, size_t k, size_t start_l, size_t start_m, size_t tile_l, size_t tile_m) { + EXPECT_LT(i, kParallelize5DTile2DRangeI); + EXPECT_LT(j, kParallelize5DTile2DRangeJ); + EXPECT_LT(k, kParallelize5DTile2DRangeK); + EXPECT_LT(start_l, kParallelize5DTile2DRangeL); + EXPECT_LT(start_m, kParallelize5DTile2DRangeM); + EXPECT_LE(start_l + tile_l, kParallelize5DTile2DRangeL); + EXPECT_LE(start_m + tile_m, kParallelize5DTile2DRangeM); +} + +TEST(Parallelize5DTile2D, SingleThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + CheckBounds5DTile2D, + nullptr, + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); +} + +TEST(Parallelize5DTile2D, MultiThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + CheckBounds5DTile2D, + nullptr, + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); +} + +static void CheckTiling5DTile2D(void*, size_t i, size_t j, size_t k, size_t start_l, size_t start_m, size_t tile_l, size_t tile_m) { + EXPECT_GT(tile_l, 0); + EXPECT_LE(tile_l, kParallelize5DTile2DTileL); + EXPECT_EQ(start_l % kParallelize5DTile2DTileL, 0); + EXPECT_EQ(tile_l, std::min<size_t>(kParallelize5DTile2DTileL, kParallelize5DTile2DRangeL - start_l)); + + EXPECT_GT(tile_m, 0); + EXPECT_LE(tile_m, kParallelize5DTile2DTileM); + EXPECT_EQ(start_m % kParallelize5DTile2DTileM, 0); + EXPECT_EQ(tile_m, std::min<size_t>(kParallelize5DTile2DTileM, kParallelize5DTile2DRangeM - start_m)); +} + +TEST(Parallelize5DTile2D, SingleThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + CheckTiling5DTile2D, + nullptr, + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); +} + +TEST(Parallelize5DTile2D, MultiThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + CheckTiling5DTile2D, + nullptr, + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); +} + +static void SetTrue5DTile2D(std::atomic_bool* processed_indicators, size_t i, size_t j, size_t k, size_t start_l, size_t start_m, size_t tile_l, size_t tile_m) { + for (size_t l = start_l; l < start_l + tile_l; l++) { + for (size_t m = start_m; m < start_m + tile_m; m++) { + const size_t linear_idx = (((i * kParallelize5DTile2DRangeJ + j) * kParallelize5DTile2DRangeK + k) * kParallelize5DTile2DRangeL + l) * kParallelize5DTile2DRangeM + m; + processed_indicators[linear_idx].store(true, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize5DTile2D, SingleThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize5DTile2DRangeI * kParallelize5DTile2DRangeJ * kParallelize5DTile2DRangeK * kParallelize5DTile2DRangeL * kParallelize5DTile2DRangeM); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_5d_tile_2d_t>(SetTrue5DTile2D), + static_cast<void*>(indicators.data()), + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize5DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize5DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize5DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize5DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize5DTile2DRangeM; m++) { + const size_t linear_idx = (((i * kParallelize5DTile2DRangeJ + j) * kParallelize5DTile2DRangeK + k) * kParallelize5DTile2DRangeL + l) * kParallelize5DTile2DRangeM + m; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ") not processed"; + } + } + } + } + } +} + +TEST(Parallelize5DTile2D, MultiThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize5DTile2DRangeI * kParallelize5DTile2DRangeJ * kParallelize5DTile2DRangeK * kParallelize5DTile2DRangeL * kParallelize5DTile2DRangeM); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_5d_tile_2d_t>(SetTrue5DTile2D), + static_cast<void*>(indicators.data()), + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize5DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize5DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize5DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize5DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize5DTile2DRangeM; m++) { + const size_t linear_idx = (((i * kParallelize5DTile2DRangeJ + j) * kParallelize5DTile2DRangeK + k) * kParallelize5DTile2DRangeL + l) * kParallelize5DTile2DRangeM + m; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ") not processed"; + } + } + } + } + } +} + +static void Increment5DTile2D(std::atomic_int* processed_counters, size_t i, size_t j, size_t k, size_t start_l, size_t start_m, size_t tile_l, size_t tile_m) { + for (size_t l = start_l; l < start_l + tile_l; l++) { + for (size_t m = start_m; m < start_m + tile_m; m++) { + const size_t linear_idx = (((i * kParallelize5DTile2DRangeJ + j) * kParallelize5DTile2DRangeK + k) * kParallelize5DTile2DRangeL + l) * kParallelize5DTile2DRangeM + m; + processed_counters[linear_idx].fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize5DTile2D, SingleThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize5DTile2DRangeI * kParallelize5DTile2DRangeJ * kParallelize5DTile2DRangeK * kParallelize5DTile2DRangeL * kParallelize5DTile2DRangeM); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_5d_tile_2d_t>(Increment5DTile2D), + static_cast<void*>(counters.data()), + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize5DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize5DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize5DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize5DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize5DTile2DRangeM; m++) { + const size_t linear_idx = (((i * kParallelize5DTile2DRangeJ + j) * kParallelize5DTile2DRangeK + k) * kParallelize5DTile2DRangeL + l) * kParallelize5DTile2DRangeM + m; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } + } + } + } +} + +TEST(Parallelize5DTile2D, MultiThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize5DTile2DRangeI * kParallelize5DTile2DRangeJ * kParallelize5DTile2DRangeK * kParallelize5DTile2DRangeL * kParallelize5DTile2DRangeM); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_5d_tile_2d_t>(Increment5DTile2D), + static_cast<void*>(counters.data()), + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize5DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize5DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize5DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize5DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize5DTile2DRangeM; m++) { + const size_t linear_idx = (((i * kParallelize5DTile2DRangeJ + j) * kParallelize5DTile2DRangeK + k) * kParallelize5DTile2DRangeL + l) * kParallelize5DTile2DRangeM + m; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } + } + } + } +} + +TEST(Parallelize5DTile2D, SingleThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize5DTile2DRangeI * kParallelize5DTile2DRangeJ * kParallelize5DTile2DRangeK * kParallelize5DTile2DRangeL * kParallelize5DTile2DRangeM); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + for (size_t iteration = 0; iteration < kIncrementIterations5D; iteration++) { + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_5d_tile_2d_t>(Increment5DTile2D), + static_cast<void*>(counters.data()), + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize5DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize5DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize5DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize5DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize5DTile2DRangeM; m++) { + const size_t linear_idx = (((i * kParallelize5DTile2DRangeJ + j) * kParallelize5DTile2DRangeK + k) * kParallelize5DTile2DRangeL + l) * kParallelize5DTile2DRangeM + m; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations5D) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations5D << ")"; + } + } + } + } + } +} + +TEST(Parallelize5DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize5DTile2DRangeI * kParallelize5DTile2DRangeJ * kParallelize5DTile2DRangeK * kParallelize5DTile2DRangeL * kParallelize5DTile2DRangeM); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + for (size_t iteration = 0; iteration < kIncrementIterations5D; iteration++) { + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_5d_tile_2d_t>(Increment5DTile2D), + static_cast<void*>(counters.data()), + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize5DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize5DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize5DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize5DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize5DTile2DRangeM; m++) { + const size_t linear_idx = (((i * kParallelize5DTile2DRangeJ + j) * kParallelize5DTile2DRangeK + k) * kParallelize5DTile2DRangeL + l) * kParallelize5DTile2DRangeM + m; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations5D) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations5D << ")"; + } + } + } + } + } +} + +static void WorkImbalance5DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t k, size_t start_l, size_t start_m, size_t tile_l, size_t tile_m) { + num_processed_items->fetch_add(tile_l * tile_m, std::memory_order_relaxed); + if (i == 0 && j == 0 && k == 0 && start_l == 0 && start_m == 0) { + /* Spin-wait until all items are computed */ + while (num_processed_items->load(std::memory_order_relaxed) != kParallelize5DTile2DRangeI * kParallelize5DTile2DRangeJ * kParallelize5DTile2DRangeK * kParallelize5DTile2DRangeL * kParallelize5DTile2DRangeM) { + std::atomic_thread_fence(std::memory_order_acquire); + } + } +} + +TEST(Parallelize5DTile2D, MultiThreadPoolWorkStealing) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_5d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_5d_tile_2d_t>(WorkImbalance5DTile2D), + static_cast<void*>(&num_processed_items), + kParallelize5DTile2DRangeI, kParallelize5DTile2DRangeJ, kParallelize5DTile2DRangeK, kParallelize5DTile2DRangeL, kParallelize5DTile2DRangeM, + kParallelize5DTile2DTileL, kParallelize5DTile2DTileM, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize5DTile2DRangeI * kParallelize5DTile2DRangeJ * kParallelize5DTile2DRangeK * kParallelize5DTile2DRangeL * kParallelize5DTile2DRangeM); +} + +static void ComputeNothing6DTile2D(void*, size_t, size_t, size_t, size_t, size_t, size_t, size_t, size_t) { +} + +TEST(Parallelize6DTile2D, SingleThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_6d_tile_2d(threadpool.get(), + ComputeNothing6DTile2D, + nullptr, + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); +} + +TEST(Parallelize6DTile2D, MultiThreadPoolCompletes) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + ComputeNothing6DTile2D, + nullptr, + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); +} + +static void CheckBounds6DTile2D(void*, size_t i, size_t j, size_t k, size_t l, size_t start_m, size_t start_n, size_t tile_m, size_t tile_n) { + EXPECT_LT(i, kParallelize6DTile2DRangeI); + EXPECT_LT(j, kParallelize6DTile2DRangeJ); + EXPECT_LT(k, kParallelize6DTile2DRangeK); + EXPECT_LT(l, kParallelize6DTile2DRangeL); + EXPECT_LT(start_m, kParallelize6DTile2DRangeM); + EXPECT_LT(start_n, kParallelize6DTile2DRangeN); + EXPECT_LE(start_m + tile_m, kParallelize6DTile2DRangeM); + EXPECT_LE(start_n + tile_n, kParallelize6DTile2DRangeN); +} + +TEST(Parallelize6DTile2D, SingleThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + CheckBounds6DTile2D, + nullptr, + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); +} + +TEST(Parallelize6DTile2D, MultiThreadPoolAllItemsInBounds) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + CheckBounds6DTile2D, + nullptr, + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); +} + +static void CheckTiling6DTile2D(void*, size_t i, size_t j, size_t k, size_t l, size_t start_m, size_t start_n, size_t tile_m, size_t tile_n) { + EXPECT_GT(tile_m, 0); + EXPECT_LE(tile_m, kParallelize6DTile2DTileM); + EXPECT_EQ(start_m % kParallelize6DTile2DTileM, 0); + EXPECT_EQ(tile_m, std::min<size_t>(kParallelize6DTile2DTileM, kParallelize6DTile2DRangeM - start_m)); + + EXPECT_GT(tile_n, 0); + EXPECT_LE(tile_n, kParallelize6DTile2DTileN); + EXPECT_EQ(start_n % kParallelize6DTile2DTileN, 0); + EXPECT_EQ(tile_n, std::min<size_t>(kParallelize6DTile2DTileN, kParallelize6DTile2DRangeN - start_n)); +} + +TEST(Parallelize6DTile2D, SingleThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + CheckTiling6DTile2D, + nullptr, + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); +} + +TEST(Parallelize6DTile2D, MultiThreadPoolUniformTiling) { + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + CheckTiling6DTile2D, + nullptr, + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); +} + +static void SetTrue6DTile2D(std::atomic_bool* processed_indicators, size_t i, size_t j, size_t k, size_t l, size_t start_m, size_t start_n, size_t tile_m, size_t tile_n) { + for (size_t m = start_m; m < start_m + tile_m; m++) { + for (size_t n = start_n; n < start_n + tile_n; n++) { + const size_t linear_idx = ((((i * kParallelize6DTile2DRangeJ + j) * kParallelize6DTile2DRangeK + k) * kParallelize6DTile2DRangeL + l) * kParallelize6DTile2DRangeM + m) * kParallelize6DTile2DRangeN + n; + processed_indicators[linear_idx].store(true, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize6DTile2D, SingleThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize6DTile2DRangeI * kParallelize6DTile2DRangeJ * kParallelize6DTile2DRangeK * kParallelize6DTile2DRangeL * kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_6d_tile_2d_t>(SetTrue6DTile2D), + static_cast<void*>(indicators.data()), + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize6DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize6DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize6DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize6DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize6DTile2DRangeM; m++) { + for (size_t n = 0; n < kParallelize6DTile2DRangeN; n++) { + const size_t linear_idx = ((((i * kParallelize6DTile2DRangeJ + j) * kParallelize6DTile2DRangeK + k) * kParallelize6DTile2DRangeL + l) * kParallelize6DTile2DRangeM + m) * kParallelize6DTile2DRangeN + n; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ", " << n << ") not processed"; + } + } + } + } + } + } +} + +TEST(Parallelize6DTile2D, MultiThreadPoolAllItemsProcessed) { + std::vector<std::atomic_bool> indicators(kParallelize6DTile2DRangeI * kParallelize6DTile2DRangeJ * kParallelize6DTile2DRangeK * kParallelize6DTile2DRangeL * kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_6d_tile_2d_t>(SetTrue6DTile2D), + static_cast<void*>(indicators.data()), + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize6DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize6DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize6DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize6DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize6DTile2DRangeM; m++) { + for (size_t n = 0; n < kParallelize6DTile2DRangeN; n++) { + const size_t linear_idx = ((((i * kParallelize6DTile2DRangeJ + j) * kParallelize6DTile2DRangeK + k) * kParallelize6DTile2DRangeL + l) * kParallelize6DTile2DRangeM + m) * kParallelize6DTile2DRangeN + n; + EXPECT_TRUE(indicators[linear_idx].load(std::memory_order_relaxed)) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ", " << n << ") not processed"; + } + } + } + } + } + } +} + +static void Increment6DTile2D(std::atomic_int* processed_counters, size_t i, size_t j, size_t k, size_t l, size_t start_m, size_t start_n, size_t tile_m, size_t tile_n) { + for (size_t m = start_m; m < start_m + tile_m; m++) { + for (size_t n = start_n; n < start_n + tile_n; n++) { + const size_t linear_idx = ((((i * kParallelize6DTile2DRangeJ + j) * kParallelize6DTile2DRangeK + k) * kParallelize6DTile2DRangeL + l) * kParallelize6DTile2DRangeM + m) * kParallelize6DTile2DRangeN + n; + processed_counters[linear_idx].fetch_add(1, std::memory_order_relaxed); + } + } +} + +TEST(Parallelize6DTile2D, SingleThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize6DTile2DRangeI * kParallelize6DTile2DRangeJ * kParallelize6DTile2DRangeK * kParallelize6DTile2DRangeL * kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_6d_tile_2d_t>(Increment6DTile2D), + static_cast<void*>(counters.data()), + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize6DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize6DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize6DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize6DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize6DTile2DRangeM; m++) { + for (size_t n = 0; n < kParallelize6DTile2DRangeN; n++) { + const size_t linear_idx = ((((i * kParallelize6DTile2DRangeJ + j) * kParallelize6DTile2DRangeK + k) * kParallelize6DTile2DRangeL + l) * kParallelize6DTile2DRangeM + m) * kParallelize6DTile2DRangeN + n; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ", " << n << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } + } + } + } + } +} + +TEST(Parallelize6DTile2D, MultiThreadPoolEachItemProcessedOnce) { + std::vector<std::atomic_int> counters(kParallelize6DTile2DRangeI * kParallelize6DTile2DRangeJ * kParallelize6DTile2DRangeK * kParallelize6DTile2DRangeL * kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_6d_tile_2d_t>(Increment6DTile2D), + static_cast<void*>(counters.data()), + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); + + for (size_t i = 0; i < kParallelize6DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize6DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize6DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize6DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize6DTile2DRangeM; m++) { + for (size_t n = 0; n < kParallelize6DTile2DRangeN; n++) { + const size_t linear_idx = ((((i * kParallelize6DTile2DRangeJ + j) * kParallelize6DTile2DRangeK + k) * kParallelize6DTile2DRangeL + l) * kParallelize6DTile2DRangeM + m) * kParallelize6DTile2DRangeN + n; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), 1) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ", " << n << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times (expected: 1)"; + } + } + } + } + } + } +} + +TEST(Parallelize6DTile2D, SingleThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize6DTile2DRangeI * kParallelize6DTile2DRangeJ * kParallelize6DTile2DRangeK * kParallelize6DTile2DRangeL * kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); + + auto_pthreadpool_t threadpool(pthreadpool_create(1), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + for (size_t iteration = 0; iteration < kIncrementIterations6D; iteration++) { + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_6d_tile_2d_t>(Increment6DTile2D), + static_cast<void*>(counters.data()), + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize6DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize6DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize6DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize6DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize6DTile2DRangeM; m++) { + for (size_t n = 0; n < kParallelize6DTile2DRangeN; n++) { + const size_t linear_idx = ((((i * kParallelize6DTile2DRangeJ + j) * kParallelize6DTile2DRangeK + k) * kParallelize6DTile2DRangeL + l) * kParallelize6DTile2DRangeM + m) * kParallelize6DTile2DRangeN + n; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations6D) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ", " << n << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations6D << ")"; + } + } + } + } + } + } +} + +TEST(Parallelize6DTile2D, MultiThreadPoolEachItemProcessedMultipleTimes) { + std::vector<std::atomic_int> counters(kParallelize6DTile2DRangeI * kParallelize6DTile2DRangeJ * kParallelize6DTile2DRangeK * kParallelize6DTile2DRangeL * kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + for (size_t iteration = 0; iteration < kIncrementIterations6D; iteration++) { + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_6d_tile_2d_t>(Increment6DTile2D), + static_cast<void*>(counters.data()), + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); + } + + for (size_t i = 0; i < kParallelize6DTile2DRangeI; i++) { + for (size_t j = 0; j < kParallelize6DTile2DRangeJ; j++) { + for (size_t k = 0; k < kParallelize6DTile2DRangeK; k++) { + for (size_t l = 0; l < kParallelize6DTile2DRangeL; l++) { + for (size_t m = 0; m < kParallelize6DTile2DRangeM; m++) { + for (size_t n = 0; n < kParallelize6DTile2DRangeN; n++) { + const size_t linear_idx = ((((i * kParallelize6DTile2DRangeJ + j) * kParallelize6DTile2DRangeK + k) * kParallelize6DTile2DRangeL + l) * kParallelize6DTile2DRangeM + m) * kParallelize6DTile2DRangeN + n; + EXPECT_EQ(counters[linear_idx].load(std::memory_order_relaxed), kIncrementIterations6D) + << "Element (" << i << ", " << j << ", " << k << ", " << l << ", " << m << ", " << n << ") was processed " + << counters[linear_idx].load(std::memory_order_relaxed) << " times " + << "(expected: " << kIncrementIterations6D << ")"; + } + } + } + } + } + } +} + +static void WorkImbalance6DTile2D(std::atomic_int* num_processed_items, size_t i, size_t j, size_t k, size_t l, size_t start_m, size_t start_n, size_t tile_m, size_t tile_n) { + num_processed_items->fetch_add(tile_m * tile_n, std::memory_order_relaxed); + if (i == 0 && j == 0 && k == 0 && l == 0 && start_m == 0 && start_n == 0) { + /* Spin-wait until all items are computed */ + while (num_processed_items->load(std::memory_order_relaxed) != kParallelize6DTile2DRangeI * kParallelize6DTile2DRangeJ * kParallelize6DTile2DRangeK * kParallelize6DTile2DRangeL * kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN) { + std::atomic_thread_fence(std::memory_order_acquire); + } + } +} + +TEST(Parallelize6DTile2D, MultiThreadPoolWorkStealing) { + std::atomic_int num_processed_items = ATOMIC_VAR_INIT(0); + + auto_pthreadpool_t threadpool(pthreadpool_create(0), pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_6d_tile_2d( + threadpool.get(), + reinterpret_cast<pthreadpool_task_6d_tile_2d_t>(WorkImbalance6DTile2D), + static_cast<void*>(&num_processed_items), + kParallelize6DTile2DRangeI, kParallelize6DTile2DRangeJ, kParallelize6DTile2DRangeK, kParallelize6DTile2DRangeL, kParallelize6DTile2DRangeM, kParallelize6DTile2DRangeN, + kParallelize6DTile2DTileM, kParallelize6DTile2DTileN, + 0 /* flags */); + EXPECT_EQ(num_processed_items.load(std::memory_order_relaxed), kParallelize6DTile2DRangeI * kParallelize6DTile2DRangeJ * kParallelize6DTile2DRangeK * kParallelize6DTile2DRangeL * kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); +} |