diff options
Diffstat (limited to 'src/H5FDioc_threads.c')
-rw-r--r-- | src/H5FDioc_threads.c | 382 |
1 files changed, 203 insertions, 179 deletions
diff --git a/src/H5FDioc_threads.c b/src/H5FDioc_threads.c index 6a4536a..1db8c62 100644 --- a/src/H5FDioc_threads.c +++ b/src/H5FDioc_threads.c @@ -26,10 +26,10 @@ * use mercury for that purpose... */ -static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER; +static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER; static hg_thread_mutex_t ioc_thread_mutex = PTHREAD_MUTEX_INITIALIZER; -static hg_thread_pool_t *ioc_thread_pool = NULL; -static hg_thread_t ioc_thread; +static hg_thread_pool_t *ioc_thread_pool = NULL; +static hg_thread_t ioc_thread; #ifndef HG_TEST_NUM_THREADS_DEFAULT #define HG_TEST_NUM_THREADS_DEFAULT 4 @@ -37,8 +37,8 @@ static hg_thread_t ioc_thread; extern int ioc_main(int64_t context_id); -static int pool_concurrent_max = 0; -static struct hg_thread_work *pool_request = NULL; +static int pool_concurrent_max = 0; +static struct hg_thread_work *pool_request = NULL; /*------------------------------------------------------------------------- * Function: local ioc_thread_main @@ -58,16 +58,18 @@ static struct hg_thread_work *pool_request = NULL; * *------------------------------------------------------------------------- */ -static HG_THREAD_RETURN_TYPE ioc_thread_main(void *arg) { - int64_t *context_id = (int64_t *)arg; - hg_thread_ret_t thread_ret = (hg_thread_ret_t)0; +static HG_THREAD_RETURN_TYPE +ioc_thread_main(void *arg) +{ + int64_t * context_id = (int64_t *)arg; + hg_thread_ret_t thread_ret = (hg_thread_ret_t)0; - /* Pass along the subfiling_context_t */ - ioc_main(context_id[0]); + /* Pass along the subfiling_context_t */ + ioc_main(context_id[0]); - /* Upon exit, we can free the input arg */ - free(arg); - return thread_ret; + /* Upon exit, we can free the input arg */ + free(arg); + return thread_ret; } /*------------------------------------------------------------------------- @@ -90,100 +92,103 @@ static HG_THREAD_RETURN_TYPE ioc_thread_main(void *arg) { * *------------------------------------------------------------------------- */ -int initialize_ioc_threads(void *_sf_context) { - int status; - int file_open_count; - subfiling_context_t *sf_context = _sf_context; - unsigned int thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT; - int64_t *context_id = (int64_t *)malloc(sizeof(int64_t)); - int world_size = sf_context->topology->app_layout->world_size; - size_t alloc_size = ((size_t)world_size * sizeof(struct hg_thread_work)); - char *envValue; - double t_start = 0.0, t_end = 0.0; - - assert(context_id != NULL); - - file_open_count = atomic_load(&sf_file_open_count); - atomic_fetch_add(&sf_file_open_count, 1); - - if (file_open_count > 0) - return 0; +int +initialize_ioc_threads(void *_sf_context) +{ + int status; + int file_open_count; + subfiling_context_t *sf_context = _sf_context; + unsigned int thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT; + int64_t * context_id = (int64_t *)malloc(sizeof(int64_t)); + int world_size = sf_context->topology->app_layout->world_size; + size_t alloc_size = ((size_t)world_size * sizeof(struct hg_thread_work)); + char * envValue; + double t_start = 0.0, t_end = 0.0; + + assert(context_id != NULL); + + file_open_count = atomic_load(&sf_file_open_count); + atomic_fetch_add(&sf_file_open_count, 1); + + if (file_open_count > 0) + return 0; + + t_start = MPI_Wtime(); + + /* Initialize the main IOC thread input argument. + * Each IOC request will utilize this context_id which is + * consistent across all MPI ranks, to ensure that requests + * involving reference counting are correctly using the + * correct file contexts. + */ + context_id[0] = sf_context->sf_context_id; + + if (pool_request == NULL) { + if ((pool_request = (struct hg_thread_work *)malloc(alloc_size)) == NULL) { + perror("malloc error"); + return -1; + } + else + pool_concurrent_max = world_size; + } + + memset(pool_request, 0, alloc_size); + + /* Initialize a couple of mutex variables that are used + * during IO concentrator operations to serialize + * access to key objects, e.g. reference counting. + */ + status = hg_thread_mutex_init(&ioc_mutex); + if (status) { + puts("hg_thread_mutex_init failed"); + goto err_exit; + } + status = hg_thread_mutex_init(&ioc_thread_mutex); + if (status) { + puts("hg_thread_mutex_init failed"); + goto err_exit; + } - t_start = MPI_Wtime(); - - /* Initialize the main IOC thread input argument. - * Each IOC request will utilize this context_id which is - * consistent across all MPI ranks, to ensure that requests - * involving reference counting are correctly using the - * correct file contexts. - */ - context_id[0] = sf_context->sf_context_id; - - if (pool_request == NULL) { - if ((pool_request = (struct hg_thread_work *)malloc(alloc_size)) == NULL) { - perror("malloc error"); - return -1; - } else - pool_concurrent_max = world_size; - } - - memset(pool_request, 0, alloc_size); - - /* Initialize a couple of mutex variables that are used - * during IO concentrator operations to serialize - * access to key objects, e.g. reference counting. - */ - status = hg_thread_mutex_init(&ioc_mutex); - if (status) { - puts("hg_thread_mutex_init failed"); - goto err_exit; - } - status = hg_thread_mutex_init(&ioc_thread_mutex); - if (status) { - puts("hg_thread_mutex_init failed"); - goto err_exit; - } - - /* Allow experimentation with the number of helper threads */ - if ((envValue = getenv("IOC_THREAD_POOL_COUNT")) != NULL) { - int value_check = atoi(envValue); - if (value_check > 0) { - thread_pool_count = (unsigned int)value_check; + /* Allow experimentation with the number of helper threads */ + if ((envValue = getenv("IOC_THREAD_POOL_COUNT")) != NULL) { + int value_check = atoi(envValue); + if (value_check > 0) { + thread_pool_count = (unsigned int)value_check; + } + } + + /* Initialize a thread pool for the IO Concentrator to use */ + status = hg_thread_pool_init(thread_pool_count, &ioc_thread_pool); + if (status) { + puts("hg_thread_pool_init failed"); + goto err_exit; + } + + /* Arguments to hg_thread_create are: + * 1. A pointer to reference the created thread. + * 2. User function pointer for the new thread to execute. + * 3. Pointer to the input argument that gets passed along to the user + * function. + */ + status = hg_thread_create(&ioc_thread, ioc_thread_main, context_id); + if (status) { + puts("hg_thread_create failed"); + goto err_exit; } - } - - /* Initialize a thread pool for the IO Concentrator to use */ - status = hg_thread_pool_init(thread_pool_count, &ioc_thread_pool); - if (status) { - puts("hg_thread_pool_init failed"); - goto err_exit; - } - - /* Arguments to hg_thread_create are: - * 1. A pointer to reference the created thread. - * 2. User function pointer for the new thread to execute. - * 3. Pointer to the input argument that gets passed along to the user - * function. - */ - status = hg_thread_create(&ioc_thread, ioc_thread_main, context_id); - if (status) { - puts("hg_thread_create failed"); - goto err_exit; - } #ifndef NDEBUG - t_end = MPI_Wtime(); - if (sf_verbose_flag) { - if (sf_context->topology->subfile_rank == 0) { - HDprintf("%s: time = %lf seconds\n", __func__, (t_end - t_start)); - HDfflush(stdout); + t_end = MPI_Wtime(); + if (sf_verbose_flag) { + if (sf_context->topology->subfile_rank == 0) { + HDprintf("%s: time = %lf seconds\n", __func__, (t_end - t_start)); + HDfflush(stdout); + } } - } #endif - return 0; + return 0; err_exit: - return -1; + return -1; } /*------------------------------------------------------------------------- @@ -202,30 +207,38 @@ err_exit: * *------------------------------------------------------------------------- */ -void __attribute__((destructor)) finalize_ioc_threads(void) { - if (ioc_thread_pool != NULL) { - hg_thread_pool_destroy(ioc_thread_pool); - ioc_thread_pool = NULL; - } +void __attribute__((destructor)) finalize_ioc_threads(void) +{ + if (ioc_thread_pool != NULL) { + hg_thread_pool_destroy(ioc_thread_pool); + ioc_thread_pool = NULL; + } } -static const char *translate_opcode(io_op_t op) +static const char * +translate_opcode(io_op_t op) { - switch(op) { - case READ_OP: return "READ_OP"; - break; - case WRITE_OP: return "WRITE_OP"; - break; - case OPEN_OP: return "OPEN_OP"; - break; - case CLOSE_OP: return "CLOSE_OP"; - break; - case FINI_OP: return "FINI_OP"; - break; - case LOGGING_OP: return "LOGGING_OP"; - break; - } - return "unknown"; + switch (op) { + case READ_OP: + return "READ_OP"; + break; + case WRITE_OP: + return "WRITE_OP"; + break; + case OPEN_OP: + return "OPEN_OP"; + break; + case CLOSE_OP: + return "CLOSE_OP"; + break; + case FINI_OP: + return "FINI_OP"; + break; + case LOGGING_OP: + return "LOGGING_OP"; + break; + } + return "unknown"; } /*------------------------------------------------------------------------- * Function: local: handle_work_request @@ -249,45 +262,44 @@ static const char *translate_opcode(io_op_t op) * *------------------------------------------------------------------------- */ -static HG_THREAD_RETURN_TYPE handle_work_request(void *arg) +static HG_THREAD_RETURN_TYPE +handle_work_request(void *arg) { - int status = 0; - hg_thread_ret_t ret = 0; - sf_work_request_t *msg = (sf_work_request_t *)arg; - int64_t file_context_id = msg->header[2]; - subfiling_context_t *sf_context = NULL; - - sf_context = get__subfiling_object(file_context_id); - assert(sf_context != NULL); - - atomic_fetch_add(&sf_work_pending, 1); // atomic - switch (msg->tag) { - case WRITE_INDEP: - status = queue_write_indep(msg, msg->subfile_rank, msg->source, - sf_context->sf_data_comm); - break; - case READ_INDEP: - status = queue_read_indep(msg, msg->subfile_rank, msg->source, - sf_context->sf_data_comm); - break; - default: - HDprintf("[ioc(%d)] received message tag(%x)from rank %d\n", - msg->subfile_rank, msg->tag, msg->source); - status = -1; - break; - } - fflush(stdout); - - atomic_fetch_sub(&sf_work_pending, 1); // atomic - if (status < 0) { - HDprintf("[ioc(%d) %s]: request(%s) filename=%s from " - "rank(%d), size=%ld, offset=%ld FAILED\n", - msg->subfile_rank, __func__, translate_opcode((io_op_t)msg->tag), - sf_context->filename, msg->source, msg->header[0], msg->header[1]); - + int status = 0; + hg_thread_ret_t ret = 0; + sf_work_request_t * msg = (sf_work_request_t *)arg; + int64_t file_context_id = msg->header[2]; + subfiling_context_t *sf_context = NULL; + + sf_context = get__subfiling_object(file_context_id); + assert(sf_context != NULL); + + atomic_fetch_add(&sf_work_pending, 1); // atomic + switch (msg->tag) { + case WRITE_INDEP: + status = queue_write_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm); + break; + case READ_INDEP: + status = queue_read_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm); + break; + default: + HDprintf("[ioc(%d)] received message tag(%x)from rank %d\n", msg->subfile_rank, msg->tag, + msg->source); + status = -1; + break; + } fflush(stdout); - } - return ret; + + atomic_fetch_sub(&sf_work_pending, 1); // atomic + if (status < 0) { + HDprintf("[ioc(%d) %s]: request(%s) filename=%s from " + "rank(%d), size=%ld, offset=%ld FAILED\n", + msg->subfile_rank, __func__, translate_opcode((io_op_t)msg->tag), sf_context->sf_filename, + msg->source, msg->header[0], msg->header[1]); + + fflush(stdout); + } + return ret; } /*------------------------------------------------------------------------- @@ -306,18 +318,20 @@ static HG_THREAD_RETURN_TYPE handle_work_request(void *arg) * *------------------------------------------------------------------------- */ -int tpool_add_work(void *_work) { - static int work_index = 0; - sf_work_request_t *work = (sf_work_request_t *)_work; - - hg_thread_mutex_lock(&ioc_mutex); - if (work_index == pool_concurrent_max) - work_index = 0; - pool_request[work_index].func = handle_work_request; - pool_request[work_index].args = work; - hg_thread_pool_post(ioc_thread_pool, &pool_request[work_index++]); - hg_thread_mutex_unlock(&ioc_mutex); - return 0; +int +tpool_add_work(void *_work) +{ + static int work_index = 0; + sf_work_request_t *work = (sf_work_request_t *)_work; + + hg_thread_mutex_lock(&ioc_mutex); + if (work_index == pool_concurrent_max) + work_index = 0; + pool_request[work_index].func = handle_work_request; + pool_request[work_index].args = work; + hg_thread_pool_post(ioc_thread_pool, &pool_request[work_index++]); + hg_thread_mutex_unlock(&ioc_mutex); + return 0; } /*------------------------------------------------------------------------- @@ -335,7 +349,11 @@ int tpool_add_work(void *_work) { * *------------------------------------------------------------------------- */ -bool tpool_is_empty(void) { return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue); } +bool +tpool_is_empty(void) +{ + return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue); +} /*------------------------------------------------------------------------- * Function: begin_thread_exclusive @@ -351,8 +369,10 @@ bool tpool_is_empty(void) { return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue); } * *------------------------------------------------------------------------- */ -void begin_thread_exclusive(void) { - hg_thread_mutex_lock(&ioc_thread_mutex); +void +begin_thread_exclusive(void) +{ + hg_thread_mutex_lock(&ioc_thread_mutex); } /*------------------------------------------------------------------------- @@ -370,8 +390,10 @@ void begin_thread_exclusive(void) { * *------------------------------------------------------------------------- */ -void end_thread_exclusive(void) { - hg_thread_mutex_unlock(&ioc_thread_mutex); +void +end_thread_exclusive(void) +{ + hg_thread_mutex_unlock(&ioc_thread_mutex); } /*------------------------------------------------------------------------- @@ -389,9 +411,11 @@ void end_thread_exclusive(void) { * *------------------------------------------------------------------------- */ -int wait_for_thread_main(void) { - if (hg_thread_join(ioc_thread) != 0) { - return -1; - } - return 0; +int +wait_for_thread_main(void) +{ + if (hg_thread_join(ioc_thread) != 0) { + return -1; + } + return 0; } |