summary refs log tree commit diff stats
path: root/src/H5FDsubfile_mpi.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/H5FDsubfile_mpi.c')
-rw-r--r-- src/H5FDsubfile_mpi.c 168
1 files changed, 86 insertions, 82 deletions
diff --git a/src/H5FDsubfile_mpi.c b/src/H5FDsubfile_mpi.c
index 6e060c5..2204d8d 100644
--- a/src/H5FDsubfile_mpi.c
+++ b/src/H5FDsubfile_mpi.c
@@ -31,19 +31,18 @@ static double sf_queue_delay_time = 0.0;
* intend to use the user defined HDF5 filename for a
* zeroth subfile as well as for all metadata.
*/
-#define SF_NODE_LOCAL_TEMPLATE "%ld_node_local_%d_of_%d"
-#define SF_FILENAME_TEMPLATE "%ld_subfile_%d_of_%d"
+#define SF_FILENAME_TEMPLATE ".subfile_%ld_%d_of_%d"
static int *request_count_per_rank = NULL;
-atomic_int sf_workinprogress = 0;
-atomic_int sf_work_pending = 0;
-atomic_int sf_file_open_count = 0;
-atomic_int sf_file_close_count = 0;
-atomic_int sf_file_refcount = 0;
-atomic_int sf_ioc_fini_refcount = 0;
-atomic_int sf_ioc_ready = 0;
-volatile int sf_shutdown_flag = 0;
+atomic_int sf_workinprogress = 0;
+atomic_int sf_work_pending = 0;
+atomic_int sf_file_open_count = 0;
+atomic_int sf_file_close_count = 0;
+atomic_int sf_file_refcount = 0;
+atomic_int sf_ioc_fini_refcount = 0;
+atomic_int sf_ioc_ready = 0;
+atomic_int sf_shutdown_flag = 0;
/*
* Structure definitions to enable async io completions
@@ -793,7 +792,7 @@ get_ioc_subfile_path(int ioc, int ioc_count, subfiling_context_t *sf_context)
char * prefix = sf_context->subfile_prefix;
if (prefix != NULL) {
- sprintf(filepath, "%s/" SF_NODE_LOCAL_TEMPLATE, prefix, sf_context->h5_file_id, ioc, ioc_count);
+ sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, prefix, sf_context->h5_file_id, ioc, ioc_count);
}
else {
strcpy(filepath, sf_context->h5_filename);
@@ -1002,7 +1001,6 @@ write__independent_async(int n_io_concentrators, hid_t context_id, int64_t offse
fflush(stdout);
#endif
status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], WRITE_INDEP, sf_context->sf_msg_comm);
-
if (status != MPI_SUCCESS) {
int len;
char estring[MPI_MAX_ERROR_STRING];
@@ -1123,7 +1121,7 @@ H5FD__write_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_
sf_context = get__subfiling_object(sf_context_id);
assert(sf_context != NULL);
- active_reqs = (MPI_Request *)calloc((size_t)(count + 2), sizeof(MPI_Request));
+ active_reqs = (MPI_Request *)calloc((size_t)(count + 2), sizeof(struct __mpi_req));
assert(active_reqs);
sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *));
@@ -1206,7 +1204,7 @@ H5FD__read_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_t
sf_context = get__subfiling_object(sf_context_id);
assert(sf_context != NULL);
- active_reqs = (MPI_Request *)calloc((size_t)(count + 2), sizeof(MPI_Request));
+ active_reqs = (MPI_Request *)calloc((size_t)(count + 2), sizeof(struct __mpi_req));
assert(active_reqs);
sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *));
@@ -1274,7 +1272,7 @@ sf_shutdown_local_ioc(hid_t fid)
subfiling_context_t *sf_context = get__subfiling_object(context_id);
assert(sf_context != NULL);
if (sf_context->topology->rank_is_ioc) {
- sf_shutdown_flag = 1;
+ atomic_fetch_add(&sf_shutdown_flag, 1);
}
return 0;
}
@@ -1381,6 +1379,7 @@ ioc_main(int64_t context_id)
atomic_init(&sf_file_close_count, 0);
atomic_init(&sf_file_refcount, 0);
atomic_init(&sf_ioc_fini_refcount, 0);
+ atomic_init(&sf_shutdown_flag, 0);
atomic_init(&sf_ioc_ready, 1);
shutdown_requested = 0;
@@ -1431,7 +1430,6 @@ ioc_main(int64_t context_id)
incoming_requests[index].subfile_rank = subfile_rank;
incoming_requests[index].start_time = queue_start_time;
incoming_requests[index].buffer = NULL;
- incoming_requests[index].completed = 0;
tpool_add_work(&incoming_requests[index]);
if (index == max_work_depth - 1) {
atomic_init(&sf_workinprogress, 0);
@@ -1445,7 +1443,7 @@ ioc_main(int64_t context_id)
else {
usleep(delay);
}
- shutdown_requested = sf_shutdown_flag;
+ shutdown_requested = atomic_load(&sf_shutdown_flag);
}
if (incoming_requests) {
@@ -1453,7 +1451,7 @@ ioc_main(int64_t context_id)
}
/* Reset the shutdown flag */
- sf_shutdown_flag = 0;
+ atomic_init(&sf_shutdown_flag, 0);
return 0;
}
@@ -1598,7 +1596,11 @@ queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm
return ret;
}
+ if (msg->serialize)
+ ioc__wait_for_serialize(msg);
+
fd = sf_context->sf_fid;
+
if (fd < 0) {
printf("[ioc(%d)] WARNING: %s called while subfile_fid = %d (closed)\n", subfile_rank, __func__, fd);
fflush(stdout);
@@ -1619,7 +1621,6 @@ queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm
sf_queue_delay_time += t_queue_delay;
/* Done... */
- msg->completed = 1;
if (sf_eof > sf_context->sf_eof)
sf_context->sf_eof = sf_eof;
@@ -1740,56 +1741,58 @@ queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm
/* ---------------------------------------------------
* Helper function for subfiling_open_file() see below
+ * Subfiles should be located in the same directory
+ * as the HDF5 file unless the user has provided
+ * an alternate directory name, as indicated by the
+ * sf_context->subfile_prefix field.
* ---------------------------------------------------*/
static void
-get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int subfile_rank, char **_prefix,
+get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int subfile_rank, char **_basename,
char **_subfile_dir, char *filepath)
{
- const char slash = '/';
- char workdir[PATH_MAX];
- char configdir[PATH_MAX];
-
char *prefix = NULL, *subfile_dir = NULL;
+ char *base = NULL;
int n_io_concentrators = sf_context->topology->n_io_concentrators;
- memset(workdir, 0, PATH_MAX);
- getcwd(workdir, PATH_MAX);
+ /* We require this to be non-null */
+ HDassert(sf_context);
- if ((prefix = sf_context->subfile_prefix) == NULL) {
- memset(configdir, 0, PATH_MAX);
- strncpy(configdir, sf_context->h5_filename, strlen(sf_context->h5_filename));
- prefix = dirname(configdir);
- }
+ prefix = (char *)malloc(PATH_MAX);
+ HDassert(prefix);
- size_t prefix_len = strlen(prefix);
- if (strcmp(prefix, workdir)) {
- if (prefix[prefix_len - 1] == slash) {
- if (sf_context->subfile_prefix)
- sprintf(filepath, "%s" SF_NODE_LOCAL_TEMPLATE, prefix, h5_file_id, subfile_rank,
- n_io_concentrators);
- else
- sprintf(filepath, "%s" SF_FILENAME_TEMPLATE, prefix, h5_file_id, subfile_rank,
- n_io_concentrators);
- }
- else {
- if (sf_context->subfile_prefix)
- sprintf(filepath, "%s/" SF_NODE_LOCAL_TEMPLATE, prefix, h5_file_id, subfile_rank,
- n_io_concentrators);
- else
- sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, prefix, h5_file_id, subfile_rank,
- n_io_concentrators);
- }
+ /* Under normal operation, we co-locate subfiles
+ * with the HDF5 file
+ */
+ strcpy(prefix, sf_context->h5_filename);
+ base = basename(prefix);
+ *_basename = strdup(base);
+
+ if (sf_context->subfile_prefix == NULL) {
+ subfile_dir = dirname(prefix);
+ *_subfile_dir = strdup(subfile_dir);
}
else {
- memset(configdir, 0, PATH_MAX);
- strcpy(configdir, sf_context->h5_filename);
- subfile_dir = dirname(configdir);
- sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, subfile_dir, h5_file_id, subfile_rank,
- n_io_concentrators);
+ /* Note: Users may specify a directory name which is inaccessible
+ * from where the current process is running. In particular, "node-local"
+ * storage is not uniformly available to all processes.
+ * We would like to check whether the user pathname is unavailable and,
+ * if so, we could default to creating the subfiles in the
+ * current directory. (?)
+ */
+ *_subfile_dir = strdup(sf_context->subfile_prefix);
}
- *_prefix = prefix;
- *_subfile_dir = subfile_dir;
+ /* The subfile naming should produce files of the following form:
+ * If we assume the HDF5 file is named ABC.h5, then subfiles
+ * will have names:
+ * ABC.h5.subfile_<file-number>_0_of_2,
+ * ABC.h5.subfile_<file-number>_1_of_2, and
+ * ABC.h5.subfile_<file-number>.config
+ */
+ sprintf(filepath, "%s/%s" SF_FILENAME_TEMPLATE, subfile_dir, base, h5_file_id, subfile_rank,
+ n_io_concentrators);
+ if (prefix)
+ HDfree(prefix);
}
/*-------------------------------------------------------------------------
@@ -1821,10 +1824,14 @@ get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int subfi
int
subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags)
{
- int errors = 0;
- char filepath[PATH_MAX];
+ int errors = 0;
+ char filepath[PATH_MAX];
+ char linebuf[PATH_MAX];
+
+ char * temp = NULL;
char * prefix = NULL;
char * subfile_dir = NULL;
+ char * base = NULL;
mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
double t_start = 0.0, t_end = 0.0;
@@ -1837,7 +1844,6 @@ subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags)
/* Only allow the actual IO concentrator ranks to create sub-files */
if (subfile_rank >= 0) {
int k, retries = 2;
- char config[PATH_MAX];
int64_t h5_file_id = msg->header[1];
int64_t file_context_id = msg->header[2];
subfiling_context_t *sf_context = get__subfiling_object(file_context_id);
@@ -1850,20 +1856,20 @@ subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags)
* and possibly (IFF our subfile_rank is 0) a config file.
*/
- get__subfile_name(sf_context, h5_file_id, subfile_rank, &prefix, &subfile_dir, filepath);
+ get__subfile_name(sf_context, h5_file_id, subfile_rank, &base, &subfile_dir, filepath);
sf_context->sf_filename = strdup(filepath);
assert(sf_context->sf_filename);
/* Check if we need to create the subfiles */
if (sf_context->sf_fid == -2) {
- const char *dotconfig = "-subfile_config";
- int n_io_concentrators = sf_context->topology->n_io_concentrators;
- int * io_concentrator = sf_context->topology->io_concentrator;
+ int n_io_concentrators = sf_context->topology->n_io_concentrators;
+ int *io_concentrator = sf_context->topology->io_concentrator;
for (k = 0; k < retries; k++) {
int fd;
if ((fd = HDopen(filepath, O_CREAT | O_RDWR | O_TRUNC, mode)) > 0) {
sf_context->sf_fid = fd;
+ sf_context->sf_eof = 0;
break;
}
}
@@ -1882,38 +1888,31 @@ subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags)
errors++;
goto done;
}
-
- memset(config, 0, sizeof(filepath));
- sprintf(config, "%s%s", sf_context->h5_filename, dotconfig);
-
+ sprintf(filepath, "%s/%s.subfile_%ld.config", subfile_dir, base, h5_file_id);
+ /* SUBFILE rank 0 does the work creating a config file */
if ((subfile_rank == 0) && (flags & O_CREAT)) {
FILE *f = NULL;
/* If a config file already exists, AND
* the user wants to truncate subfiles (if they exist),
* then we should also truncate an existing config file.
*/
- if (access(config, flags) == 0) {
- truncate(config, 0);
+ if (access(filepath, flags) == 0) {
+ truncate(filepath, 0);
}
- f = HDfopen(config, "w+");
+ f = HDfopen(filepath, "w+");
if (f != NULL) {
- char linebuf[PATH_MAX];
sprintf(linebuf, "stripe_size=%ld\n", sf_context->sf_stripe_size);
- HDfwrite(linebuf, strlen(linebuf), 1, f);
+ HDfwrite(linebuf, 1, strlen(linebuf), f);
sprintf(linebuf, "aggregator_count=%d\n", n_io_concentrators);
- HDfwrite(linebuf, strlen(linebuf), 1, f);
+ HDfwrite(linebuf, 1, strlen(linebuf), f);
sprintf(linebuf, "hdf5_file=%s\n", sf_context->h5_filename);
- HDfwrite(linebuf, strlen(linebuf), 1, f);
+ HDfwrite(linebuf, 1, strlen(linebuf), f);
+ sprintf(linebuf, "subfile_dir=%s\n", subfile_dir);
for (k = 0; k < n_io_concentrators; k++) {
- if (prefix)
- sprintf(linebuf, "%s/%ld_node_local_temp_%d_of_%d:%d\n", prefix, h5_file_id,
- subfile_rank, n_io_concentrators, io_concentrator[k]);
- else
- sprintf(linebuf, "%ld_subfile_%d_of_%d:%d\n", h5_file_id, subfile_rank,
- n_io_concentrators, io_concentrator[k]);
-
- HDfwrite(linebuf, strlen(linebuf), 1, f);
+ sprintf(linebuf, "%s.subfile_%ld_%d_of_%d:%d\n", base, h5_file_id, subfile_rank,
+ n_io_concentrators, io_concentrator[k]);
+ HDfwrite(linebuf, 1, strlen(linebuf), f);
}
fclose(f);
@@ -1924,6 +1923,7 @@ subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags)
goto done;
}
}
+
#ifndef NDEBUG
if (sf_verbose_flag) {
if (sf_logfile) {
@@ -1961,6 +1961,10 @@ subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags)
done:
t_end = MPI_Wtime();
+ if (base)
+ HDfree(base);
+ if (subfile_dir)
+ HDfree(subfile_dir);
#ifndef NDEBUG
if (sf_verbose_flag) {