author    Qi Wang <interwq@gwu.edu>      2017-06-09 05:46:31 (GMT)
committer Qi Wang <interwq@gmail.com>    2017-06-12 15:56:14 (GMT)
commit    464cb60490efda800625b16fedd5bcd238e1526e
tree      cf9a1798be2a035bbe1e735c16cb056f0abc1666
parent    13685ab1b767091d817cb4959d24a42447a6fb78
Move background thread creation to background_thread_0.
To avoid complications, avoid invoking pthread_create "internally";
instead, rely on thread0 to launch new threads, and to terminate them
when asked.
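In outline: background_thread_create() for a nonzero index now only marks the
target info as started and signals thread 0, which wakes up, notices that its
bookkeeping lags n_background_threads, and performs the actual pthread_create
(and, at shutdown, the pthread_join). Below is a minimal standalone sketch of
this manager pattern in plain pthreads; the names (request_thread,
manager_loop, worker) are illustrative, not jemalloc's API.

/* Sketch: one manager thread owns all pthread_create/pthread_join calls. */
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

#define NWORKERS 4

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t workers[NWORKERS];
static bool wanted[NWORKERS];	/* marked "started" by any requester */
static bool created[NWORKERS];	/* actually launched by the manager */
static bool stopping = false;

static void *
worker(void *arg) {
	printf("worker %u running\n", (unsigned)(uintptr_t)arg);
	return NULL;
}

/* Any thread may request worker i; no pthread_create on this path. */
static void
request_thread(unsigned i) {
	pthread_mutex_lock(&mtx);
	wanted[i] = true;
	pthread_cond_signal(&cond);	/* wake the manager */
	pthread_mutex_unlock(&mtx);
}

/* "Thread 0": the only place that launches, and later joins, workers. */
static void *
manager_loop(void *arg) {
	(void)arg;
	pthread_mutex_lock(&mtx);
	while (!stopping) {
		for (unsigned i = 0; i < NWORKERS; i++) {
			if (wanted[i] && !created[i]) {
				pthread_create(&workers[i], NULL, worker,
				    (void *)(uintptr_t)i);
				created[i] = true;
			}
		}
		pthread_cond_wait(&cond, &mtx);	/* sleep until signaled */
	}
	pthread_mutex_unlock(&mtx);
	/* Terminate workers at exit, mirroring background_thread0_work(). */
	for (unsigned i = 0; i < NWORKERS; i++) {
		if (created[i]) {
			pthread_join(workers[i], NULL);
		}
	}
	return NULL;
}

int
main(void) {
	pthread_t mgr;
	pthread_create(&mgr, NULL, manager_loop, NULL);
	request_thread(1);
	request_thread(3);
	sleep(1);	/* crude; lets the manager act for the demo */
	pthread_mutex_lock(&mtx);
	stopping = true;
	pthread_cond_signal(&cond);
	pthread_mutex_unlock(&mtx);
	pthread_join(mgr, NULL);
	return 0;
}

Funneling every pthread_create through one thread keeps reentrant allocation
out of arbitrary call paths and gives a single owner for the created-thread
bookkeeping, at the cost of thread creation becoming asynchronous.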
-rw-r--r--  include/jemalloc/internal/background_thread_externs.h |   2
-rw-r--r--  include/jemalloc/internal/background_thread_structs.h |   2
-rw-r--r--  src/background_thread.c                               | 381
-rw-r--r--  src/ctl.c                                             |  12
4 files changed, 251 insertions(+), 146 deletions(-)
diff --git a/include/jemalloc/internal/background_thread_externs.h b/include/jemalloc/internal/background_thread_externs.h
index aef1c90..7c88369 100644
--- a/include/jemalloc/internal/background_thread_externs.h
+++ b/include/jemalloc/internal/background_thread_externs.h
@@ -10,8 +10,6 @@ extern background_thread_info_t *background_thread_info;
 bool background_thread_create(tsd_t *tsd, unsigned arena_ind);
 bool background_threads_enable(tsd_t *tsd);
 bool background_threads_disable(tsd_t *tsd);
-bool background_threads_disable_single(tsd_t *tsd,
-    background_thread_info_t *info);
 void background_thread_interval_check(tsdn_t *tsdn, arena_t *arena,
     arena_decay_t *decay, size_t npages_new);
 void background_thread_prefork0(tsdn_t *tsdn);
diff --git a/include/jemalloc/internal/background_thread_structs.h b/include/jemalloc/internal/background_thread_structs.h
index edf90fe..f6ad4ad 100644
--- a/include/jemalloc/internal/background_thread_structs.h
+++ b/include/jemalloc/internal/background_thread_structs.h
@@ -18,6 +18,8 @@ struct background_thread_info_s {
 	malloc_mutex_t mtx;
 	/* Whether the thread has been created. */
 	bool started;
+	/* Pause execution (for arena reset / destroy). */
+	bool pause;
 	/* When true, it means no wakeup scheduled. */
 	atomic_b_t indefinite_sleep;
 	/* Next scheduled wakeup time (absolute time in ns). */
diff --git a/src/background_thread.c b/src/background_thread.c
index 51d23cb..f2bc474 100644
--- a/src/background_thread.c
+++ b/src/background_thread.c
@@ -50,8 +50,6 @@ pthread_create_wrapper(pthread_t *__restrict thread, const pthread_attr_t *attr,
 bool background_thread_create(tsd_t *tsd, unsigned arena_ind) NOT_REACHED
 bool background_threads_enable(tsd_t *tsd) NOT_REACHED
 bool background_threads_disable(tsd_t *tsd) NOT_REACHED
-bool background_threads_disable_single(tsd_t *tsd,
-    background_thread_info_t *info) NOT_REACHED
 void background_thread_interval_check(tsdn_t *tsdn, arena_t *arena,
     arena_decay_t *decay, size_t npages_new) NOT_REACHED
 void background_thread_prefork0(tsdn_t *tsdn) NOT_REACHED
@@ -67,8 +65,9 @@ void background_thread_ctl_init(tsdn_t *tsdn) NOT_REACHED
 static bool background_thread_enabled_at_fork;
 
 static void
-background_thread_info_reinit(tsdn_t *tsdn, background_thread_info_t *info) {
+background_thread_info_init(tsdn_t *tsdn, background_thread_info_t *info) {
 	background_thread_wakeup_time_set(tsdn, info, 0);
+	info->pause = false;
 	info->npages_to_purge_new = 0;
 	if (config_stats) {
 		info->tot_n_runs = 0;
@@ -210,107 +209,227 @@ arena_decay_compute_purge_interval(tsdn_t *tsdn, arena_t *arena) {
 	return i1 < i2 ? i1 : i2;
 }
 
-static inline uint64_t
-background_work_once(tsdn_t *tsdn, unsigned ind) {
-	arena_t *arena;
-	unsigned i, narenas;
-	uint64_t min_interval;
+static void
+background_thread_sleep(tsdn_t *tsdn, background_thread_info_t *info,
+    uint64_t interval) {
+	if (config_stats) {
+		info->tot_n_runs++;
+	}
+	info->npages_to_purge_new = 0;
+
+	struct timeval tv;
+	/* Specific clock required by timedwait. */
+	gettimeofday(&tv, NULL);
+	nstime_t before_sleep;
+	nstime_init2(&before_sleep, tv.tv_sec, tv.tv_usec * 1000);
+
+	int ret;
+	if (interval == BACKGROUND_THREAD_INDEFINITE_SLEEP) {
+		assert(background_thread_indefinite_sleep(info));
+		ret = pthread_cond_wait(&info->cond, &info->mtx.lock);
+		assert(ret == 0);
+	} else {
+		assert(interval >= BACKGROUND_THREAD_MIN_INTERVAL_NS &&
+		    interval <= BACKGROUND_THREAD_INDEFINITE_SLEEP);
+		/* We need malloc clock (can be different from tv). */
+		nstime_t next_wakeup;
+		nstime_init(&next_wakeup, 0);
+		nstime_update(&next_wakeup);
+		nstime_iadd(&next_wakeup, interval);
+		assert(nstime_ns(&next_wakeup) <
+		    BACKGROUND_THREAD_INDEFINITE_SLEEP);
+		background_thread_wakeup_time_set(tsdn, info,
+		    nstime_ns(&next_wakeup));
+
+		nstime_t ts_wakeup;
+		nstime_copy(&ts_wakeup, &before_sleep);
+		nstime_iadd(&ts_wakeup, interval);
+		struct timespec ts;
+		ts.tv_sec = (size_t)nstime_sec(&ts_wakeup);
+		ts.tv_nsec = (size_t)nstime_nsec(&ts_wakeup);
+
+		assert(!background_thread_indefinite_sleep(info));
+		ret = pthread_cond_timedwait(&info->cond, &info->mtx.lock, &ts);
+		assert(ret == ETIMEDOUT || ret == 0);
+		background_thread_wakeup_time_set(tsdn, info,
+		    BACKGROUND_THREAD_INDEFINITE_SLEEP);
+	}
+	if (config_stats) {
+		gettimeofday(&tv, NULL);
+		nstime_t after_sleep;
+		nstime_init2(&after_sleep, tv.tv_sec, tv.tv_usec * 1000);
+		if (nstime_compare(&after_sleep, &before_sleep) > 0) {
+			nstime_subtract(&after_sleep, &before_sleep);
+			nstime_add(&info->tot_sleep_time, &after_sleep);
+		}
+	}
+	while (info->pause) {
+		malloc_mutex_unlock(tsdn, &info->mtx);
+		/* Wait on global lock to update status. */
+		malloc_mutex_lock(tsdn, &background_thread_lock);
+		malloc_mutex_unlock(tsdn, &background_thread_lock);
+		malloc_mutex_lock(tsdn, &info->mtx);
+	}
+}
 
-	min_interval = BACKGROUND_THREAD_INDEFINITE_SLEEP;
-	narenas = narenas_total_get();
-	for (i = ind; i < narenas; i += ncpus) {
-		arena = arena_get(tsdn, i, false);
+static inline void
+background_work_sleep_once(tsdn_t *tsdn, background_thread_info_t *info, unsigned ind) {
+	uint64_t min_interval = BACKGROUND_THREAD_INDEFINITE_SLEEP;
+	unsigned narenas = narenas_total_get();
+
+	for (unsigned i = ind; i < narenas; i += ncpus) {
+		arena_t *arena = arena_get(tsdn, i, false);
 		if (!arena) {
 			continue;
 		}
-
 		arena_decay(tsdn, arena, true, false);
+		if (min_interval == BACKGROUND_THREAD_MIN_INTERVAL_NS) {
+			/* Min interval will be used. */
+			continue;
+		}
 		uint64_t interval = arena_decay_compute_purge_interval(tsdn,
 		    arena);
-		if (interval == BACKGROUND_THREAD_MIN_INTERVAL_NS) {
-			return interval;
-		}
-
-		assert(interval > BACKGROUND_THREAD_MIN_INTERVAL_NS);
+		assert(interval >= BACKGROUND_THREAD_MIN_INTERVAL_NS);
 		if (min_interval > interval) {
 			min_interval = interval;
 		}
 	}
+	background_thread_sleep(tsdn, info, min_interval);
+}
+
+static bool
+background_threads_disable_single(tsd_t *tsd, background_thread_info_t *info) {
+	if (info == &background_thread_info[0]) {
+		malloc_mutex_assert_owner(tsd_tsdn(tsd),
+		    &background_thread_lock);
+	} else {
+		malloc_mutex_assert_not_owner(tsd_tsdn(tsd),
+		    &background_thread_lock);
+	}
+
+	pre_reentrancy(tsd);
+	malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+	bool has_thread;
+	if (info->started) {
+		has_thread = true;
+		info->started = false;
+		pthread_cond_signal(&info->cond);
+	} else {
+		has_thread = false;
+	}
+	malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
+
+	if (!has_thread) {
+		post_reentrancy(tsd);
+		return false;
+	}
+	void *ret;
+	if (pthread_join(info->thread, &ret)) {
+		post_reentrancy(tsd);
+		return true;
+	}
+	assert(ret == NULL);
+	n_background_threads--;
+	post_reentrancy(tsd);
 
-	return min_interval;
+	return false;
 }
 
+static void *background_thread_entry(void *ind_arg);
+
 static void
-background_work(tsdn_t *tsdn, unsigned ind) {
-	int ret;
-	background_thread_info_t *info = &background_thread_info[ind];
+check_background_thread_creation(tsd_t *tsd, unsigned *n_created,
+    bool *created_threads) {
+	if (likely(*n_created == n_background_threads)) {
+		return;
+	}
 
-	malloc_mutex_lock(tsdn, &info->mtx);
-	background_thread_wakeup_time_set(tsdn, info,
-	    BACKGROUND_THREAD_INDEFINITE_SLEEP);
-	while (info->started) {
-		uint64_t interval = background_work_once(tsdn, ind);
-		if (config_stats) {
-			info->tot_n_runs++;
+	for (unsigned i = 1; i < ncpus; i++) {
+		if (created_threads[i]) {
+			continue;
 		}
-		info->npages_to_purge_new = 0;
+		background_thread_info_t *info = &background_thread_info[i];
+		malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+		if (info->started) {
+			pre_reentrancy(tsd);
+			int err = pthread_create_wrapper(&info->thread, NULL,
+			    background_thread_entry, (void *)(uintptr_t)i);
+			post_reentrancy(tsd);
+
+			if (err == 0) {
+				(*n_created)++;
+				created_threads[i] = true;
+			} else {
+				malloc_printf("<jemalloc>: background thread "
				    "creation failed (%d)\n", err);
+				if (opt_abort) {
+					abort();
+				}
+			}
+		}
+		malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
+	}
+}
 
-		struct timeval tv;
-		/* Specific clock required by timedwait. */
-		gettimeofday(&tv, NULL);
-		nstime_t before_sleep;
-		nstime_init2(&before_sleep, tv.tv_sec, tv.tv_usec * 1000);
+static void
+background_thread0_work(tsd_t *tsd) {
+	/* Thread0 is also responsible for launching / terminating threads. */
+	VARIABLE_ARRAY(bool, created_threads, ncpus);
+	unsigned i;
+	for (i = 1; i < ncpus; i++) {
+		created_threads[i] = false;
+	}
+	/* Start working, and create more threads when asked. */
+	unsigned n_created = 1;
+	while (background_thread_info[0].started) {
+		check_background_thread_creation(tsd, &n_created,
+		    (bool *)&created_threads);
+		background_work_sleep_once(tsd_tsdn(tsd),
+		    &background_thread_info[0], 0);
+	}
 
-		if (interval == BACKGROUND_THREAD_INDEFINITE_SLEEP) {
-			assert(background_thread_indefinite_sleep(info));
-			ret = pthread_cond_wait(&info->cond, &info->mtx.lock);
-			assert(ret == 0);
+	/*
+	 * Shut down other threads at exit. Note that the ctl thread is holding
+	 * the global background_thread mutex (and is waiting) for us.
+	 */
+	assert(!background_thread_enabled());
+	for (i = 1; i < ncpus; i++) {
+		background_thread_info_t *info = &background_thread_info[i];
+		if (created_threads[i]) {
+			background_threads_disable_single(tsd, info);
 		} else {
-			assert(interval >= BACKGROUND_THREAD_MIN_INTERVAL_NS &&
-			    interval <= BACKGROUND_THREAD_INDEFINITE_SLEEP);
-			/* We need malloc clock (can be different from tv). */
-			nstime_t next_wakeup;
-			nstime_init(&next_wakeup, 0);
-			nstime_update(&next_wakeup);
-			nstime_iadd(&next_wakeup, interval);
-			assert(nstime_ns(&next_wakeup) <
-			    BACKGROUND_THREAD_INDEFINITE_SLEEP);
-			background_thread_wakeup_time_set(tsdn, info,
-			    nstime_ns(&next_wakeup));
-
-			nstime_t ts_wakeup;
-			nstime_copy(&ts_wakeup, &before_sleep);
-			nstime_iadd(&ts_wakeup, interval);
-			struct timespec ts;
-			ts.tv_sec = (size_t)nstime_sec(&ts_wakeup);
-			ts.tv_nsec = (size_t)nstime_nsec(&ts_wakeup);
-
-			assert(!background_thread_indefinite_sleep(info));
-			ret = pthread_cond_timedwait(&info->cond,
-			    &info->mtx.lock, &ts);
-			assert(ret == ETIMEDOUT || ret == 0);
-			background_thread_wakeup_time_set(tsdn, info,
-			    BACKGROUND_THREAD_INDEFINITE_SLEEP);
+			malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+			/* Clear in case the thread wasn't created. */
+			info->started = false;
+			malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
 		}
+	}
+	background_thread_info[0].started = false;
+	assert(n_background_threads == 1);
+}
 
-		if (config_stats) {
-			gettimeofday(&tv, NULL);
-			nstime_t after_sleep;
-			nstime_init2(&after_sleep, tv.tv_sec, tv.tv_usec * 1000);
-			if (nstime_compare(&after_sleep, &before_sleep) > 0) {
-				nstime_subtract(&after_sleep, &before_sleep);
-				nstime_add(&info->tot_sleep_time, &after_sleep);
-			}
+static void
+background_work(tsd_t *tsd, unsigned ind) {
+	background_thread_info_t *info = &background_thread_info[ind];
+
+	malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+	background_thread_wakeup_time_set(tsd_tsdn(tsd), info,
+	    BACKGROUND_THREAD_INDEFINITE_SLEEP);
+	if (ind == 0) {
+		background_thread0_work(tsd);
+	} else {
+		while (info->started) {
+			background_work_sleep_once(tsd_tsdn(tsd), info, ind);
 		}
 	}
-	background_thread_wakeup_time_set(tsdn, info, 0);
-	malloc_mutex_unlock(tsdn, &info->mtx);
+	background_thread_wakeup_time_set(tsd_tsdn(tsd), info, 0);
+	malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
 }
 
 static void *
 background_thread_entry(void *ind_arg) {
 	unsigned thread_ind = (unsigned)(uintptr_t)ind_arg;
-	assert(thread_ind < narenas_total_get() && thread_ind < ncpus);
+	assert(thread_ind < ncpus);
 
 	if (opt_percpu_arena != percpu_arena_disabled) {
 		set_current_thread_affinity((int)thread_ind);
@@ -320,13 +439,21 @@ background_thread_entry(void *ind_arg) {
 	 * side effects, for example triggering new arena creation (which in
 	 * turn triggers another background thread creation).
 	 */
-	background_work(tsd_tsdn(tsd_internal_fetch()), thread_ind);
+	background_work(tsd_internal_fetch(), thread_ind);
 	assert(pthread_equal(pthread_self(),
 	    background_thread_info[thread_ind].thread));
 
 	return NULL;
 }
 
+static void
+background_thread_init(tsd_t *tsd, background_thread_info_t *info) {
+	malloc_mutex_assert_owner(tsd_tsdn(tsd), &background_thread_lock);
+	info->started = true;
+	background_thread_info_init(tsd_tsdn(tsd), info);
+	n_background_threads++;
+}
+
 /* Create a new background thread if needed. */
 bool
 background_thread_create(tsd_t *tsd, unsigned arena_ind) {
@@ -341,30 +468,35 @@ background_thread_create(tsd_t *tsd, unsigned arena_ind) {
 	malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
 	need_new_thread = background_thread_enabled() && !info->started;
 	if (need_new_thread) {
-		info->started = true;
-		background_thread_info_reinit(tsd_tsdn(tsd), info);
-		n_background_threads++;
+		background_thread_init(tsd, info);
 	}
 	malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
 	if (!need_new_thread) {
 		return false;
 	}
+	if (arena_ind != 0) {
+		/* Threads are created asynchronously by Thread 0. */
+		background_thread_info_t *t0 = &background_thread_info[0];
+		malloc_mutex_lock(tsd_tsdn(tsd), &t0->mtx);
+		assert(t0->started);
+		pthread_cond_signal(&t0->cond);
+		malloc_mutex_unlock(tsd_tsdn(tsd), &t0->mtx);
+
+		return false;
+	}
 
 	pre_reentrancy(tsd);
-	malloc_mutex_unlock(tsd_tsdn(tsd), &background_thread_lock);
 	/*
 	 * To avoid complications (besides reentrancy), create internal
-	 * background threads with the underlying pthread_create, and drop
-	 * background_thread_lock (pthread_create may take internal locks).
+	 * background threads with the underlying pthread_create.
 	 */
 	int err = pthread_create_wrapper(&info->thread, NULL,
 	    background_thread_entry, (void *)thread_ind);
-	malloc_mutex_lock(tsd_tsdn(tsd), &background_thread_lock);
 	post_reentrancy(tsd);
 
 	if (err != 0) {
-		malloc_printf("<jemalloc>: arena %u background thread creation "
-		    "failed (%d).\n", arena_ind, err);
+		malloc_printf("<jemalloc>: arena 0 background thread creation "
+		    "failed (%d)\n", err);
 		malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
 		info->started = false;
 		n_background_threads--;
@@ -372,7 +504,6 @@ background_thread_create(tsd_t *tsd, unsigned arena_ind) {
 
 		return true;
 	}
-	assert(info->started);
 
 	return false;
 }
@@ -383,72 +514,43 @@ background_threads_enable(tsd_t *tsd) {
 	assert(background_thread_enabled());
 	malloc_mutex_assert_owner(tsd_tsdn(tsd), &background_thread_lock);
 
-	VARIABLE_ARRAY(bool, created, ncpus);
-	unsigned i, ncreated;
+	VARIABLE_ARRAY(bool, marked, ncpus);
+	unsigned i, nmarked;
 	for (i = 0; i < ncpus; i++) {
-		created[i] = false;
+		marked[i] = false;
 	}
-	ncreated = 0;
-
+	nmarked = 0;
+	/* Mark the threads we need to create for thread 0. */
 	unsigned n = narenas_total_get();
-	for (i = 0; i < n; i++) {
-		if (created[i % ncpus] ||
+	for (i = 1; i < n; i++) {
+		if (marked[i % ncpus] ||
 		    arena_get(tsd_tsdn(tsd), i, false) == NULL) {
 			continue;
 		}
 
-		if (background_thread_create(tsd, i)) {
-			return true;
-		}
-		created[i % ncpus] = true;
-		if (++ncreated == ncpus) {
+		background_thread_info_t *info = &background_thread_info[i];
+		malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+		assert(!info->started);
+		background_thread_init(tsd, info);
+		malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
+		marked[i % ncpus] = true;
+		if (++nmarked == ncpus) {
 			break;
 		}
 	}
 
-	return false;
+	return background_thread_create(tsd, 0);
 }
 
 bool
-background_threads_disable_single(tsd_t *tsd, background_thread_info_t *info) {
+background_threads_disable(tsd_t *tsd) {
+	assert(!background_thread_enabled());
 	malloc_mutex_assert_owner(tsd_tsdn(tsd), &background_thread_lock);
 
-	pre_reentrancy(tsd);
-	bool has_thread;
-	malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
-	if (info->started) {
-		has_thread = true;
-		info->started = false;
-		pthread_cond_signal(&info->cond);
-	} else {
-		has_thread = false;
-	}
-	malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
-
-	if (!has_thread) {
-		post_reentrancy(tsd);
-		return false;
-	}
-	void *ret;
-	if (pthread_join(info->thread, &ret)) {
-		post_reentrancy(tsd);
+	/* Thread 0 will be responsible for terminating other threads. */
+	if (background_threads_disable_single(tsd,
+	    &background_thread_info[0])) {
 		return true;
 	}
-	assert(ret == NULL);
-	n_background_threads--;
-	post_reentrancy(tsd);
-
-	return false;
-}
-
-bool
-background_threads_disable(tsd_t *tsd) {
-	assert(!background_thread_enabled());
-	for (unsigned i = 0; i < ncpus; i++) {
-		background_thread_info_t *info = &background_thread_info[i];
-		if (background_threads_disable_single(tsd, info)) {
-			return true;
-		}
-	}
 	assert(n_background_threads == 0);
 
 	return false;
@@ -460,7 +562,6 @@ background_thread_interval_check(tsdn_t *tsdn, arena_t *arena,
     arena_decay_t *decay, size_t npages_new) {
 	background_thread_info_t *info = arena_background_thread_info_get(
 	    arena);
-
 	if (malloc_mutex_trylock(tsdn, &info->mtx)) {
 		/*
 		 * Background thread may hold the mutex for a long period of
@@ -474,7 +575,6 @@ background_thread_interval_check(tsdn_t *tsdn, arena_t *arena,
 	if (!info->started) {
 		goto label_done;
 	}
-	assert(background_thread_enabled());
 	if (malloc_mutex_trylock(tsdn, &decay->mtx)) {
 		goto label_done;
 	}
@@ -646,7 +746,7 @@ bool
 background_thread_boot0(void) {
 	if (!have_background_thread && opt_background_thread) {
 		malloc_printf("<jemalloc>: option background_thread currently "
-		    "supports pthread only. \n");
+		    "supports pthread only\n");
 		return true;
 	}
 
@@ -686,9 +786,10 @@ background_thread_boot1(tsdn_t *tsdn) {
 
 	for (unsigned i = 0; i < ncpus; i++) {
 		background_thread_info_t *info = &background_thread_info[i];
+		/* Thread mutex is rank_inclusive because of thread0. */
 		if (malloc_mutex_init(&info->mtx, "background_thread",
 		    WITNESS_RANK_BACKGROUND_THREAD,
-		    malloc_mutex_rank_exclusive)) {
+		    malloc_mutex_address_ordered)) {
 			return true;
 		}
 		if (pthread_cond_init(&info->cond, NULL)) {
@@ -696,7 +797,7 @@ background_thread_boot1(tsdn_t *tsdn) {
 		}
 		malloc_mutex_lock(tsdn, &info->mtx);
 		info->started = false;
-		background_thread_info_reinit(tsdn, info);
+		background_thread_info_init(tsdn, info);
 		malloc_mutex_unlock(tsdn, &info->mtx);
 	}
 #endif
diff --git a/src/ctl.c b/src/ctl.c
--- a/src/ctl.c
+++ b/src/ctl.c
@@ -1948,8 +1948,10 @@ arena_reset_prepare_background_thread(tsd_t *tsd, unsigned arena_ind) {
 			unsigned ind = arena_ind % ncpus;
 			background_thread_info_t *info =
 			    &background_thread_info[ind];
-			assert(info->started);
-			background_threads_disable_single(tsd, info);
+			assert(info->started && !info->pause);
+			malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+			info->pause = true;
+			malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
 		}
 	}
 }
@@ -1961,8 +1963,10 @@ arena_reset_finish_background_thread(tsd_t *tsd, unsigned arena_ind) {
 			unsigned ind = arena_ind % ncpus;
 			background_thread_info_t *info =
 			    &background_thread_info[ind];
-			assert(!info->started);
-			background_thread_create(tsd, ind);
+			assert(info->started && info->pause);
+			malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+			info->pause = false;
+			malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
 		}
 		malloc_mutex_unlock(tsd_tsdn(tsd), &background_thread_lock);
 	}
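The src/ctl.c change above is the arena reset/destroy path: instead of tearing
a worker down and recreating it, the ctl thread now toggles info->pause under
the per-thread mutex while holding the global background_thread lock, and the
worker's sleep loop parks itself by bouncing off that global lock. A rough
standalone sketch of the handshake follows; the names (worker_check_pause,
paused_critical_section) are illustrative, not the jemalloc API.

/* Sketch: pause a worker with a flag plus a global "status" lock. */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t info_mtx = PTHREAD_MUTEX_INITIALIZER;
static bool pause_flag = false;		/* plays the role of info->pause */

/* Worker side: called with info_mtx held, after each round of work. */
static void
worker_check_pause(void) {
	while (pause_flag) {
		pthread_mutex_unlock(&info_mtx);
		/* Blocks until the pauser releases global_lock. */
		pthread_mutex_lock(&global_lock);
		pthread_mutex_unlock(&global_lock);
		pthread_mutex_lock(&info_mtx);	/* then re-check the flag */
	}
}

static void *
worker(void *arg) {
	(void)arg;
	for (int round = 0; round < 5; round++) {
		pthread_mutex_lock(&info_mtx);
		printf("worker round %d\n", round);	/* "purging" */
		worker_check_pause();
		pthread_mutex_unlock(&info_mtx);
		usleep(1000);	/* stands in for the timed cond wait */
	}
	return NULL;
}

/* Control side: global_lock is held across the whole operation. */
static void
paused_critical_section(void (*op)(void)) {
	pthread_mutex_lock(&global_lock);

	pthread_mutex_lock(&info_mtx);
	pause_flag = true;		/* ask the worker to stand aside */
	pthread_mutex_unlock(&info_mtx);

	op();				/* e.g. reset or destroy an arena */

	pthread_mutex_lock(&info_mtx);
	pause_flag = false;		/* let the worker resume */
	pthread_mutex_unlock(&info_mtx);

	pthread_mutex_unlock(&global_lock);
}

static void
op(void) {
	printf("critical section: worker is parked\n");
}

int
main(void) {
	pthread_t t;
	pthread_create(&t, NULL, worker, NULL);
	usleep(1000);	/* crude; lets the worker start for the demo */
	paused_critical_section(op);
	pthread_join(t, NULL);
	return 0;
}

The back-to-back lock/unlock of global_lock turns "wait until the ctl
operation finishes" into a plain mutex acquisition, so no extra condition
variable is needed to park and resume the worker.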