summaryrefslogtreecommitdiffstats
path: root/src/H5FDsubfile_mpi.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/H5FDsubfile_mpi.c')
-rw-r--r--src/H5FDsubfile_mpi.c110
1 files changed, 62 insertions, 48 deletions
diff --git a/src/H5FDsubfile_mpi.c b/src/H5FDsubfile_mpi.c
index 6774536..0d8fc60 100644
--- a/src/H5FDsubfile_mpi.c
+++ b/src/H5FDsubfile_mpi.c
@@ -807,7 +807,7 @@ record_fid_to_subfile(hid_t fid, hid_t subfile_context_id, int *next_index)
if (index == sf_file_map_size) {
int i;
sf_open_file_map = realloc(sf_open_file_map,
- ((size_t)(sf_file_map_size * 2) * sizeof(file_map_to_context_t)));
+ ((size_t)(sf_file_map_size * 2) * sizeof(file_map_to_context_t)));
if (sf_open_file_map == NULL) {
perror("realloc");
return FAIL;
@@ -1192,14 +1192,21 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
int64_t ioc_read_datasize[n_io_concentrators];
int64_t ioc_read_offset[n_io_concentrators];
MPI_Datatype ioc_read_type[n_io_concentrators];
- useconds_t delay = 50;
+ useconds_t delay = 100;
subfiling_context_t *sf_context = get_subfiling_object(context_id);
assert(sf_context != NULL);
+
/* Note that the sf_write_count is only tracked by an IOC rank */
if (sf_context->sf_write_count && (sf_context->sf_fid > 0)) {
fdatasync(sf_context->sf_fid);
+
+ /* We can attempt to give the IOC more compute time
+ * if we extend out delaying tactic when awaiting
+ * responses.
+ */
+ delay *= sf_context->topology->world_size;
}
io_concentrator = sf_context->topology->io_concentrator;
@@ -1230,12 +1237,12 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
#ifndef NDEBUG
if (sf_verbose_flag) {
- fprintf(stdout,
+ fprintf(stdout,
"[%d %s] Requesting %ld read bytes from IOC(%d): "
"sourceOffset=%ld subfile_offset=%ld\n",
sf_world_rank, __func__, msg[0], io_concentrator[ioc],
- sourceOffset, msg[1]);
- fflush(stdout);
+ sourceOffset, msg[1]);
+ fflush(stdout);
}
#endif
@@ -1290,12 +1297,12 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
for (i = 0; i < ready; i++) {
#ifndef NDEBUG
if (sf_verbose_flag) {
- fprintf(stdout,
+ fprintf(stdout,
"[%d] READ bytes(%ld) of data from ioc_concentrator %d "
"complete\n",
sf_world_rank, ioc_read_datasize[indices[i]],
indices[i]);
- fflush(stdout);
+ fflush(stdout);
}
#endif
if (ioc_read_type[indices[i]] != MPI_BYTE) {
@@ -1303,8 +1310,8 @@ read__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
}
n_waiting--;
}
- if (n_waiting)
- usleep(delay);
+ if (n_waiting)
+ usleep(delay);
}
return status;
}
@@ -1438,16 +1445,23 @@ write__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
int64_t ioc_write_datasize[n_io_concentrators];
int64_t ioc_write_offset[n_io_concentrators];
MPI_Datatype ioc_write_type[n_io_concentrators];
- useconds_t delay = 50;
- subfiling_context_t *sf_context = get_subfiling_object(context_id);
- int i, target, ioc, n_waiting = 0, status = 0;
- int errors = 0;
+ int n_waiting = 0, status = 0, errors = 0;
+ int i, target, ioc;
+ useconds_t delay = 100;
+ subfiling_context_t *sf_context = get_subfiling_object(context_id);
+ assert(sf_context);
io_concentrator = sf_context->topology->io_concentrator;
if (sf_context->topology->rank_is_ioc) {
sf_context->sf_write_count++;
+
+ /* We can attempt to give the IOC more compute time
+ * if we extend out delaying tactic when awaiting
+ * responses.
+ */
+ delay *= sf_context->topology->world_size;
}
/* The following function will initialize the collection of IO transfer
@@ -1498,15 +1512,15 @@ write__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
}
#ifndef NDEBUG
- if (sf_verbose_flag)
- {
- fprintf(stdout,
- "[%d %s]: write_dest[ioc(%d), "
- "sourceOffset=%ld, datasize=%ld, foffset=%ld]\n",
- sf_world_rank, __func__, ioc, sourceOffset,
- ioc_write_datasize[ioc], ioc_write_offset[ioc]);
- fflush(stdout);
- }
+ if (sf_verbose_flag)
+ {
+ fprintf(stdout,
+ "[%d %s]: write_dest[ioc(%d), "
+ "sourceOffset=%ld, datasize=%ld, foffset=%ld]\n",
+ sf_world_rank, __func__, ioc, sourceOffset,
+ ioc_write_datasize[ioc], ioc_write_offset[ioc]);
+ fflush(stdout);
+ }
#endif
@@ -1603,8 +1617,8 @@ write__independent(int n_io_concentrators, hid_t context_id, int64_t offset,
}
n_waiting--;
}
- if (n_waiting)
- usleep(delay);
+ if (n_waiting)
+ usleep(delay);
}
if (errors)
return -1;
@@ -2021,8 +2035,8 @@ sf_open_subfiles(hid_t fid, char *filename, char *prefix, int flags)
assert(sf_context != NULL);
sf_context->sf_context_id = context_id;
- sf_context->h5_file_id = fid;
- sf_context->filename = strdup(filename);
+ sf_context->h5_file_id = fid;
+ sf_context->filename = strdup(filename);
sf_shutdown_flag = 0;
return open__subfiles(sf_context, sf_context->topology->n_io_concentrators,
@@ -2162,9 +2176,9 @@ ioc_main(int64_t context_id)
atomic_init(&sf_file_refcount, 0);
atomic_init(&sf_ioc_fini_refcount, 0);
- sf_open_file_count = 0;
- sf_close_file_count = 0;
- sf_ops_after_first_close = 0;
+ sf_open_file_count = 0;
+ sf_close_file_count = 0;
+ sf_ops_after_first_close = 0;
while (!sf_shutdown_flag || sf_work_pending) {
flag = 0;
@@ -2213,7 +2227,7 @@ ioc_main(int64_t context_id)
#ifndef NDEBUG
if (sf_logfile) {
fclose(sf_logfile);
- sf_logfile = NULL;
+ sf_logfile = NULL;
}
#endif
@@ -2553,7 +2567,7 @@ queue_file_open(
{
int ret, errors = 0;
int flags = (int) (msg->header[0] & 0x0ffffffff);
- // int open_count;
+ // int open_count;
atomic_fetch_add(&sf_file_refcount, 1); // atomic
#ifndef NDEBUG
if (sf_verbose_flag) {
@@ -2565,15 +2579,15 @@ queue_file_open(
}
#endif
errors = subfiling_open_file(msg, sf_subfile_prefix, subfile_rank, flags);
- // open_count = atomic_load(&sf_file_refcount);
+ // open_count = atomic_load(&sf_file_refcount);
- ret = MPI_Send(&errors, 1, MPI_INT, source, COMPLETED, comm);
- if (ret != MPI_SUCCESS) {
- printf("[ioc(%d)] MPI_Send FILE_OPEN, COMPLETED to source(%d) FAILED\n",
- subfile_rank, source);
- fflush(stdout);
- errors++;
- }
+ ret = MPI_Send(&errors, 1, MPI_INT, source, COMPLETED, comm);
+ if (ret != MPI_SUCCESS) {
+ printf("[ioc(%d)] MPI_Send FILE_OPEN, COMPLETED to source(%d) FAILED\n",
+ subfile_rank, source);
+ fflush(stdout);
+ errors++;
+ }
if (errors) {
#ifndef NDEBUG
@@ -2657,7 +2671,7 @@ int
subfiling_close_file(int subfile_rank, int *fid, MPI_Comm comm)
{
int errors = 0;
- int subfile_fid = *fid;
+ int subfile_fid = *fid;
if (subfile_fid >= 0) {
if (fdatasync(subfile_fid) < 0) {
@@ -2704,12 +2718,12 @@ int
subfiling_shutdown(int subfile_rank, int *fid, MPI_Comm comm)
{
int ret, source = 0;
- int subfile_fid = *fid;
+ int subfile_fid = *fid;
int errors = 0, flag = COMPLETED;
if (subfile_fid >= 0) {
if (close(subfile_fid) < 0) {
perror("subfiling_close_file");
- printf("subfile_fid = %d\n", subfile_fid);
+ printf("subfile_fid = %d\n", subfile_fid);
errors++;
}
*fid = -1;
@@ -2828,7 +2842,7 @@ subfiling_open_file(
char filepath[PATH_MAX];
char config[PATH_MAX];
int subfile_fid;
- int64_t h5_file_id = msg->header[1];
+ int64_t h5_file_id = msg->header[1];
int64_t file_context_id = msg->header[2];
subfiling_context_t *sf_context = get_subfiling_object(file_context_id);
assert(sf_context != NULL);
@@ -2885,16 +2899,16 @@ subfiling_open_file(
sprintf(
linebuf, "aggregator_count=%d\n", n_io_concentrators);
fwrite(linebuf, strlen(linebuf), 1, f);
- sprintf(linebuf,"hdf5_file=%s\n", context->filename);
+ sprintf(linebuf,"hdf5_file=%s\n", context->filename);
fwrite(linebuf, strlen(linebuf), 1, f);
for (k = 0; k < n_io_concentrators; k++) {
- if (prefix)
+ if (prefix)
sprintf(linebuf, "%s/%ld_node_local_temp_%d_of_%d:%d", prefix,
- h5_file_id, subfile_rank, n_io_concentrators, io_concentrator[k]);
+ h5_file_id, subfile_rank, n_io_concentrators, io_concentrator[k]);
else
- sprintf(linebuf, "%ld_node_local_temp_%d_of_%d:%d", h5_file_id,
- subfile_rank, n_io_concentrators, io_concentrator[k]);
+ sprintf(linebuf, "%ld_node_local_temp_%d_of_%d:%d", h5_file_id,
+ subfile_rank, n_io_concentrators, io_concentrator[k]);
fwrite(linebuf, strlen(linebuf), 1, f);
}