diff options
author | Marat Dukhan <maratek@google.com> | 2019-09-30 13:49:57 -0700 |
---|---|---|
committer | Marat Dukhan <maratek@google.com> | 2019-09-30 13:49:57 -0700 |
commit | c06f29339e2a200d9da38d86a2cb8be5128cf4c5 (patch) | |
tree | 7095f0a0b239947045953d375391f33089f23a1d | |
parent | e6be7fc5754dfcc62136a712c7dd74d866101450 (diff) | |
download | platform_external_pthreadpool-c06f29339e2a200d9da38d86a2cb8be5128cf4c5.tar.gz platform_external_pthreadpool-c06f29339e2a200d9da38d86a2cb8be5128cf4c5.tar.bz2 platform_external_pthreadpool-c06f29339e2a200d9da38d86a2cb8be5128cf4c5.zip |
Use caller thread as one of workers in the thread pool
-rw-r--r-- | src/threadpool-pthreads.c | 162 |
1 files changed, 85 insertions, 77 deletions
diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 93b03bf..f03af5a 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -260,17 +260,15 @@ static void thread_compute_1d(struct pthreadpool* threadpool, struct thread_info while (atomic_decrement(&thread->range_length)) { function(argument, range_start++); } - /* Done, now look for other threads' items to steal */ - if (threadpool->active_threads > 1) { - /* There are still other threads with work */ - const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count; - for (size_t tid = (thread_number + 1) % threads_count; tid != thread_number; tid = (tid + 1) % threads_count) { - struct thread_info* other_thread = &threadpool->threads[tid]; - while (atomic_decrement(&other_thread->range_length)) { - const size_t item_id = __sync_sub_and_fetch(&other_thread->range_end, 1); - function(argument, item_id); - } + + /* There still may be other threads with work */ + const size_t thread_number = thread->thread_number; + const size_t threads_count = threadpool->threads_count; + for (size_t tid = (thread_number + 1) % threads_count; tid != thread_number; tid = (tid + 1) % threads_count) { + struct thread_info* other_thread = &threadpool->threads[tid]; + while (atomic_decrement(&other_thread->range_length)) { + const size_t item_id = __sync_sub_and_fetch(&other_thread->range_end, 1); + function(argument, item_id); } } } @@ -366,26 +364,31 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return NULL; } threadpool->threads_count = threads_count; - pthread_mutex_init(&threadpool->execution_mutex, NULL); -#if !PTHREADPOOL_USE_FUTEX - pthread_mutex_init(&threadpool->completion_mutex, NULL); - pthread_cond_init(&threadpool->completion_condvar, NULL); - pthread_mutex_init(&threadpool->command_mutex, NULL); - pthread_cond_init(&threadpool->command_condvar, NULL); -#endif -#if PTHREADPOOL_USE_FUTEX - threadpool->has_active_threads = 1; -#endif - threadpool->active_threads = threadpool->threads_count; + /* Thread pool with a single thread computes everything on the caller thread. */ + if (threads_count > 1) { + pthread_mutex_init(&threadpool->execution_mutex, NULL); + #if !PTHREADPOOL_USE_FUTEX + pthread_mutex_init(&threadpool->completion_mutex, NULL); + pthread_cond_init(&threadpool->completion_condvar, NULL); + pthread_mutex_init(&threadpool->command_mutex, NULL); + pthread_cond_init(&threadpool->command_condvar, NULL); + #endif - for (size_t tid = 0; tid < threads_count; tid++) { - threadpool->threads[tid].thread_number = tid; - pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]); - } + #if PTHREADPOOL_USE_FUTEX + threadpool->has_active_threads = 1; + #endif + threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */; + + /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ + for (size_t tid = 1; tid < threads_count; tid++) { + threadpool->threads[tid].thread_number = tid; + pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]); + } - /* Wait until all threads initialize */ - wait_worker_threads(threadpool); + /* Wait until all threads initialize */ + wait_worker_threads(threadpool); + } return threadpool; } @@ -403,8 +406,8 @@ void pthreadpool_compute_1d( void* argument, size_t range) { - if (threadpool == NULL) { - /* No thread pool provided: execute function sequentially on the calling thread */ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute function sequentially on the calling thread */ for (size_t i = 0; i < range; i++) { function(argument, i); } @@ -417,7 +420,7 @@ void pthreadpool_compute_1d( threadpool->function = function; threadpool->argument = argument; - threadpool->active_threads = threadpool->threads_count; + threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */; threadpool->has_active_threads = 1; /* Spread the work between threads */ @@ -449,7 +452,7 @@ void pthreadpool_compute_1d( threadpool->argument = argument; /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ - threadpool->active_threads = threadpool->threads_count; + threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */; /* Spread the work between threads */ for (size_t tid = 0; tid < threadpool->threads_count; tid++) { @@ -475,6 +478,9 @@ void pthreadpool_compute_1d( pthread_cond_broadcast(&threadpool->command_condvar); #endif + /* Do computations as worker #0 */ + thread_compute_1d(threadpool, &threadpool->threads[0]); + /* Wait until the threads finish computation */ wait_worker_threads(threadpool); @@ -504,8 +510,8 @@ void pthreadpool_compute_1d_tiled( size_t range, size_t tile) { - if (threadpool == NULL) { - /* No thread pool provided: execute function sequentially on the calling thread */ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute function sequentially on the calling thread */ for (size_t i = 0; i < range; i += tile) { function(argument, i, min(range - i, tile)); } @@ -541,8 +547,8 @@ void pthreadpool_compute_2d( size_t range_i, size_t range_j) { - if (threadpool == NULL) { - /* No thread pool provided: execute function sequentially on the calling thread */ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute function sequentially on the calling thread */ for (size_t i = 0; i < range_i; i++) { for (size_t j = 0; j < range_j; j++) { function(argument, i, j); @@ -590,8 +596,8 @@ void pthreadpool_compute_2d_tiled( size_t tile_i, size_t tile_j) { - if (threadpool == NULL) { - /* No thread pool provided: execute function sequentially on the calling thread */ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute function sequentially on the calling thread */ for (size_t i = 0; i < range_i; i += tile_i) { for (size_t j = 0; j < range_j; j += tile_j) { function(argument, i, j, min(range_i - i, tile_i), min(range_j - j, tile_j)); @@ -655,8 +661,8 @@ void pthreadpool_compute_3d_tiled( size_t tile_j, size_t tile_k) { - if (threadpool == NULL) { - /* No thread pool provided: execute function sequentially on the calling thread */ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute function sequentially on the calling thread */ for (size_t i = 0; i < range_i; i += tile_i) { for (size_t j = 0; j < range_j; j += tile_j) { for (size_t k = 0; k < range_k; k += tile_k) { @@ -738,8 +744,8 @@ void pthreadpool_compute_4d_tiled( size_t tile_k, size_t tile_l) { - if (threadpool == NULL) { - /* No thread pool provided: execute function sequentially on the calling thread */ + if (threadpool == NULL || threadpool->threads_count <= 1) { + /* No thread pool used: execute function sequentially on the calling thread */ for (size_t i = 0; i < range_i; i += tile_i) { for (size_t j = 0; j < range_j; j += tile_j) { for (size_t k = 0; k < range_k; k += tile_k) { @@ -779,43 +785,45 @@ void pthreadpool_compute_4d_tiled( void pthreadpool_destroy(struct pthreadpool* threadpool) { if (threadpool != NULL) { - #if PTHREADPOOL_USE_FUTEX - threadpool->active_threads = threadpool->threads_count; - threadpool->has_active_threads = 1; - __sync_synchronize(); - threadpool->command = threadpool_command_shutdown; - __sync_synchronize(); - futex_wake_all(&threadpool->command); - #else - /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */ - pthread_mutex_lock(&threadpool->command_mutex); - - /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ - threadpool->active_threads = threadpool->threads_count; - - /* Update the threadpool command. */ - threadpool->command = threadpool_command_shutdown; - - /* Wake up worker threads */ - pthread_cond_broadcast(&threadpool->command_condvar); - - /* Commit the state changes and let workers start processing */ - pthread_mutex_unlock(&threadpool->command_mutex); - #endif + if (threadpool->threads_count > 1) { + #if PTHREADPOOL_USE_FUTEX + threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */; + threadpool->has_active_threads = 1; + __sync_synchronize(); + threadpool->command = threadpool_command_shutdown; + __sync_synchronize(); + futex_wake_all(&threadpool->command); + #else + /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */ + pthread_mutex_lock(&threadpool->command_mutex); + + /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ + threadpool->active_threads = threadpool->threads_count - 1 /* caller thread */; + + /* Update the threadpool command. */ + threadpool->command = threadpool_command_shutdown; + + /* Wake up worker threads */ + pthread_cond_broadcast(&threadpool->command_condvar); + + /* Commit the state changes and let workers start processing */ + pthread_mutex_unlock(&threadpool->command_mutex); + #endif + + /* Wait until all threads return */ + for (size_t thread = 1; thread < threadpool->threads_count; thread++) { + pthread_join(threadpool->threads[thread].thread_object, NULL); + } - /* Wait until all threads return */ - for (size_t thread = 0; thread < threadpool->threads_count; thread++) { - pthread_join(threadpool->threads[thread].thread_object, NULL); + /* Release resources */ + pthread_mutex_destroy(&threadpool->execution_mutex); + #if !PTHREADPOOL_USE_FUTEX + pthread_mutex_destroy(&threadpool->completion_mutex); + pthread_cond_destroy(&threadpool->completion_condvar); + pthread_mutex_destroy(&threadpool->command_mutex); + pthread_cond_destroy(&threadpool->command_condvar); + #endif } - - /* Release resources */ - pthread_mutex_destroy(&threadpool->execution_mutex); - #if !PTHREADPOOL_USE_FUTEX - pthread_mutex_destroy(&threadpool->completion_mutex); - pthread_cond_destroy(&threadpool->completion_condvar); - pthread_mutex_destroy(&threadpool->command_mutex); - pthread_cond_destroy(&threadpool->command_condvar); - #endif free(threadpool); } } |