summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorRichard Warren <Richard.Warren@hdfgroup.org>2020-10-01 14:49:11 (GMT)
committerRichard Warren <Richard.Warren@hdfgroup.org>2020-10-01 14:49:11 (GMT)
commitecb5c6e670fa9b895c4a61d2dc569e5312d6c824 (patch)
tree7b813beab0cfa08dac4975cd973128997b3dc2f2 /src
parent8b9212b0156ace0f30f7ff05222e956a23d3f243 (diff)
downloadhdf5-ecb5c6e670fa9b895c4a61d2dc569e5312d6c824.zip
hdf5-ecb5c6e670fa9b895c4a61d2dc569e5312d6c824.tar.gz
hdf5-ecb5c6e670fa9b895c4a61d2dc569e5312d6c824.tar.bz2
Changed the usleep parameters based on whether the MPI rank hosts an IO Concentratorinactive/develop_subfiling_v1
Diffstat (limited to 'src')
-rw-r--r--src/H5FDsubfile_mpi.c110
-rw-r--r--src/H5FDsubfile_threads.c38
2 files changed, 81 insertions, 67 deletions
diff --git a/src/H5FDsubfile_mpi.c b/src/H5FDsubfile_mpi.c
index 6774536..0d8fc60 100644
--- a/src/H5FDsubfile_mpi.c
+++ b/src/H5FDsubfile_mpi.c
@@ -807,7 +807,7 @@ record_fid_to_subfile(hid_t fid, hid_t subfile_context_id, int *next_index)
if (index == sf_file_map_size) {
int i;
sf_open_file_map = realloc(sf_open_file_map,
- ((size_t)(sf_file_map_size * 2) * sizeof(file_map_to_context_t)));
+ ((size_t)(sf_file_map_size * 2) * sizeof(file_map_to_context_t)));
if (sf_open_file_map == NULL) {
perror("realloc");
return FAIL;
@@ -1192,14 +1192,21 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
int64_t ioc_read_datasize[n_io_concentrators];
int64_t ioc_read_offset[n_io_concentrators];
MPI_Datatype ioc_read_type[n_io_concentrators];
- useconds_t delay = 50;
+ useconds_t delay = 100;
subfiling_context_t *sf_context = get_subfiling_object(context_id);
assert(sf_context != NULL);
+
/* Note that the sf_write_count is only tracked by an IOC rank */
if (sf_context->sf_write_count && (sf_context->sf_fid > 0)) {
fdatasync(sf_context->sf_fid);
+
+ /* We can attempt to give the IOC more compute time
+ * if we extend out delaying tactic when awaiting
+ * responses.
+ */
+ delay *= sf_context->topology->world_size;
}
io_concentrator = sf_context->topology->io_concentrator;
@@ -1230,12 +1237,12 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
#ifndef NDEBUG
if (sf_verbose_flag) {
- fprintf(stdout,
+ fprintf(stdout,
"[%d %s] Requesting %ld read bytes from IOC(%d): "
"sourceOffset=%ld subfile_offset=%ld\n",
sf_world_rank, __func__, msg[0], io_concentrator[ioc],
- sourceOffset, msg[1]);
- fflush(stdout);
+ sourceOffset, msg[1]);
+ fflush(stdout);
}
#endif
@@ -1290,12 +1297,12 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
for (i = 0; i < ready; i++) {
#ifndef NDEBUG
if (sf_verbose_flag) {
- fprintf(stdout,
+ fprintf(stdout,
"[%d] READ bytes(%ld) of data from ioc_concentrator %d "
"complete\n",
sf_world_rank, ioc_read_datasize[indices[i]],
indices[i]);
- fflush(stdout);
+ fflush(stdout);
}
#endif
if (ioc_read_type[indices[i]] != MPI_BYTE) {
@@ -1303,8 +1310,8 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
}
n_waiting--;
}
- if (n_waiting)
- usleep(delay);
+ if (n_waiting)
+ usleep(delay);
}
return status;
}
@@ -1438,16 +1445,23 @@ write__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
int64_t ioc_write_datasize[n_io_concentrators];
int64_t ioc_write_offset[n_io_concentrators];
MPI_Datatype ioc_write_type[n_io_concentrators];
- useconds_t delay = 50;
- subfiling_context_t *sf_context = get_subfiling_object(context_id);
- int i, target, ioc, n_waiting = 0, status = 0;
- int errors = 0;
+ int n_waiting = 0, status = 0, errors = 0;
+ int i, target, ioc;
+ useconds_t delay = 100;
+ subfiling_context_t *sf_context = get_subfiling_object(context_id);
+ assert(sf_context);
io_concentrator = sf_context->topology->io_concentrator;
if (sf_context->topology->rank_is_ioc) {
sf_context->sf_write_count++;
+
+ /* We can attempt to give the IOC more compute time
+ * if we extend out delaying tactic when awaiting
+ * responses.
+ */
+ delay *= sf_context->topology->world_size;
}
/* The following function will initialize the collection of IO transfer
@@ -1498,15 +1512,15 @@ write__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
}
#ifndef NDEBUG
- if (sf_verbose_flag)
- {
- fprintf(stdout,
- "[%d %s]: write_dest[ioc(%d), "
- "sourceOffset=%ld, datasize=%ld, foffset=%ld]\n",
- sf_world_rank, __func__, ioc, sourceOffset,
- ioc_write_datasize[ioc], ioc_write_offset[ioc]);
- fflush(stdout);
- }
+ if (sf_verbose_flag)
+ {
+ fprintf(stdout,
+ "[%d %s]: write_dest[ioc(%d), "
+ "sourceOffset=%ld, datasize=%ld, foffset=%ld]\n",
+ sf_world_rank, __func__, ioc, sourceOffset,
+ ioc_write_datasize[ioc], ioc_write_offset[ioc]);
+ fflush(stdout);
+ }
#endif
@@ -1603,8 +1617,8 @@ write__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
}
n_waiting--;
}
- if (n_waiting)
- usleep(delay);
+ if (n_waiting)
+ usleep(delay);
}
if (errors)
return -1;
@@ -2021,8 +2035,8 @@ sf_open_subfiles(hid_t fid, char *filename, char *prefix, int flags)
assert(sf_context != NULL);
sf_context->sf_context_id = context_id;
- sf_context->h5_file_id = fid;
- sf_context->filename = strdup(filename);
+ sf_context->h5_file_id = fid;
+ sf_context->filename = strdup(filename);
sf_shutdown_flag = 0;
return open__subfiles(sf_context, sf_context->topology->n_io_concentrators,
@@ -2162,9 +2176,9 @@ ioc_main(int64_t context_id)
atomic_init(&sf_file_refcount, 0);
atomic_init(&sf_ioc_fini_refcount, 0);
- sf_open_file_count = 0;
- sf_close_file_count = 0;
- sf_ops_after_first_close = 0;
+ sf_open_file_count = 0;
+ sf_close_file_count = 0;
+ sf_ops_after_first_close = 0;
while (!sf_shutdown_flag || sf_work_pending) {
flag = 0;
@@ -2213,7 +2227,7 @@ ioc_main(int64_t context_id)
#ifndef NDEBUG
if (sf_logfile) {
fclose(sf_logfile);
- sf_logfile = NULL;
+ sf_logfile = NULL;
}
#endif
@@ -2553,7 +2567,7 @@ queue_file_open(
{
int ret, errors = 0;
int flags = (int) (msg->header[0] & 0x0ffffffff);
- // int open_count;
+ // int open_count;
atomic_fetch_add(&sf_file_refcount, 1); // atomic
#ifndef NDEBUG
if (sf_verbose_flag) {
@@ -2565,15 +2579,15 @@ queue_file_open(
}
#endif
errors = subfiling_open_file(msg, sf_subfile_prefix, subfile_rank, flags);
- // open_count = atomic_load(&sf_file_refcount);
+ // open_count = atomic_load(&sf_file_refcount);
- ret = MPI_Send(&errors, 1, MPI_INT, source, COMPLETED, comm);
- if (ret != MPI_SUCCESS) {
- printf("[ioc(%d)] MPI_Send FILE_OPEN, COMPLETED to source(%d) FAILED\n",
- subfile_rank, source);
- fflush(stdout);
- errors++;
- }
+ ret = MPI_Send(&errors, 1, MPI_INT, source, COMPLETED, comm);
+ if (ret != MPI_SUCCESS) {
+ printf("[ioc(%d)] MPI_Send FILE_OPEN, COMPLETED to source(%d) FAILED\n",
+ subfile_rank, source);
+ fflush(stdout);
+ errors++;
+ }
if (errors) {
#ifndef NDEBUG
@@ -2657,7 +2671,7 @@ int
subfiling_close_file(int subfile_rank, int *fid, MPI_Comm comm)
{
int errors = 0;
- int subfile_fid = *fid;
+ int subfile_fid = *fid;
if (subfile_fid >= 0) {
if (fdatasync(subfile_fid) < 0) {
@@ -2704,12 +2718,12 @@ int
subfiling_shutdown(int subfile_rank, int *fid, MPI_Comm comm)
{
int ret, source = 0;
- int subfile_fid = *fid;
+ int subfile_fid = *fid;
int errors = 0, flag = COMPLETED;
if (subfile_fid >= 0) {
if (close(subfile_fid) < 0) {
perror("subfiling_close_file");
- printf("subfile_fid = %d\n", subfile_fid);
+ printf("subfile_fid = %d\n", subfile_fid);
errors++;
}
*fid = -1;
@@ -2828,7 +2842,7 @@ subfiling_open_file(
char filepath[PATH_MAX];
char config[PATH_MAX];
int subfile_fid;
- int64_t h5_file_id = msg->header[1];
+ int64_t h5_file_id = msg->header[1];
int64_t file_context_id = msg->header[2];
subfiling_context_t *sf_context = get_subfiling_object(file_context_id);
assert(sf_context != NULL);
@@ -2885,16 +2899,16 @@ subfiling_open_file(
sprintf(
linebuf, "aggregator_count=%d\n", n_io_concentrators);
fwrite(linebuf, strlen(linebuf), 1, f);
- sprintf(linebuf,"hdf5_file=%s\n", context->filename);
+ sprintf(linebuf,"hdf5_file=%s\n", context->filename);
fwrite(linebuf, strlen(linebuf), 1, f);
for (k = 0; k < n_io_concentrators; k++) {
- if (prefix)
+ if (prefix)
sprintf(linebuf, "%s/%ld_node_local_temp_%d_of_%d:%d", prefix,
- h5_file_id, subfile_rank, n_io_concentrators, io_concentrator[k]);
+ h5_file_id, subfile_rank, n_io_concentrators, io_concentrator[k]);
else
- sprintf(linebuf, "%ld_node_local_temp_%d_of_%d:%d", h5_file_id,
- subfile_rank, n_io_concentrators, io_concentrator[k]);
+ sprintf(linebuf, "%ld_node_local_temp_%d_of_%d:%d", h5_file_id,
+ subfile_rank, n_io_concentrators, io_concentrator[k]);
fwrite(linebuf, strlen(linebuf), 1, f);
}
diff --git a/src/H5FDsubfile_threads.c b/src/H5FDsubfile_threads.c
index 8317009..0cdb5c4 100644
--- a/src/H5FDsubfile_threads.c
+++ b/src/H5FDsubfile_threads.c
@@ -105,11 +105,11 @@ int
initialize_ioc_threads(subfiling_context_t *sf_context)
{
int status;
- unsigned int thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT;
+ unsigned int thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT;
int64_t *context_id = (int64_t *) malloc(sizeof(int64_t));
- int world_size = sf_context->topology->world_size;
- size_t alloc_size = ((size_t)world_size * sizeof(struct hg_thread_work));
- char *envValue;
+ int world_size = sf_context->topology->world_size;
+ size_t alloc_size = ((size_t)world_size * sizeof(struct hg_thread_work));
+ char *envValue;
assert(context_id != NULL);
/* Initialize the main IOC thread input argument.
* Each IOC request will utilize this context_id which is
@@ -119,15 +119,15 @@ initialize_ioc_threads(subfiling_context_t *sf_context)
*/
context_id[0] = sf_context->sf_context_id;
- if (pool_request == NULL) {
- if ((pool_request = (struct hg_thread_work *)malloc(alloc_size)) == NULL) {
- perror("malloc error");
- return -1;
- }
- else pool_concurrent_max = world_size;
- }
+ if (pool_request == NULL) {
+ if ((pool_request = (struct hg_thread_work *)malloc(alloc_size)) == NULL) {
+ perror("malloc error");
+ return -1;
+ }
+ else pool_concurrent_max = world_size;
+ }
- memset(pool_request, 0, alloc_size);
+ memset(pool_request, 0, alloc_size);
/* Initialize a couple of mutex variables that are used
* during IO concentrator operations to serialize
@@ -144,13 +144,13 @@ initialize_ioc_threads(subfiling_context_t *sf_context)
goto err_exit;
}
- /* Allow experimentation with the number of helper threads */
- if ((envValue = getenv("IOC_THREAD_POOL_COUNT")) != NULL) {
- int value_check = atoi(envValue);
- if (value_check > 0) {
- thread_pool_count = (unsigned int)value_check;
- }
- }
+ /* Allow experimentation with the number of helper threads */
+ if ((envValue = getenv("IOC_THREAD_POOL_COUNT")) != NULL) {
+ int value_check = atoi(envValue);
+ if (value_check > 0) {
+ thread_pool_count = (unsigned int)value_check;
+ }
+ }
/* Initialize a thread pool for the IO Concentrator to use */
status = hg_thread_pool_init(thread_pool_count, &ioc_thread_pool);