aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarat Dukhan <maratek@google.com>2019-09-30 13:49:57 -0700
committerMarat Dukhan <maratek@google.com>2019-09-30 13:49:57 -0700
commitc06f29339e2a200d9da38d86a2cb8be5128cf4c5 (patch)
tree7095f0a0b239947045953d375391f33089f23a1d
parente6be7fc5754dfcc62136a712c7dd74d866101450 (diff)
downloadplatform_external_pthreadpool-c06f29339e2a200d9da38d86a2cb8be5128cf4c5.tar.gz
platform_external_pthreadpool-c06f29339e2a200d9da38d86a2cb8be5128cf4c5.tar.bz2
platform_external_pthreadpool-c06f29339e2a200d9da38d86a2cb8be5128cf4c5.zip
Use caller thread as one of workers in the thread pool
-rw-r--r--src/threadpool-pthreads.c162
1 files changed, 85 insertions, 77 deletions
diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c
index 93b03bf..f03af5a 100644
--- a/src/threadpool-pthreads.c
+++ b/src/threadpool-pthreads.c
@@ -260,17 +260,15 @@ static void thread_compute_1d(struct pthreadpool* threadpool, struct thread_info
while (atomic_decrement(&thread->range_length)) {
function(argument, range_start++);
}
- /* Done, now look for other threads' items to steal */
- if (threadpool->active_threads > 1) {
- /* There are still other threads with work */
- const size_t thread_number = thread->thread_number;
- const size_t threads_count = threadpool->threads_count;
- for (size_t tid = (thread_number + 1) % threads_count; tid != thread_number; tid = (tid + 1) % threads_count) {
- struct thread_info* other_thread = &threadpool->threads[tid];
- while (atomic_decrement(&other_thread->range_length)) {
- const size_t item_id = __sync_sub_and_fetch(&other_thread->range_end, 1);
- function(argument, item_id);
- }
+
+ /* There still may be other threads with work */
+ const size_t thread_number = thread->thread_number;
+ const size_t threads_count = threadpool->threads_count;
+ for (size_t tid = (thread_number + 1) % threads_count; tid != thread_number; tid = (tid + 1) % threads_count) {
+ struct thread_info* other_thread = &threadpool->threads[tid];
+ while (atomic_decrement(&other_thread->range_length)) {
+ const size_t item_id = __sync_sub_and_fetch(&other_thread->range_end, 1);
+ function(argument, item_id);
}
}
}
@@ -366,26 +364,31 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
return NULL;
}
threadpool->threads_count = threads_count;
- pthread_mutex_init(&threadpool->execution_mutex, NULL);
-#if !PTHREADPOOL_USE_FUTEX
- pthread_mutex_init(&threadpool->completion_mutex, NULL);
- pthread_cond_init(&threadpool->completion_condvar, NULL);
- pthread_mutex_init(&threadpool->command_mutex, NULL);
- pthread_cond_init(&threadpool->command_condvar, NULL);
-#endif
-#if PTHREADPOOL_USE_FUTEX
- threadpool->has_active_threads = 1;
-#endif
- threadpool->active_threads = threadpool->threads_count;
+ /* Thread pool with a single thread computes everything on the caller thread. */
+ if (threads_count > 1) {
+ pthread_mutex_init(&threadpool->execution_mutex, NULL);
+ #if !PTHREADPOOL_USE_FUTEX
+ pthread_mutex_init(&threadpool->completion_mutex, NULL);
+ pthread_cond_init(&threadpool->completion_condvar, NULL);
+ pthread_mutex_init(&threadpool->command_mutex, NULL);
+ pthread_cond_init(&threadpool->command_condvar, NULL);
+ #endif
- for (size_t tid = 0; tid < threads_count; tid++) {
- threadpool->threads[tid].thread_number = tid;
- pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]);
- }
+ #if PTHREADPOOL_USE_FUTEX
+ threadpool->has_active_threads = 1;
+ #endif
+ threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */;
+
+ /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
+ for (size_t tid = 1; tid < threads_count; tid++) {
+ threadpool->threads[tid].thread_number = tid;
+ pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]);
+ }
- /* Wait until all threads initialize */
- wait_worker_threads(threadpool);
+ /* Wait until all threads initialize */
+ wait_worker_threads(threadpool);
+ }
return threadpool;
}
@@ -403,8 +406,8 @@ void pthreadpool_compute_1d(
void* argument,
size_t range)
{
- if (threadpool == NULL) {
- /* No thread pool provided: execute function sequentially on the calling thread */
+ if (threadpool == NULL || threadpool->threads_count <= 1) {
+ /* No thread pool used: execute function sequentially on the calling thread */
for (size_t i = 0; i < range; i++) {
function(argument, i);
}
@@ -417,7 +420,7 @@ void pthreadpool_compute_1d(
threadpool->function = function;
threadpool->argument = argument;
- threadpool->active_threads = threadpool->threads_count;
+ threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */;
threadpool->has_active_threads = 1;
/* Spread the work between threads */
@@ -449,7 +452,7 @@ void pthreadpool_compute_1d(
threadpool->argument = argument;
/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
- threadpool->active_threads = threadpool->threads_count;
+ threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */;
/* Spread the work between threads */
for (size_t tid = 0; tid < threadpool->threads_count; tid++) {
@@ -475,6 +478,9 @@ void pthreadpool_compute_1d(
pthread_cond_broadcast(&threadpool->command_condvar);
#endif
+ /* Do computations as worker #0 */
+ thread_compute_1d(threadpool, &threadpool->threads[0]);
+
/* Wait until the threads finish computation */
wait_worker_threads(threadpool);
@@ -504,8 +510,8 @@ void pthreadpool_compute_1d_tiled(
size_t range,
size_t tile)
{
- if (threadpool == NULL) {
- /* No thread pool provided: execute function sequentially on the calling thread */
+ if (threadpool == NULL || threadpool->threads_count <= 1) {
+ /* No thread pool used: execute function sequentially on the calling thread */
for (size_t i = 0; i < range; i += tile) {
function(argument, i, min(range - i, tile));
}
@@ -541,8 +547,8 @@ void pthreadpool_compute_2d(
size_t range_i,
size_t range_j)
{
- if (threadpool == NULL) {
- /* No thread pool provided: execute function sequentially on the calling thread */
+ if (threadpool == NULL || threadpool->threads_count <= 1) {
+ /* No thread pool used: execute function sequentially on the calling thread */
for (size_t i = 0; i < range_i; i++) {
for (size_t j = 0; j < range_j; j++) {
function(argument, i, j);
@@ -590,8 +596,8 @@ void pthreadpool_compute_2d_tiled(
size_t tile_i,
size_t tile_j)
{
- if (threadpool == NULL) {
- /* No thread pool provided: execute function sequentially on the calling thread */
+ if (threadpool == NULL || threadpool->threads_count <= 1) {
+ /* No thread pool used: execute function sequentially on the calling thread */
for (size_t i = 0; i < range_i; i += tile_i) {
for (size_t j = 0; j < range_j; j += tile_j) {
function(argument, i, j, min(range_i - i, tile_i), min(range_j - j, tile_j));
@@ -655,8 +661,8 @@ void pthreadpool_compute_3d_tiled(
size_t tile_j,
size_t tile_k)
{
- if (threadpool == NULL) {
- /* No thread pool provided: execute function sequentially on the calling thread */
+ if (threadpool == NULL || threadpool->threads_count <= 1) {
+ /* No thread pool used: execute function sequentially on the calling thread */
for (size_t i = 0; i < range_i; i += tile_i) {
for (size_t j = 0; j < range_j; j += tile_j) {
for (size_t k = 0; k < range_k; k += tile_k) {
@@ -738,8 +744,8 @@ void pthreadpool_compute_4d_tiled(
size_t tile_k,
size_t tile_l)
{
- if (threadpool == NULL) {
- /* No thread pool provided: execute function sequentially on the calling thread */
+ if (threadpool == NULL || threadpool->threads_count <= 1) {
+ /* No thread pool used: execute function sequentially on the calling thread */
for (size_t i = 0; i < range_i; i += tile_i) {
for (size_t j = 0; j < range_j; j += tile_j) {
for (size_t k = 0; k < range_k; k += tile_k) {
@@ -779,43 +785,45 @@ void pthreadpool_compute_4d_tiled(
void pthreadpool_destroy(struct pthreadpool* threadpool) {
if (threadpool != NULL) {
- #if PTHREADPOOL_USE_FUTEX
- threadpool->active_threads = threadpool->threads_count;
- threadpool->has_active_threads = 1;
- __sync_synchronize();
- threadpool->command = threadpool_command_shutdown;
- __sync_synchronize();
- futex_wake_all(&threadpool->command);
- #else
- /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */
- pthread_mutex_lock(&threadpool->command_mutex);
-
- /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
- threadpool->active_threads = threadpool->threads_count;
-
- /* Update the threadpool command. */
- threadpool->command = threadpool_command_shutdown;
-
- /* Wake up worker threads */
- pthread_cond_broadcast(&threadpool->command_condvar);
-
- /* Commit the state changes and let workers start processing */
- pthread_mutex_unlock(&threadpool->command_mutex);
- #endif
+ if (threadpool->threads_count > 1) {
+ #if PTHREADPOOL_USE_FUTEX
+ threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */;
+ threadpool->has_active_threads = 1;
+ __sync_synchronize();
+ threadpool->command = threadpool_command_shutdown;
+ __sync_synchronize();
+ futex_wake_all(&threadpool->command);
+ #else
+ /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */
+ pthread_mutex_lock(&threadpool->command_mutex);
+
+ /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
+ threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */;
+
+ /* Update the threadpool command. */
+ threadpool->command = threadpool_command_shutdown;
+
+ /* Wake up worker threads */
+ pthread_cond_broadcast(&threadpool->command_condvar);
+
+ /* Commit the state changes and let workers start processing */
+ pthread_mutex_unlock(&threadpool->command_mutex);
+ #endif
+
+ /* Wait until all threads return */
+ for (size_t thread = 1; thread < threadpool->threads_count; thread++) {
+ pthread_join(threadpool->threads[thread].thread_object, NULL);
+ }
- /* Wait until all threads return */
- for (size_t thread = 0; thread < threadpool->threads_count; thread++) {
- pthread_join(threadpool->threads[thread].thread_object, NULL);
+ /* Release resources */
+ pthread_mutex_destroy(&threadpool->execution_mutex);
+ #if !PTHREADPOOL_USE_FUTEX
+ pthread_mutex_destroy(&threadpool->completion_mutex);
+ pthread_cond_destroy(&threadpool->completion_condvar);
+ pthread_mutex_destroy(&threadpool->command_mutex);
+ pthread_cond_destroy(&threadpool->command_condvar);
+ #endif
}
-
- /* Release resources */
- pthread_mutex_destroy(&threadpool->execution_mutex);
- #if !PTHREADPOOL_USE_FUTEX
- pthread_mutex_destroy(&threadpool->completion_mutex);
- pthread_cond_destroy(&threadpool->completion_condvar);
- pthread_mutex_destroy(&threadpool->command_mutex);
- pthread_cond_destroy(&threadpool->command_condvar);
- #endif
free(threadpool);
}
}