From ecb5c6e670fa9b895c4a61d2dc569e5312d6c824 Mon Sep 17 00:00:00 2001
From: Richard Warren
Date: Thu, 1 Oct 2020 10:49:11 -0400
Subject: Changed the usleep parameters based on whether the MPI rank hosts an
 IO Concentrator

---
 src/H5FDsubfile_mpi.c     | 110 ++++++++++++++++++++++++++--------------------
 src/H5FDsubfile_threads.c |  38 ++++++++--------
 2 files changed, 81 insertions(+), 67 deletions(-)

diff --git a/src/H5FDsubfile_mpi.c b/src/H5FDsubfile_mpi.c
index 6774536..0d8fc60 100644
--- a/src/H5FDsubfile_mpi.c
+++ b/src/H5FDsubfile_mpi.c
@@ -807,7 +807,7 @@ record_fid_to_subfile(hid_t fid, hid_t subfile_context_id, int *next_index)
     if (index == sf_file_map_size) {
         int i;
         sf_open_file_map = realloc(sf_open_file_map,
-            ((size_t)(sf_file_map_size * 2) * sizeof(file_map_to_context_t)));
+            ((size_t)(sf_file_map_size * 2) * sizeof(file_map_to_context_t)));
         if (sf_open_file_map == NULL) {
             perror("realloc");
             return FAIL;
@@ -1192,14 +1192,21 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
     int64_t ioc_read_datasize[n_io_concentrators];
     int64_t ioc_read_offset[n_io_concentrators];
     MPI_Datatype ioc_read_type[n_io_concentrators];
-    useconds_t delay = 50;
+    useconds_t delay = 100;
 
     subfiling_context_t *sf_context = get_subfiling_object(context_id);
     assert(sf_context != NULL);
 
+    /* Note that the sf_write_count is only tracked by an IOC rank */
     if (sf_context->sf_write_count && (sf_context->sf_fid > 0)) {
         fdatasync(sf_context->sf_fid);
+
+        /* We can attempt to give the IOC more compute time
+         * if we extend our delaying tactic when awaiting
+         * responses.
+         */
+        delay *= sf_context->topology->world_size;
     }
 
     io_concentrator = sf_context->topology->io_concentrator;
@@ -1230,12 +1237,12 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
 
 #ifndef NDEBUG
         if (sf_verbose_flag) {
-            fprintf(stdout,
+            fprintf(stdout,
                 "[%d %s] Requesting %ld read bytes from IOC(%d): "
                 "sourceOffset=%ld subfile_offset=%ld\n",
                 sf_world_rank, __func__, msg[0], io_concentrator[ioc],
-                sourceOffset, msg[1]);
-            fflush(stdout);
+                sourceOffset, msg[1]);
+            fflush(stdout);
         }
 #endif
 
@@ -1290,12 +1297,12 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
         for (i = 0; i < ready; i++) {
 #ifndef NDEBUG
             if (sf_verbose_flag) {
-                fprintf(stdout,
+                fprintf(stdout,
                     "[%d] READ bytes(%ld) of data from ioc_concentrator %d "
                     "complete\n",
                     sf_world_rank, ioc_read_datasize[indices[i]], indices[i]);
-                fflush(stdout);
+                fflush(stdout);
             }
 #endif
             if (ioc_read_type[indices[i]] != MPI_BYTE) {
@@ -1303,8 +1310,8 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
             }
             n_waiting--;
         }
-        if (n_waiting)
-            usleep(delay);
+        if (n_waiting)
+            usleep(delay);
     }
     return status;
 }
@@ -1438,16 +1445,23 @@ write__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
     int64_t ioc_write_datasize[n_io_concentrators];
     int64_t ioc_write_offset[n_io_concentrators];
     MPI_Datatype ioc_write_type[n_io_concentrators];
-    useconds_t delay = 50;
-    subfiling_context_t *sf_context = get_subfiling_object(context_id);
-    int i, target, ioc, n_waiting = 0, status = 0;
-    int errors = 0;
+    int n_waiting = 0, status = 0, errors = 0;
+    int i, target, ioc;
+    useconds_t delay = 100;
+    subfiling_context_t *sf_context = get_subfiling_object(context_id);
+
+    assert(sf_context);
 
     io_concentrator = sf_context->topology->io_concentrator;
 
     if (sf_context->topology->rank_is_ioc) {
         sf_context->sf_write_count++;
+
+        /* We can attempt to give the IOC more compute time
+         * if we extend our delaying tactic when awaiting
+         * responses.
+         */
+        delay *= sf_context->topology->world_size;
     }
 
     /* The following function will initialize the collection of IO transfer
@@ -1498,15 +1512,15 @@ write__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
         }
 
 #ifndef NDEBUG
-        if (sf_verbose_flag)
-        {
-            fprintf(stdout,
-                "[%d %s]: write_dest[ioc(%d), "
-                "sourceOffset=%ld, datasize=%ld, foffset=%ld]\n",
-                sf_world_rank, __func__, ioc, sourceOffset,
-                ioc_write_datasize[ioc], ioc_write_offset[ioc]);
-            fflush(stdout);
-        }
+        if (sf_verbose_flag)
+        {
+            fprintf(stdout,
+                "[%d %s]: write_dest[ioc(%d), "
+                "sourceOffset=%ld, datasize=%ld, foffset=%ld]\n",
+                sf_world_rank, __func__, ioc, sourceOffset,
+                ioc_write_datasize[ioc], ioc_write_offset[ioc]);
+            fflush(stdout);
+        }
 #endif
 
 
@@ -1603,8 +1617,8 @@ write__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
             }
             n_waiting--;
         }
-        if (n_waiting)
-            usleep(delay);
+        if (n_waiting)
+            usleep(delay);
     }
     if (errors)
         return -1;
@@ -2021,8 +2035,8 @@ sf_open_subfiles(hid_t fid, char *filename, char *prefix, int flags)
    assert(sf_context != NULL);
 
     sf_context->sf_context_id = context_id;
-    sf_context->h5_file_id = fid;
-    sf_context->filename = strdup(filename);
+    sf_context->h5_file_id = fid;
+    sf_context->filename = strdup(filename);
     sf_shutdown_flag = 0;
 
     return open__subfiles(sf_context, sf_context->topology->n_io_concentrators,
@@ -2162,9 +2176,9 @@ ioc_main(int64_t context_id)
     atomic_init(&sf_file_refcount, 0);
     atomic_init(&sf_ioc_fini_refcount, 0);
-    sf_open_file_count = 0;
-    sf_close_file_count = 0;
-    sf_ops_after_first_close = 0;
+    sf_open_file_count = 0;
+    sf_close_file_count = 0;
+    sf_ops_after_first_close = 0;
 
     while (!sf_shutdown_flag || sf_work_pending) {
         flag = 0;
@@ -2213,7 +2227,7 @@ ioc_main(int64_t context_id)
 #ifndef NDEBUG
     if (sf_logfile) {
         fclose(sf_logfile);
-        sf_logfile = NULL;
+        sf_logfile = NULL;
     }
 #endif
 
@@ -2553,7 +2567,7 @@ queue_file_open(
 {
     int ret, errors = 0;
     int flags = (int) (msg->header[0] & 0x0ffffffff);
-    // int open_count;
+    // int open_count;
     atomic_fetch_add(&sf_file_refcount, 1); // atomic
 #ifndef NDEBUG
     if (sf_verbose_flag) {
@@ -2565,15 +2579,15 @@ queue_file_open(
     }
 #endif
     errors = subfiling_open_file(msg, sf_subfile_prefix, subfile_rank, flags);
-    // open_count = atomic_load(&sf_file_refcount);
+    // open_count = atomic_load(&sf_file_refcount);
 
-    ret = MPI_Send(&errors, 1, MPI_INT, source, COMPLETED, comm);
-    if (ret != MPI_SUCCESS) {
-        printf("[ioc(%d)] MPI_Send FILE_OPEN, COMPLETED to source(%d) FAILED\n",
-            subfile_rank, source);
-        fflush(stdout);
-        errors++;
-    }
+    ret = MPI_Send(&errors, 1, MPI_INT, source, COMPLETED, comm);
+    if (ret != MPI_SUCCESS) {
+        printf("[ioc(%d)] MPI_Send FILE_OPEN, COMPLETED to source(%d) FAILED\n",
+            subfile_rank, source);
+        fflush(stdout);
+        errors++;
+    }
 
     if (errors) {
 #ifndef NDEBUG
@@ -2657,7 +2671,7 @@ int
 subfiling_close_file(int subfile_rank, int *fid, MPI_Comm comm)
 {
     int errors = 0;
-    int subfile_fid = *fid;
+    int subfile_fid = *fid;
 
     if (subfile_fid >= 0) {
         if (fdatasync(subfile_fid) < 0) {
@@ -2704,12 +2718,12 @@ int
 subfiling_shutdown(int subfile_rank, int *fid, MPI_Comm comm)
 {
     int ret, source = 0;
-    int subfile_fid = *fid;
+    int subfile_fid = *fid;
     int errors = 0, flag = COMPLETED;
     if (subfile_fid >= 0) {
         if (close(subfile_fid) < 0) {
             perror("subfiling_close_file");
-            printf("subfile_fid = %d\n", subfile_fid);
+            printf("subfile_fid = %d\n", subfile_fid);
             errors++;
         }
         *fid = -1;
@@ -2828,7 +2842,7 @@ subfiling_open_file(
     char filepath[PATH_MAX];
     char config[PATH_MAX];
     int subfile_fid;
-    int64_t h5_file_id = msg->header[1];
+    int64_t h5_file_id = msg->header[1];
     int64_t file_context_id = msg->header[2];
     subfiling_context_t *sf_context = get_subfiling_object(file_context_id);
     assert(sf_context != NULL);
@@ -2885,16 +2899,16 @@ subfiling_open_file(
             sprintf(
                 linebuf, "aggregator_count=%d\n", n_io_concentrators);
             fwrite(linebuf, strlen(linebuf), 1, f);
-            sprintf(linebuf,"hdf5_file=%s\n", context->filename);
+            sprintf(linebuf,"hdf5_file=%s\n", context->filename);
             fwrite(linebuf, strlen(linebuf), 1, f);
 
             for (k = 0; k < n_io_concentrators; k++) {
-                if (prefix)
+                if (prefix)
                     sprintf(linebuf, "%s/%ld_node_local_temp_%d_of_%d:%d", prefix,
-                        h5_file_id, subfile_rank, n_io_concentrators, io_concentrator[k]);
+                        h5_file_id, subfile_rank, n_io_concentrators, io_concentrator[k]);
                 else
-                    sprintf(linebuf, "%ld_node_local_temp_%d_of_%d:%d", h5_file_id,
-                        subfile_rank, n_io_concentrators, io_concentrator[k]);
+                    sprintf(linebuf, "%ld_node_local_temp_%d_of_%d:%d", h5_file_id,
+                        subfile_rank, n_io_concentrators, io_concentrator[k]);
 
                 fwrite(linebuf, strlen(linebuf), 1, f);
             }
diff --git a/src/H5FDsubfile_threads.c b/src/H5FDsubfile_threads.c
index 8317009..0cdb5c4 100644
--- a/src/H5FDsubfile_threads.c
+++ b/src/H5FDsubfile_threads.c
@@ -105,11 +105,11 @@ int
 initialize_ioc_threads(subfiling_context_t *sf_context)
 {
     int status;
-    unsigned int thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT;
+    unsigned int thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT;
     int64_t *context_id = (int64_t *) malloc(sizeof(int64_t));
-    int world_size = sf_context->topology->world_size;
-    size_t alloc_size = ((size_t)world_size * sizeof(struct hg_thread_work));
-    char *envValue;
+    int world_size = sf_context->topology->world_size;
+    size_t alloc_size = ((size_t)world_size * sizeof(struct hg_thread_work));
+    char *envValue;
     assert(context_id != NULL);
     /* Initialize the main IOC thread input argument.
      * Each IOC request will utilize this context_id which is
@@ -119,15 +119,15 @@ initialize_ioc_threads(subfiling_context_t *sf_context)
      */
     context_id[0] = sf_context->sf_context_id;
 
-    if (pool_request == NULL) {
-        if ((pool_request = (struct hg_thread_work *)malloc(alloc_size)) == NULL) {
-            perror("malloc error");
-            return -1;
-        }
-        else pool_concurrent_max = world_size;
-    }
+    if (pool_request == NULL) {
+        if ((pool_request = (struct hg_thread_work *)malloc(alloc_size)) == NULL) {
+            perror("malloc error");
+            return -1;
+        }
+        else pool_concurrent_max = world_size;
+    }
 
-    memset(pool_request, 0, alloc_size);
+    memset(pool_request, 0, alloc_size);
 
     /* Initialize a couple of mutex variables that are used
      * during IO concentrator operations to serialize
@@ -144,13 +144,13 @@ initialize_ioc_threads(subfiling_context_t *sf_context)
         goto err_exit;
     }
 
-    /* Allow experimentation with the number of helper threads */
-    if ((envValue = getenv("IOC_THREAD_POOL_COUNT")) != NULL) {
-        int value_check = atoi(envValue);
-        if (value_check > 0) {
-            thread_pool_count = (unsigned int)value_check;
-        }
-    }
+    /* Allow experimentation with the number of helper threads */
+    if ((envValue = getenv("IOC_THREAD_POOL_COUNT")) != NULL) {
+        int value_check = atoi(envValue);
+        if (value_check > 0) {
+            thread_pool_count = (unsigned int)value_check;
+        }
+    }
 
     /* Initialize a thread pool for the IO Concentrator to use */
     status = hg_thread_pool_init(thread_pool_count, &ioc_thread_pool);
--
cgit v0.12
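
Note: the pattern this patch tunes -- poll for MPI completions and back off longer
between polls when the calling rank itself hosts an IO Concentrator -- can be
sketched in isolation. The following is illustrative only: await_replies(),
rank_is_ioc and world_size are hypothetical stand-ins for the logic inside
read__independent()/write__independent(), not part of the HDF5 API, and the real
code tracks per-IOC requests and datatypes rather than a flat request array.

/* Sketch: adaptive usleep() back-off while awaiting IOC replies. */
#include <mpi.h>
#include <stdlib.h>
#include <unistd.h>     /* usleep(), useconds_t */

static int
await_replies(MPI_Request *reqs, int count, int rank_is_ioc, int world_size)
{
    useconds_t delay     = 100;   /* base delay, raised from 50 by this patch */
    int        n_waiting = count;
    int        ready;
    int *      indices   = malloc((size_t)count * sizeof(int));

    if (indices == NULL)
        return -1;

    /* An IOC rank must also service incoming IO requests, so sleeping
     * longer between polls hands its worker threads more compute time. */
    if (rank_is_ioc)
        delay *= (useconds_t)world_size;

    while (n_waiting > 0) {
        MPI_Testsome(count, reqs, &ready, indices, MPI_STATUSES_IGNORE);
        if (ready == MPI_UNDEFINED)   /* no active requests remain */
            break;
        n_waiting -= ready;
        if (n_waiting)
            usleep(delay);            /* back off before polling again */
    }
    free(indices);
    return 0;
}

With world_size = 64, for example, an IOC rank would sleep 6400 us per retry
instead of 100 us: slower completion detection on that rank, traded for more
cycles for its IOC service threads.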
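The H5FDsubfile_threads.c hunks also preserve the IOC_THREAD_POOL_COUNT override
of the helper-thread count. The same idea as a standalone sketch, where
choose_pool_size() is a hypothetical wrapper and the default corresponds to
HG_TEST_NUM_THREADS_DEFAULT in the patch:

#include <stdlib.h>

/* Pick the IOC helper-thread count: a positive integer in the
 * IOC_THREAD_POOL_COUNT environment variable wins, otherwise the
 * compiled-in default is used. */
static unsigned int
choose_pool_size(unsigned int default_count)
{
    const char *env = getenv("IOC_THREAD_POOL_COUNT");

    if (env != NULL) {
        int value_check = atoi(env);
        if (value_check > 0)
            return (unsigned int)value_check;
    }
    return default_count;
}

As in the patch, the chosen count feeds straight into
hg_thread_pool_init(thread_pool_count, &ioc_thread_pool).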