summary refs log tree commit diff stats
path: root/src/H5FDsubfile_mpi.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/H5FDsubfile_mpi.c')
-rw-r--r-- src/H5FDsubfile_mpi.c 2628
1 files changed, 1333 insertions, 1295 deletions
diff --git a/src/H5FDsubfile_mpi.c b/src/H5FDsubfile_mpi.c
index 64a3959..5963870 100644
--- a/src/H5FDsubfile_mpi.c
+++ b/src/H5FDsubfile_mpi.c
@@ -13,17 +13,17 @@
#include "H5FDsubfiling.h"
-static int sf_close_file_count = 0;
+static int sf_close_file_count = 0;
static int sf_ops_after_first_close = 0;
-static int sf_enable_directIO = 0;
+static int sf_enable_directIO = 0;
-static int sf_write_ops = 0;
-static double sf_pwrite_time = 0.0;
+static int sf_write_ops = 0;
+static double sf_pwrite_time = 0.0;
static double sf_write_wait_time = 0.0;
-static int sf_read_ops = 0;
-static double sf_pread_time = 0.0;
-static double sf_read_wait_time = 0.0;
+static int sf_read_ops = 0;
+static double sf_pread_time = 0.0;
+static double sf_read_wait_time = 0.0;
static double sf_queue_delay_time = 0.0;
/* The following is our basic template for a subfile filename.
@@ -31,19 +31,19 @@ static double sf_queue_delay_time = 0.0;
* intend to use the user defined HDF5 filename for a
* zeroth subfile as well as for all metadata.
*/
-#define SF_NODE_LOCAL_TEMPLATE "%ld_node_local_%d_of_%d"
-#define SF_FILENAME_TEMPLATE "%ld_subfile_%d_of_%d"
+#define SF_FILENAME_TEMPLATE ".subfile_%ld_%d_of_%d"
static int *request_count_per_rank = NULL;
-atomic_int sf_workinprogress = 0;
-atomic_int sf_work_pending = 0;
-atomic_int sf_file_open_count = 0;
-atomic_int sf_file_close_count = 0;
-atomic_int sf_file_refcount = 0;
+atomic_int sf_workinprogress = 0;
+atomic_int sf_work_pending = 0;
+atomic_int sf_file_open_count = 0;
+atomic_int sf_file_close_count = 0;
+atomic_int sf_file_refcount = 0;
atomic_int sf_ioc_fini_refcount = 0;
-atomic_int sf_ioc_ready = 0;
-volatile int sf_shutdown_flag = 0;
+atomic_int sf_ioc_ready = 0;
+atomic_int sf_shutdown_flag = 0;
+// volatile int sf_shutdown_flag = 0;
/*
* Structure definitions to enable async io completions
@@ -52,69 +52,68 @@ volatile int sf_shutdown_flag = 0;
* invoked. See below.
*/
typedef struct _client_io_args {
- int ioc; /* ID of the IO Concentrator handling this IO. */
- hid_t context_id; /* The context id provided for the read or write */
- int64_t offset; /* The file offset for the IO operation */
- int64_t elements; /* How many bytes */
- void *data; /* A pointer to the (contiguous) data segment */
- MPI_Request io_req; /* An MPI request to allow the code to loop while */
- /* making progress on multiple IOs */
+ int ioc; /* ID of the IO Concentrator handling this IO. */
+ hid_t context_id; /* The context id provided for the read or write */
+ int64_t offset; /* The file offset for the IO operation */
+ int64_t elements; /* How many bytes */
+ void * data; /* A pointer to the (contiguous) data segment */
+ MPI_Request io_req; /* An MPI request to allow the code to loop while */
+ /* making progress on multiple IOs */
} io_args_t;
/* pre-define */
typedef struct _client_io_func io_func_t;
struct _client_io_func {
- int (*io_function)(void *this_io); /* pointer to a completion function */
- io_args_t io_args; /* arguments passed to the completion function */
- int pending; /* The function is complete (0) or pending (1)? */
+ int (*io_function)(void *this_io); /* pointer to a completion function */
+ io_args_t io_args; /* arguments passed to the completion function */
+ int pending; /* The function is complete (0) or pending (1)? */
};
typedef struct _io_req {
- struct _io_req *prev; /* A simple list structure containing completion */
- struct _io_req *next; /* functions. These should get removed as IO ops */
- io_func_t completion_func; /* are completed */
+ struct _io_req *prev; /* A simple list structure containing completion */
+ struct _io_req *next; /* functions. These should get removed as IO ops */
+ io_func_t completion_func; /* are completed */
} io_req_t;
-int n_io_pending = 0;
+int n_io_pending = 0;
io_req_t pending_io_requests;
typedef struct _client_xfer_info {
- int64_t offset;
- int64_t length;
- int ioc_targets;
- io_op_t op;
+ int64_t offset;
+ int64_t length;
+ int ioc_targets;
+ io_op_t op;
} client_xfer_info_t;
typedef struct _xfer_info {
- int64_t offset;
- int64_t length;
+ int64_t offset;
+ int64_t length;
} xfer_info_t;
#define STAT_BLOCKSIZE 1024
typedef struct _ioc_stats {
- int read_index;
- int read_size;
- xfer_info_t *read_info;
- int write_index;
- int write_size;
- xfer_info_t *write_info;
+ int read_index;
+ int read_size;
+ xfer_info_t *read_info;
+ int write_index;
+ int write_size;
+ xfer_info_t *write_info;
} ioc_stats_t;
static ioc_stats_t ioc_xfer_records;
-int client_op_index = 0;
-int client_op_size = 0;
-client_xfer_info_t *client_ops = NULL;
+int client_op_index = 0;
+int client_op_size = 0;
+client_xfer_info_t *client_ops = NULL;
/* const char *sf_subfile_prefix = "."; */
-#define MAX_WORK_PER_RANK 2
-#define K(n) ((n)*1024)
-#define M(n) ((n) * (1024 * 1024))
+#define MAX_WORK_PER_RANK 2
+#define K(n) ((n)*1024)
+#define M(n) ((n) * (1024 * 1024))
#define DEFAULT_STRIPE_SIZE M(32)
-#define MAX_DEPTH 1024
-
+#define MAX_DEPTH 1024
/*
=========================================
@@ -122,17 +121,18 @@ Private functions
=========================================
*/
-static inline void *cast_to_void(const void *data) {
- union {
- const void *const_ptr_to_data;
- void *ptr_to_data;
- } eliminate_const_warning;
- eliminate_const_warning.const_ptr_to_data = data;
- return eliminate_const_warning.ptr_to_data;
+static inline void *
+cast_to_void(const void *data)
+{
+ union {
+ const void *const_ptr_to_data;
+ void * ptr_to_data;
+ } eliminate_const_warning;
+ eliminate_const_warning.const_ptr_to_data = data;
+ return eliminate_const_warning.ptr_to_data;
}
-static char *get_ioc_subfile_path(int ioc, int ioc_count,
- subfiling_context_t *sf_context);
-static int async_completion(void *arg);
+static char *get_ioc_subfile_path(int ioc, int ioc_count, subfiling_context_t *sf_context);
+static int async_completion(void *arg);
/* ===================================================================== */
/* MPI_Datatype Creation functions.
@@ -176,49 +176,49 @@ static int async_completion(void *arg);
/* Fill the output vectors 'io_offset', 'io_datasize' and 'io_f_offset'
* All calculations are in terms of bytes.
*/
-static void H5FD__create_first_mpi_type(
- subfiling_context_t *context, int ioc_depth, int64_t src_offset,
- int64_t target_datasize, int64_t f_offset, int64_t *io_offset,
- int64_t *io_datasize, int64_t *io_f_offset, int64_t first_io) {
- int64_t stripe_size = context->sf_stripe_size;
- int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe;
- int64_t offset_in_stripe = f_offset % stripe_size;
- int64_t next_offset = blocksize_per_stripe - offset_in_stripe;
- int64_t total_bytes = first_io;
-
- io_offset[0] = src_offset;
- io_datasize[0] = first_io;
- io_f_offset[0] = f_offset;
+static void
+H5FD__create_first_mpi_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset,
+ int64_t target_datasize, int64_t f_offset, int64_t *io_offset,
+ int64_t *io_datasize, int64_t *io_f_offset, int64_t first_io)
+{
+ int64_t stripe_size = context->sf_stripe_size;
+ int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe;
+ int64_t offset_in_stripe = f_offset % stripe_size;
+ int64_t next_offset = blocksize_per_stripe - offset_in_stripe;
+ int64_t total_bytes = first_io;
+
+ io_offset[0] = src_offset;
+ io_datasize[0] = first_io;
+ io_f_offset[0] = f_offset;
#ifdef VERBOSE
- printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n",
- __func__, src_offset, first_io, f_offset);
- fflush(stdout);
+ printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, src_offset, first_io, f_offset);
+ fflush(stdout);
#endif
- if (first_io == target_datasize) {
- return;
- }
- if (first_io) {
- int k;
- f_offset += (blocksize_per_stripe - offset_in_stripe);
- for (k = 1; k <= ioc_depth; k++) {
- io_offset[k] = next_offset;
- io_datasize[k] = stripe_size;
- io_f_offset[k] = f_offset;
- total_bytes += stripe_size;
+ if (first_io == target_datasize) {
+ return;
+ }
+ if (first_io) {
+ int k;
+ f_offset += (blocksize_per_stripe - offset_in_stripe);
+ for (k = 1; k <= ioc_depth; k++) {
+ io_offset[k] = next_offset;
+ io_datasize[k] = stripe_size;
+ io_f_offset[k] = f_offset;
+ total_bytes += stripe_size;
#ifdef VERBOSE
- printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n",
- __func__, k, next_offset, stripe_size, f_offset);
- fflush(stdout);
+ printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset,
+ stripe_size, f_offset);
+ fflush(stdout);
#endif
- f_offset += context->sf_blocksize_per_stripe;
- next_offset += context->sf_blocksize_per_stripe;
- }
- if (total_bytes != target_datasize) {
- printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__,
- total_bytes, target_datasize);
+ f_offset += context->sf_blocksize_per_stripe;
+ next_offset += context->sf_blocksize_per_stripe;
+ }
+ if (total_bytes != target_datasize) {
+ printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes,
+ target_datasize);
+ }
}
- }
- return;
+ return;
} /* end H5FD__create_first_mpi_type() */
/*-------------------------------------------------------------------------
@@ -250,57 +250,56 @@ static void H5FD__create_first_mpi_type(
/* Fill the output vectors 'io_offset', 'io_datasize' and 'io_f_offset'
* All calculations are in terms of bytes.
*/
-static void H5FD__create_final_mpi_type(subfiling_context_t *context,
- int ioc_depth, int64_t src_offset,
- int64_t target_datasize,
- int64_t f_offset, int64_t *io_offset,
- int64_t *io_datasize,
- int64_t *io_f_offset, int64_t last_io) {
- int64_t stripe_size = context->sf_stripe_size;
- int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe;
- int64_t next_offset = src_offset;
- int64_t total_bytes = 0;
-
- if (last_io == target_datasize) {
- io_offset[0] = src_offset;
- io_f_offset[0] = f_offset;
- io_datasize[0] = last_io;
+static void
+H5FD__create_final_mpi_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset,
+ int64_t target_datasize, int64_t f_offset, int64_t *io_offset,
+ int64_t *io_datasize, int64_t *io_f_offset, int64_t last_io)
+{
+ int64_t stripe_size = context->sf_stripe_size;
+ int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe;
+ int64_t next_offset = src_offset;
+ int64_t total_bytes = 0;
+
+ if (last_io == target_datasize) {
+ io_offset[0] = src_offset;
+ io_f_offset[0] = f_offset;
+ io_datasize[0] = last_io;
#ifdef VERBOSE
- printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n",
- __func__, src_offset, last_io, f_offset);
- fflush(stdout);
+ printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, src_offset, last_io,
+ f_offset);
+ fflush(stdout);
#endif
- return;
- }
-
- if (last_io) {
- int i, k;
- for (k = 0, i = 1; i < ioc_depth; i++) {
- io_offset[k] = next_offset;
- io_datasize[k] = stripe_size;
- io_f_offset[k] = f_offset;
+ return;
+ }
+
+ if (last_io) {
+ int i, k;
+ for (k = 0, i = 1; i < ioc_depth; i++) {
+ io_offset[k] = next_offset;
+ io_datasize[k] = stripe_size;
+ io_f_offset[k] = f_offset;
#ifdef VERBOSE
- printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n",
- __func__, k, next_offset, stripe_size, f_offset);
- fflush(stdout);
+ printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset,
+ stripe_size, f_offset);
+ fflush(stdout);
#endif
- k++;
- total_bytes += stripe_size;
- f_offset += blocksize_per_stripe;
- next_offset += context->sf_blocksize_per_stripe;
- }
+ k++;
+ total_bytes += stripe_size;
+ f_offset += blocksize_per_stripe;
+ next_offset += context->sf_blocksize_per_stripe;
+ }
- io_datasize[k] = last_io;
- io_offset[k] = next_offset;
- io_f_offset[k] = f_offset;
- total_bytes += last_io;
+ io_datasize[k] = last_io;
+ io_offset[k] = next_offset;
+ io_f_offset[k] = f_offset;
+ total_bytes += last_io;
- if (total_bytes != target_datasize) {
- printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__,
- total_bytes, target_datasize);
+ if (total_bytes != target_datasize) {
+ printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes,
+ target_datasize);
+ }
}
- }
- return;
+ return;
} /* end H5FD__create_final_mpi_type() */
/*-------------------------------------------------------------------------
@@ -325,63 +324,61 @@ static void H5FD__create_final_mpi_type(subfiling_context_t *context,
*-------------------------------------------------------------------------
*/
-static void H5FD__create_f_l_mpi_type(subfiling_context_t *context,
- int ioc_depth, int64_t src_offset,
- int64_t target_datasize, int64_t f_offset,
- int64_t *io_offset, int64_t *io_datasize,
- int64_t *io_f_offset, int64_t first_io,
- int64_t last_io) {
- int64_t stripe_size = context->sf_stripe_size;
- int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe;
- int64_t offset_in_stripe = f_offset % stripe_size;
- int64_t next_offset = blocksize_per_stripe - offset_in_stripe;
- int64_t total_bytes = first_io;
-
- io_offset[0] = src_offset;
- io_datasize[0] = first_io;
- io_f_offset[0] = f_offset;
+static void
+H5FD__create_f_l_mpi_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset,
+ int64_t target_datasize, int64_t f_offset, int64_t *io_offset, int64_t *io_datasize,
+ int64_t *io_f_offset, int64_t first_io, int64_t last_io)
+{
+ int64_t stripe_size = context->sf_stripe_size;
+ int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe;
+ int64_t offset_in_stripe = f_offset % stripe_size;
+ int64_t next_offset = blocksize_per_stripe - offset_in_stripe;
+ int64_t total_bytes = first_io;
+
+ io_offset[0] = src_offset;
+ io_datasize[0] = first_io;
+ io_f_offset[0] = f_offset;
#ifdef VERBOSE
- printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n",
- __func__, src_offset, first_io, f_offset);
- fflush(stdout);
+ printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, src_offset, first_io, f_offset);
+ fflush(stdout);
#endif
- if (total_bytes == target_datasize) {
- return;
- }
-
- if (total_bytes) {
- int k;
- f_offset += (blocksize_per_stripe - offset_in_stripe);
- for (k = 1; k < ioc_depth; k++) {
- io_offset[k] = next_offset;
- io_datasize[k] = stripe_size;
- io_f_offset[k] = f_offset;
- total_bytes += stripe_size;
+ if (total_bytes == target_datasize) {
+ return;
+ }
+
+ if (total_bytes) {
+ int k;
+ f_offset += (blocksize_per_stripe - offset_in_stripe);
+ for (k = 1; k < ioc_depth; k++) {
+ io_offset[k] = next_offset;
+ io_datasize[k] = stripe_size;
+ io_f_offset[k] = f_offset;
+ total_bytes += stripe_size;
#ifdef VERBOSE
- printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n",
- __func__, k, next_offset, stripe_size, f_offset);
- fflush(stdout);
+ printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset,
+ stripe_size, f_offset);
+ fflush(stdout);
#endif
- f_offset += blocksize_per_stripe;
- next_offset += blocksize_per_stripe;
- }
- io_datasize[ioc_depth] = last_io;
- io_f_offset[ioc_depth] = f_offset;
- io_offset[ioc_depth] = next_offset;
+ f_offset += blocksize_per_stripe;
+ next_offset += blocksize_per_stripe;
+ }
+ io_datasize[ioc_depth] = last_io;
+ io_f_offset[ioc_depth] = f_offset;
+ io_offset[ioc_depth] = next_offset;
#ifdef VERBOSE
- printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n",
- __func__, k, next_offset, last_io, f_offset);
- fflush(stdout);
+ printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset, last_io,
+ f_offset);
+ fflush(stdout);
#endif
- total_bytes += last_io;
+ total_bytes += last_io;
- if (total_bytes != target_datasize) {
- printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n",
- __func__, total_bytes, target_datasize);
+ if (total_bytes != target_datasize) {
+ printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes,
+ target_datasize);
+ }
}
- }
- return;
+ return;
} /* end H5FD__create_f_l_mpi_type() */
/*-------------------------------------------------------------------------
@@ -406,28 +403,27 @@ static void H5FD__create_f_l_mpi_type(subfiling_context_t *context,
*
*-------------------------------------------------------------------------
*/
-static void H5FD__create_mpi_uniform_type(subfiling_context_t *context,
- int ioc_depth, int64_t src_offset,
- int64_t target_datasize,
- int64_t f_offset, int64_t *io_offset,
- int64_t *io_datasize,
- int64_t *io_f_offset) {
- int64_t stripe_size = context->sf_stripe_size;
- int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe;
- int64_t next_offset = src_offset + blocksize_per_stripe;
- int64_t total_bytes = 0;
-
- io_offset[0] = src_offset;
- io_datasize[0] = stripe_size;
- io_f_offset[0] = f_offset;
- if (target_datasize == 0) {
+static void
+H5FD__create_mpi_uniform_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset,
+ int64_t target_datasize, int64_t f_offset, int64_t *io_offset,
+ int64_t *io_datasize, int64_t *io_f_offset)
+{
+ int64_t stripe_size = context->sf_stripe_size;
+ int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe;
+ int64_t next_offset = src_offset + blocksize_per_stripe;
+ int64_t total_bytes = 0;
+
+ io_offset[0] = src_offset;
+ io_datasize[0] = stripe_size;
+ io_f_offset[0] = f_offset;
+ if (target_datasize == 0) {
#if 0
printf("[%s] 0: datasize=0\n", __func__);
fflush(stdout);
#endif
- io_datasize[0] = 0;
- return;
- }
+ io_datasize[0] = 0;
+ return;
+ }
#if 0
printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n",
@@ -435,31 +431,31 @@ static void H5FD__create_mpi_uniform_type(subfiling_context_t *context,
fflush(stdout);
#endif
- f_offset += blocksize_per_stripe;
- total_bytes = stripe_size;
+ f_offset += blocksize_per_stripe;
+ total_bytes = stripe_size;
- if (target_datasize > stripe_size) {
- int k;
- for (k = 1; k < ioc_depth; k++) {
- io_offset[k] = next_offset;
- io_datasize[k] = stripe_size;
- io_f_offset[k] = f_offset;
+ if (target_datasize > stripe_size) {
+ int k;
+ for (k = 1; k < ioc_depth; k++) {
+ io_offset[k] = next_offset;
+ io_datasize[k] = stripe_size;
+ io_f_offset[k] = f_offset;
#if 0
printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n",
__func__, k, next_offset, stripe_size, f_offset);
fflush(stdout);
#endif
- total_bytes += stripe_size;
- f_offset += blocksize_per_stripe;
- next_offset += blocksize_per_stripe;
- }
+ total_bytes += stripe_size;
+ f_offset += blocksize_per_stripe;
+ next_offset += blocksize_per_stripe;
+ }
- if (total_bytes != target_datasize) {
- printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__,
- total_bytes, target_datasize);
+ if (total_bytes != target_datasize) {
+ printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes,
+ target_datasize);
+ }
}
- }
- return;
+ return;
} /* end H5FD__create_mpi_uniform_type() */
/*-------------------------------------------------------------------------
@@ -504,153 +500,149 @@ static void H5FD__create_mpi_uniform_type(subfiling_context_t *context,
*-------------------------------------------------------------------------
*/
-int init__indep_io(void *_sf_context, size_t maxdepth, int H5_ATTR_PARALLEL_UNUSED ioc_total,
- int64_t *sf_source_data_offset, int64_t *sf_datasize,
- int64_t *sf_offset, int *first_index, int *n_containers,
- int64_t offset, int64_t elements,
- int dtype_extent)
+int
+init__indep_io(void *_sf_context, size_t maxdepth, int H5_ATTR_PARALLEL_UNUSED ioc_total,
+ int64_t *sf_source_data_offset, int64_t *sf_datasize, int64_t *sf_offset, int *first_index,
+ int *n_containers, int64_t offset, int64_t elements, int dtype_extent)
{
- subfiling_context_t *sf_context = _sf_context;
- int container_count = sf_context->topology->n_io_concentrators;
- int64_t stripe_size = sf_context->sf_stripe_size;
- int64_t data_size = elements * dtype_extent;
-
- int64_t start_id = offset / stripe_size;
- int64_t offset_in_stripe = offset % sf_context->sf_blocksize_per_stripe;
- int64_t container_offset = offset % stripe_size;
- int64_t start_length = MIN(data_size, (stripe_size - container_offset));
- int64_t start_row = start_id / container_count;
- int64_t ioc_start = start_id % container_count;
- int64_t final_offset = offset + data_size;
- int64_t final_id = final_offset / stripe_size;
- int64_t final_length =
- (start_length == data_size ? 0 : final_offset % stripe_size);
- int64_t ioc_final = final_id % container_count;
- int64_t container_bytes = 0, total_bytes = 0;
- int64_t source_offset = 0;
-
- int row_id_start = (int)(start_id - ioc_start);
- int row_id_final = (int)(final_id - ioc_final);
- int i, k, depth = ((row_id_final - row_id_start) / container_count) + 1;
- int container_id = (int)start_id;
- int64_t row_offset = (int64_t)(start_row * stripe_size);
-
- *first_index = (int)ioc_start;
-
- /* Given the IO parameters, we loop thru the set of IOCs
- * to determine the various vector components for each.
- * Those IOCs whose datasize is zero (0), will not have
- * IO requests passed to them.
- */
-
- for (i = 0, k = (int)ioc_start; i < container_count; i++) {
- /* We use 'output_offset' as an index into a linear
- * version of a 2D array. In 'C' the last subscript
- * is the one that varies most rapidly.
- * In our case, the 2D array is represented as
- * array[ container_count ][ maxdepth ]
+ subfiling_context_t *sf_context = _sf_context;
+ int container_count = sf_context->topology->n_io_concentrators;
+ int64_t stripe_size = sf_context->sf_stripe_size;
+ int64_t data_size = elements * dtype_extent;
+
+ int64_t start_id = offset / stripe_size;
+ int64_t offset_in_stripe = offset % sf_context->sf_blocksize_per_stripe;
+ int64_t container_offset = offset % stripe_size;
+ int64_t start_length = MIN(data_size, (stripe_size - container_offset));
+ int64_t start_row = start_id / container_count;
+ int64_t ioc_start = start_id % container_count;
+ int64_t final_offset = offset + data_size;
+ int64_t final_id = final_offset / stripe_size;
+ int64_t final_length = (start_length == data_size ? 0 : final_offset % stripe_size);
+ int64_t ioc_final = final_id % container_count;
+ int64_t container_bytes = 0, total_bytes = 0;
+ int64_t source_offset = 0;
+
+ int row_id_start = (int)(start_id - ioc_start);
+ int row_id_final = (int)(final_id - ioc_final);
+ int i, k, depth = ((row_id_final - row_id_start) / container_count) + 1;
+ int container_id = (int)start_id;
+ int64_t row_offset = (int64_t)(start_row * stripe_size);
+
+ *first_index = (int)ioc_start;
+
+ /* Given the IO parameters, we loop thru the set of IOCs
+ * to determine the various vector components for each.
+ * Those IOCs whose datasize is zero (0), will not have
+ * IO requests passed to them.
*/
- size_t depthsize = maxdepth * sizeof(int64_t); /* ONLY used for memset */
- size_t output_offset = (size_t)(k) * maxdepth;
- int container_depth = depth;
- hbool_t is_first = false, is_last = false;
- int64_t *__sf_source_data_offset = sf_source_data_offset + output_offset;
- int64_t *__sf_datasize = sf_datasize + output_offset;
- int64_t *__sf_offset = sf_offset + output_offset;
+ for (i = 0, k = (int)ioc_start; i < container_count; i++) {
+ /* We use 'output_offset' as an index into a linear
+ * version of a 2D array. In 'C' the last subscript
+ * is the one that varies most rapidly.
+ * In our case, the 2D array is represented as
+ * array[ container_count ][ maxdepth ]
+ */
+ size_t depthsize = maxdepth * sizeof(int64_t); /* ONLY used for memset */
+ size_t output_offset = (size_t)(k)*maxdepth;
+ int container_depth = depth;
- memset(__sf_source_data_offset, 0, depthsize);
- memset(__sf_datasize, 0, depthsize);
- memset(__sf_offset, 0, depthsize);
+ hbool_t is_first = false, is_last = false;
+ int64_t *__sf_source_data_offset = sf_source_data_offset + output_offset;
+ int64_t *__sf_datasize = sf_datasize + output_offset;
+ int64_t *__sf_offset = sf_offset + output_offset;
- container_bytes = 0;
+ memset(__sf_source_data_offset, 0, depthsize);
+ memset(__sf_datasize, 0, depthsize);
+ memset(__sf_offset, 0, depthsize);
- if (total_bytes == data_size) {
- *n_containers = i;
- return depth + 1;
- }
- if (total_bytes < data_size) {
- if (k == ioc_start) {
- is_first = true;
- container_bytes = start_length;
- container_depth--; /* Account for the start_length */
- if (ioc_final < ioc_start) {
- container_depth--;
- depth--;
+ container_bytes = 0;
+
+ if (total_bytes == data_size) {
+ *n_containers = i;
+ return depth + 1;
}
- }
- if (k == ioc_final) {
- is_last = true;
- container_bytes += final_length;
- if (container_depth)
- container_depth--; /* Account for the final_length */
- if (depth)
- depth--;
- }
- container_bytes += container_depth * stripe_size;
- total_bytes += container_bytes;
- }
+ if (total_bytes < data_size) {
+ if (k == ioc_start) {
+ is_first = true;
+ container_bytes = start_length;
+ container_depth--; /* Account for the start_length */
+ if (ioc_final < ioc_start) {
+ container_depth--;
+ depth--;
+ }
+ }
+ if (k == ioc_final) {
+ is_last = true;
+ container_bytes += final_length;
+ if (container_depth)
+ container_depth--; /* Account for the final_length */
+ if (depth)
+ depth--;
+ }
+ container_bytes += container_depth * stripe_size;
+ total_bytes += container_bytes;
+ }
+
+ __sf_source_data_offset[0] = source_offset;
+ __sf_datasize[0] = container_bytes;
+ __sf_offset[0] = row_offset + offset_in_stripe;
- __sf_source_data_offset[0] = source_offset;
- __sf_datasize[0] = container_bytes;
- __sf_offset[0] = row_offset + offset_in_stripe;
-
- if (container_count == 1) {
-
- } else {
- /* Fill the IO datatypes */
- if (is_first) {
- if (is_last) { /* First + Last */
- H5FD__create_f_l_mpi_type(
- sf_context, container_depth + 1, source_offset, container_bytes,
- row_offset + offset_in_stripe, __sf_source_data_offset,
- __sf_datasize, __sf_offset, start_length, final_length);
- } else { /* First ONLY */
- H5FD__create_first_mpi_type(
- sf_context, container_depth, source_offset, container_bytes,
- row_offset + offset_in_stripe, __sf_source_data_offset,
- __sf_datasize, __sf_offset, start_length);
+ if (container_count == 1) {
+ }
+ else {
+ /* Fill the IO datatypes */
+ if (is_first) {
+ if (is_last) { /* First + Last */
+ H5FD__create_f_l_mpi_type(sf_context, container_depth + 1, source_offset, container_bytes,
+ row_offset + offset_in_stripe, __sf_source_data_offset,
+ __sf_datasize, __sf_offset, start_length, final_length);
+ }
+ else { /* First ONLY */
+ H5FD__create_first_mpi_type(sf_context, container_depth, source_offset, container_bytes,
+ row_offset + offset_in_stripe, __sf_source_data_offset,
+ __sf_datasize, __sf_offset, start_length);
+ }
+ /* Move the memory pointer to the starting location
+ * for next IOC request.
+ */
+ source_offset += start_length;
+ }
+ else if (is_last) { /* Last ONLY */
+ H5FD__create_final_mpi_type(sf_context, container_depth, source_offset, container_bytes,
+ row_offset + offset_in_stripe, __sf_source_data_offset,
+ __sf_datasize, __sf_offset, final_length);
+ /* Probably not needed... */
+ source_offset += stripe_size;
+ }
+ else { /* Everything else (uniform) */
+ H5FD__create_mpi_uniform_type(sf_context, container_depth, source_offset, container_bytes,
+ row_offset + offset_in_stripe, __sf_source_data_offset,
+ __sf_datasize, __sf_offset);
+ source_offset += stripe_size;
+ }
}
- /* Move the memory pointer to the starting location
- * for next IOC request.
- */
- source_offset += start_length;
- } else if (is_last) { /* Last ONLY */
- H5FD__create_final_mpi_type(
- sf_context, container_depth, source_offset, container_bytes,
- row_offset + offset_in_stripe, __sf_source_data_offset,
- __sf_datasize, __sf_offset, final_length);
- /* Probably not needed... */
- source_offset += stripe_size;
- } else { /* Everything else (uniform) */
- H5FD__create_mpi_uniform_type(
- sf_context, container_depth, source_offset, container_bytes,
- row_offset + offset_in_stripe, __sf_source_data_offset,
- __sf_datasize, __sf_offset);
- source_offset += stripe_size;
- }
- }
- k++;
- offset_in_stripe += __sf_datasize[0];
- container_id++;
+ k++;
+ offset_in_stripe += __sf_datasize[0];
+ container_id++;
- if (k == container_count) {
- k = 0;
- offset_in_stripe = 0;
- depth = ((row_id_final - container_id) / container_count) + 1;
- row_offset += sf_context->sf_blocksize_per_stripe;
+ if (k == container_count) {
+ k = 0;
+ offset_in_stripe = 0;
+ depth = ((row_id_final - container_id) / container_count) + 1;
+ row_offset += sf_context->sf_blocksize_per_stripe;
+ }
+ }
+ if (total_bytes != data_size) {
+ printf("Error: total_bytes != data_size\n");
}
- }
- if (total_bytes != data_size) {
- printf("Error: total_bytes != data_size\n");
- }
- *n_containers = container_count;
- return depth + 1;
+ *n_containers = container_count;
+ return depth + 1;
} /* end init__indep_io() */
-
/*-------------------------------------------------------------------------
* Function: Internal read__independent_async
*
@@ -685,97 +677,97 @@ int init__indep_io(void *_sf_context, size_t maxdepth, int H5_ATTR_PARALLEL_UNUS
/* Convenience accessors for the application layout reached through a
 * subfiling context's topology (ctx->topology->app_layout).
 *
 * WORLD_SIZE(ctx) yields the layout's world_size member.
 * WORLD_RANK(ctx) yields the layout's world_rank member.  It previously
 * expanded to world_size (a copy/paste slip), so diagnostics that prefix
 * their output with WORLD_RANK() printed the world_size value instead of
 * the caller's rank.
 *
 * NOTE(review): assumes the app_layout structure declares a 'world_rank'
 * member alongside 'world_size' -- confirm against H5FDsubfiling.h.
 */
#define WORLD_SIZE(ctx) ((ctx)->topology->app_layout->world_size)
#define WORLD_RANK(ctx) ((ctx)->topology->app_layout->world_rank)
-static int read__independent_async(int n_io_concentrators, hid_t context_id,
- int64_t offset, int64_t elements,
- int H5_ATTR_PARALLEL_UNUSED dtype_extent,
- void *data,io_req_t **io_req) {
- int status = 0;
- int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset;
- int *io_concentrator = NULL;
- io_req_t *sf_io_request = NULL;
- int64_t msg[3] = {0, };
-
- subfiling_context_t *sf_context = get__subfiling_object(context_id);
- assert(sf_context != NULL);
-
- /* Calculate the IOC that we'll send the IO request to */
- stripe_size = sf_context->sf_stripe_size;
-
- start_id = offset / stripe_size;
- ioc_row = start_id / n_io_concentrators;
- ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size);
-
- ioc_start = start_id % n_io_concentrators;
-
- io_concentrator = sf_context->topology->io_concentrator;
- assert(io_concentrator != NULL);
-
- /* Make sure that we can return a request structure
- * if everything is working correctly
- */
- assert(io_req);
-
- /* Prepare an IO request.
- * This gets sent to the ioc identified by the file offset
- */
- msg[0] = elements;
- msg[1] = ioc_offset;
- msg[2] = context_id;
+static int
+read__independent_async(int n_io_concentrators, hid_t context_id, int64_t offset, int64_t elements,
+ int H5_ATTR_PARALLEL_UNUSED dtype_extent, void *data, io_req_t **io_req)
+{
+ int status = 0;
+ int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset;
+ int * io_concentrator = NULL;
+ io_req_t *sf_io_request = NULL;
+ int64_t msg[3] = {
+ 0,
+ };
+
+ subfiling_context_t *sf_context = get__subfiling_object(context_id);
+ assert(sf_context != NULL);
+
+ /* Calculate the IOC that we'll send the IO request to */
+ stripe_size = sf_context->sf_stripe_size;
+
+ start_id = offset / stripe_size;
+ ioc_row = start_id / n_io_concentrators;
+ ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size);
+
+ ioc_start = start_id % n_io_concentrators;
+
+ io_concentrator = sf_context->topology->io_concentrator;
+ assert(io_concentrator != NULL);
+
+ /* Make sure that we can return a request structure
+ * if everything is working correctly
+ */
+ assert(io_req);
+
+ /* Prepare an IO request.
+ * This gets sent to the ioc identified by the file offset
+ */
+ msg[0] = elements;
+ msg[1] = ioc_offset;
+ msg[2] = context_id;
#ifdef VERBOSE
- printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n",
- __func__, ioc_start, elements, offset, ioc_offset);
- fflush(stdout);
-#endif
- status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], READ_INDEP,
- sf_context->sf_msg_comm);
-
- if (status != MPI_SUCCESS) {
- int len;
- char estring[MPI_MAX_ERROR_STRING];
- MPI_Error_string(status, estring, &len);
- printf("[%d] ERROR! MPI_Send request header (%ld) "
- "bytes to %d returned an error(%s)\n",
- WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring);
+ printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n", __func__, ioc_start, elements, offset,
+ ioc_offset);
fflush(stdout);
- return -1;
- }
-
- /* At this point in the new implementation, we should queue
- * the async recv so that when the top level VFD tells us
- * to complete all pending IO requests, we have all the info
- * we need to accomplish that.
- */
- sf_io_request = (io_req_t *)malloc(sizeof(io_req_t));
- assert(sf_io_request);
-
- sf_io_request->completion_func.io_args.ioc = (int)ioc_start;
- sf_io_request->completion_func.io_args.context_id = context_id;
- sf_io_request->completion_func.io_args.offset = offset;
- sf_io_request->completion_func.io_args.elements = elements;
- sf_io_request->completion_func.io_args.data = data;
- sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL;
- sf_io_request->completion_func.io_function = async_completion;
- sf_io_request->completion_func.pending = 0;
-
- sf_io_request->prev = sf_io_request->next = NULL;
- /* Start the actual data transfer */
-
- status = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start],
- READ_INDEP_DATA, sf_context->sf_data_comm,
- &sf_io_request->completion_func.io_args.io_req);
-
- if (status == MPI_SUCCESS) {
- sf_io_request->completion_func.pending = 1;
- *io_req = sf_io_request;
- } else {
- puts("MPI_Irecv must have failed!");
- free(sf_io_request);
- *io_req = NULL;
- }
-
- return status;
-} /* end read__independent_async() */
+#endif
+ status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], READ_INDEP, sf_context->sf_msg_comm);
+ if (status != MPI_SUCCESS) {
+ int len;
+ char estring[MPI_MAX_ERROR_STRING];
+ MPI_Error_string(status, estring, &len);
+ printf("[%d] ERROR! MPI_Send request header (%ld) "
+ "bytes to %d returned an error(%s)\n",
+ WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring);
+ fflush(stdout);
+ return -1;
+ }
+
+ /* At this point in the new implementation, we should queue
+ * the async recv so that when the top level VFD tells us
+ * to complete all pending IO requests, we have all the info
+ * we need to accomplish that.
+ */
+ sf_io_request = (io_req_t *)malloc(sizeof(io_req_t));
+ assert(sf_io_request);
+
+ sf_io_request->completion_func.io_args.ioc = (int)ioc_start;
+ sf_io_request->completion_func.io_args.context_id = context_id;
+ sf_io_request->completion_func.io_args.offset = offset;
+ sf_io_request->completion_func.io_args.elements = elements;
+ sf_io_request->completion_func.io_args.data = data;
+ sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL;
+ sf_io_request->completion_func.io_function = async_completion;
+ sf_io_request->completion_func.pending = 0;
+
+ sf_io_request->prev = sf_io_request->next = NULL;
+ /* Start the actual data transfer */
+
+ status = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start], READ_INDEP_DATA,
+ sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req);
+
+ if (status == MPI_SUCCESS) {
+ sf_io_request->completion_func.pending = 1;
+ *io_req = sf_io_request;
+ }
+ else {
+ puts("MPI_Irecv must have failed!");
+ free(sf_io_request);
+ *io_req = NULL;
+ }
+
+ return status;
+} /* end read__independent_async() */
/*-------------------------------------------------------------------------
* Function: get_ioc_subfile_path
@@ -793,23 +785,23 @@ static int read__independent_async(int n_io_concentrators, hid_t context_id,
* Return: A full filepath which should be copied, e.g. using strdup
*-------------------------------------------------------------------------
*/
-static char *get_ioc_subfile_path(int ioc, int ioc_count,
- subfiling_context_t *sf_context) {
- static char filepath[PATH_MAX];
- char *subfile_dir = NULL;
- char *prefix = sf_context->subfile_prefix;
-
- if (prefix != NULL) {
- sprintf(filepath, "%s/" SF_NODE_LOCAL_TEMPLATE, prefix,
- sf_context->h5_file_id, ioc, ioc_count);
- } else {
- strcpy(filepath, sf_context->filename);
- subfile_dir = strrchr(filepath, '/');
- assert(subfile_dir);
- sprintf(subfile_dir + 1, SF_FILENAME_TEMPLATE, sf_context->h5_file_id, ioc,
- ioc_count);
- }
- return filepath;
+static char *
+get_ioc_subfile_path(int ioc, int ioc_count, subfiling_context_t *sf_context)
+{
+ static char filepath[PATH_MAX];
+ char * subfile_dir = NULL;
+ char * prefix = sf_context->subfile_prefix;
+
+ if (prefix != NULL) {
+ sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, prefix, sf_context->h5_file_id, ioc, ioc_count);
+ }
+ else {
+ strcpy(filepath, sf_context->h5_filename);
+ subfile_dir = strrchr(filepath, '/');
+ assert(subfile_dir);
+ sprintf(subfile_dir + 1, SF_FILENAME_TEMPLATE, sf_context->h5_file_id, ioc, ioc_count);
+ }
+ return filepath;
} /* end get_ioc_subfile_path() */
/*-------------------------------------------------------------------------
@@ -831,10 +823,12 @@ static char *get_ioc_subfile_path(int ioc, int ioc_count,
* values (-1) indicates an error.
*-------------------------------------------------------------------------
*/
-static int progress_this_pending_io(io_req_t *this_req) {
- assert(this_req);
- assert(this_req->completion_func.io_function);
- return (*this_req->completion_func.io_function)(&this_req->completion_func);
+static int
+progress_this_pending_io(io_req_t *this_req)
+{
+ assert(this_req);
+ assert(this_req->completion_func.io_function);
+ return (*this_req->completion_func.io_function)(&this_req->completion_func);
}
/*-------------------------------------------------------------------------
@@ -848,25 +842,26 @@ static int progress_this_pending_io(io_req_t *this_req) {
* values (-1) indicates an error.
*-------------------------------------------------------------------------
*/
-static int write_data(io_func_t *this_func) {
- int ioc, status;
- int64_t elements;
- void *data;
- int *io_concentrator = NULL;
- subfiling_context_t *sf_context = NULL;
- assert(this_func);
+static int
+write_data(io_func_t *this_func)
+{
+ int ioc, status;
+ int64_t elements;
+ void * data;
+ int * io_concentrator = NULL;
+ subfiling_context_t *sf_context = NULL;
+ assert(this_func);
- sf_context = get__subfiling_object(this_func->io_args.context_id);
+ sf_context = get__subfiling_object(this_func->io_args.context_id);
- assert(sf_context);
+ assert(sf_context);
- io_concentrator = sf_context->topology->io_concentrator;
- ioc = this_func->io_args.ioc;
+ io_concentrator = sf_context->topology->io_concentrator;
+ ioc = this_func->io_args.ioc;
- status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc],
- WRITE_INDEP_DATA, sf_context->sf_data_comm,
- &this_func->io_args.io_req);
- return status;
+ status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc], WRITE_INDEP_DATA,
+ sf_context->sf_data_comm, &this_func->io_args.io_req);
+ return status;
}
/*-------------------------------------------------------------------------
@@ -888,43 +883,44 @@ static int write_data(io_func_t *this_func) {
* values (-1) indicates an error.
*-------------------------------------------------------------------------
*/
-static int async_completion(void *arg) {
- struct async_arg {
- int n_reqs;
- MPI_Request *sf_reqs;
- } *in_progress = (struct async_arg *)arg;
-
- assert(arg);
- int status, errors = 0;
- int count = in_progress->n_reqs;
- int n_waiting = count;
- int indices[count];
- MPI_Status stats[count];
- useconds_t delay = 5;
-
- while (n_waiting) {
- int i, ready = 0;
- status = MPI_Testsome(count, in_progress->sf_reqs, &ready, indices, stats);
- if (status != MPI_SUCCESS) {
- int len;
- char estring[MPI_MAX_ERROR_STRING];
- MPI_Error_string(status, estring, &len);
- printf("[%s] MPI_ERROR! MPI_Testsome returned an error(%s)\n", __func__,
- estring);
- fflush(stdout);
- errors++;
- return -1;
- }
+static int
+async_completion(void *arg)
+{
+ struct async_arg {
+ int n_reqs;
+ MPI_Request *sf_reqs;
+ } *in_progress = (struct async_arg *)arg;
+
+ assert(arg);
+ int status, errors = 0;
+ int count = in_progress->n_reqs;
+ int n_waiting = count;
+ int indices[count];
+ MPI_Status stats[count];
+ useconds_t delay = 5;
+
+ while (n_waiting) {
+ int i, ready = 0;
+ status = MPI_Testsome(count, in_progress->sf_reqs, &ready, indices, stats);
+ if (status != MPI_SUCCESS) {
+ int len;
+ char estring[MPI_MAX_ERROR_STRING];
+ MPI_Error_string(status, estring, &len);
+ printf("[%s] MPI_ERROR! MPI_Testsome returned an error(%s)\n", __func__, estring);
+ fflush(stdout);
+ errors++;
+ return -1;
+ }
- if (ready == 0) {
- usleep(delay);
- }
+ if (ready == 0) {
+ usleep(delay);
+ }
- for (i = 0; i < ready; i++) {
- n_waiting--;
+ for (i = 0; i < ready; i++) {
+ n_waiting--;
+ }
}
- }
- return errors;
+ return errors;
}
/*-------------------------------------------------------------------------
@@ -960,137 +956,138 @@ static int async_completion(void *arg) {
* Changes: Initial Version/None.
*-------------------------------------------------------------------------
*/
-static int write__independent_async(int n_io_concentrators, hid_t context_id,
- int64_t offset, int64_t elements,
- int H5_ATTR_PARALLEL_UNUSED dtype_extent,
- const void *data, io_req_t **io_req) {
-
- int ack = 0, active_sends = 0, n_waiting = 0, status = 0;
- int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset;
- int *io_concentrator = NULL;
- io_req_t *sf_io_request = NULL;
- MPI_Request ackrequest;
- int64_t msg[3] = {0, };
-
- subfiling_context_t *sf_context = get__subfiling_object(context_id);
- assert(sf_context != NULL);
-
- /* Calculate the IOC that we'll send the IO request to */
- stripe_size = sf_context->sf_stripe_size;
-
- start_id = offset / stripe_size;
- ioc_row = start_id / n_io_concentrators;
- ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size);
- ioc_start = start_id % n_io_concentrators;
-
- io_concentrator = sf_context->topology->io_concentrator;
- assert(io_concentrator != NULL);
-
- /* Make sure that we can return a request structure
- * if everything is working correctly
- */
- assert(io_req);
-
-
- /* Prepare an IO request.
- * This gets sent to the ioc identified by the file offset.
- * (see above: Calculate the IOC))
- */
- msg[0] = elements;
- msg[1] = ioc_offset;
- msg[2] = context_id;
+static int
+write__independent_async(int n_io_concentrators, hid_t context_id, int64_t offset, int64_t elements,
+ int H5_ATTR_PARALLEL_UNUSED dtype_extent, const void *data, io_req_t **io_req)
+{
+
+ int ack = 0, active_sends = 0, n_waiting = 0, status = 0;
+ int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset;
+ int * io_concentrator = NULL;
+ io_req_t * sf_io_request = NULL;
+ MPI_Request ackrequest;
+ int64_t msg[3] = {
+ 0,
+ };
+
+ subfiling_context_t *sf_context = get__subfiling_object(context_id);
+ assert(sf_context != NULL);
+
+ /* Calculate the IOC that we'll send the IO request to */
+ stripe_size = sf_context->sf_stripe_size;
+
+ start_id = offset / stripe_size;
+ ioc_row = start_id / n_io_concentrators;
+ ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size);
+ ioc_start = start_id % n_io_concentrators;
+
+ io_concentrator = sf_context->topology->io_concentrator;
+ assert(io_concentrator != NULL);
+
+ /* Make sure that we can return a request structure
+ * if everything is working correctly
+ */
+ assert(io_req);
+
+ /* Prepare an IO request.
+ * This gets sent to the ioc identified by the file offset.
+ * (see above: Calculate the IOC)
+ */
+ msg[0] = elements;
+ msg[1] = ioc_offset;
+ msg[2] = context_id;
#ifdef VERBOSE
- printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n",
- __func__, ioc_start, elements, offset, ioc_offset);
- fflush(stdout);
-#endif
- status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start],
- WRITE_INDEP, sf_context->sf_msg_comm);
-
- if (status != MPI_SUCCESS) {
- int len;
- char estring[MPI_MAX_ERROR_STRING];
- MPI_Error_string(status, estring, &len);
- printf("[%d] ERROR! MPI_Send of %ld bytes to %d returned an "
- "error(%s)\n",
- WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring);
+ printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n", __func__, ioc_start, elements, offset,
+ ioc_offset);
fflush(stdout);
- return -1;
- } else
- active_sends++;
- /*
- * We wait for memory to be allocated on the target IOC so that we can
- * start sending user data. Once memory is allocated, we will receive
- * an ACK (or NACK) message from the IOC to allow us to proceed.
- */
- status = MPI_Irecv(&ack, 1, MPI_INT, io_concentrator[ioc_start],
- WRITE_INDEP_ACK, sf_context->sf_data_comm, &ackrequest);
-
- if (status != MPI_SUCCESS) {
- printf("[%d %s] MPI_Irecv failed\n", WORLD_RANK(sf_context), __func__);
- fflush(stdout);
- return -1;
- }
+#endif
+ status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], WRITE_INDEP, sf_context->sf_msg_comm);
- n_waiting = active_sends;
+ if (status != MPI_SUCCESS) {
+ int len;
+ char estring[MPI_MAX_ERROR_STRING];
+ MPI_Error_string(status, estring, &len);
+ printf("[%d] ERROR! MPI_Send of %ld bytes to %d returned an "
+ "error(%s)\n",
+ WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring);
+ fflush(stdout);
+ return -1;
+ }
+ else
+ active_sends++;
+ /*
+ * We wait for memory to be allocated on the target IOC so that we can
+ * start sending user data. Once memory is allocated, we will receive
+ * an ACK (or NACK) message from the IOC to allow us to proceed.
+ */
+ status = MPI_Irecv(&ack, 1, MPI_INT, io_concentrator[ioc_start], WRITE_INDEP_ACK,
+ sf_context->sf_data_comm, &ackrequest);
- while (n_waiting) {
- int flag = 0;
- status = MPI_Test(&ackrequest, &flag, MPI_STATUS_IGNORE);
- if (status == MPI_SUCCESS) {
- if (flag == 0)
- usleep(0);
- else {
- n_waiting--;
- if (ack == 0) { /* NACK */
- printf("%s - Received NACK!\n", __func__);
+ if (status != MPI_SUCCESS) {
+ printf("[%d %s] MPI_Irecv failed\n", WORLD_RANK(sf_context), __func__);
+ fflush(stdout);
+ return -1;
+ }
+
+ n_waiting = active_sends;
+
+ while (n_waiting) {
+ int flag = 0;
+ status = MPI_Test(&ackrequest, &flag, MPI_STATUS_IGNORE);
+ if (status == MPI_SUCCESS) {
+ if (flag == 0)
+ usleep(0);
+ else {
+ n_waiting--;
+ if (ack == 0) { /* NACK */
+ printf("%s - Received NACK!\n", __func__);
+ }
+ }
}
- }
}
- }
-
- /* At this point in the new implementation, we should queue
- * the async write so that when the top level VFD tells us
- * to complete all pending IO requests, we have all the info
- * we need to accomplish that.
- */
- sf_io_request = (io_req_t *)malloc(sizeof(io_req_t));
- assert(sf_io_request);
-
- sf_io_request->completion_func.io_args.ioc = (int)ioc_start;
- sf_io_request->completion_func.io_args.context_id = context_id;
- sf_io_request->completion_func.io_args.offset = offset;
- sf_io_request->completion_func.io_args.elements = elements;
- sf_io_request->completion_func.io_args.data = cast_to_void(data);
- sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL;
- sf_io_request->completion_func.io_function = async_completion;
- sf_io_request->completion_func.pending = 0;
-
- sf_io_request->prev = sf_io_request->next = NULL;
- /* Start the actual data transfer */
-
- status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start],
- WRITE_INDEP_DATA, sf_context->sf_data_comm,
- &sf_io_request->completion_func.io_args.io_req);
-
- /* When we actually have the async IO support,
- * the request should be queued before we
- * return to the caller.
- * Having queued the IO operation, we might want to
- * get additional work started before allowing the
- * queued IO requests to make further progress and/or
- * to complete, so we just return to the caller.
- */
-
- if (status == MPI_SUCCESS) {
- sf_io_request->completion_func.pending = 1;
- *io_req = sf_io_request;
- } else {
- puts("MPI_Isend must have failed!");
- free(sf_io_request);
- *io_req = NULL;
- }
- return status;
+
+ /* At this point in the new implementation, we should queue
+ * the async write so that when the top level VFD tells us
+ * to complete all pending IO requests, we have all the info
+ * we need to accomplish that.
+ */
+ sf_io_request = (io_req_t *)malloc(sizeof(io_req_t));
+ assert(sf_io_request);
+
+ sf_io_request->completion_func.io_args.ioc = (int)ioc_start;
+ sf_io_request->completion_func.io_args.context_id = context_id;
+ sf_io_request->completion_func.io_args.offset = offset;
+ sf_io_request->completion_func.io_args.elements = elements;
+ sf_io_request->completion_func.io_args.data = cast_to_void(data);
+ sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL;
+ sf_io_request->completion_func.io_function = async_completion;
+ sf_io_request->completion_func.pending = 0;
+
+ sf_io_request->prev = sf_io_request->next = NULL;
+ /* Start the actual data transfer */
+
+ status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start], WRITE_INDEP_DATA,
+ sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req);
+
+ /* When we actually have the async IO support,
+ * the request should be queued before we
+ * return to the caller.
+ * Having queued the IO operation, we might want to
+ * get additional work started before allowing the
+ * queued IO requests to make further progress and/or
+ * to complete, so we just return to the caller.
+ */
+
+ if (status == MPI_SUCCESS) {
+ sf_io_request->completion_func.pending = 1;
+ *io_req = sf_io_request;
+ }
+ else {
+ puts("MPI_Isend must have failed!");
+ free(sf_io_request);
+ *io_req = NULL;
+ }
+ return status;
} /* end write__independent_async() */
/*
@@ -1108,81 +1105,82 @@ static int write__independent_async(int n_io_concentrators, hid_t context_id,
*
* Return: SUCCEED if no errors, FAIL otherwise.
*/
-herr_t H5FD__write_vector_internal(hid_t h5_fid, hssize_t count,
- haddr_t addrs[], size_t sizes[],
- const void *bufs[] /* data_in */) {
- herr_t ret_value = SUCCEED;
- hssize_t status = 0, k = 0;
- hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid);
- subfiling_context_t *sf_context = NULL;
- io_req_t **sf_async_reqs = NULL;
- MPI_Request *active_reqs = NULL;
- struct __mpi_req {
- int n_reqs;
- MPI_Request *active_reqs;
- } *mpi_reqs = NULL;
-
- sf_context = get__subfiling_object(sf_context_id);
- assert(sf_context != NULL);
-
- active_reqs = (MPI_Request *)calloc((size_t)(count+2), sizeof(MPI_Request));
- assert(active_reqs);
-
- sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *));
- assert(sf_async_reqs);
-
- /*
- * Note: We allocated extra space in the active_requests (above).
- * The extra should be enough for an integer plus a pointer.
- */
- mpi_reqs = (struct __mpi_req *)&active_reqs[count];
- mpi_reqs->n_reqs = (int)count;
- mpi_reqs->active_reqs = active_reqs;
-
- /* Each pass thru the following should queue an MPI write
- * to a new IOC. Both the IOC selection and offset within the
- * particular subfile are based on the combinatation of striping
- * factors and the virtual file offset (addrs[k]).
- */
- for (k = 0; k < count; k++) {
- if (sizes[k] == 0) {
- puts("Something wrong with the size argument: size is 0!");
- fflush(stdout);
- }
- status = write__independent_async(
- sf_context->topology->n_io_concentrators, sf_context_id,
- (int64_t)addrs[k],(int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]);
- if (status < 0) {
- printf("%s - encountered an internal error!\n", __func__);
- goto errors;
- } else {
- mpi_reqs->active_reqs[k] =
- sf_async_reqs[k]->completion_func.io_args.io_req;
- }
- }
+herr_t
+H5FD__write_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_t sizes[],
+ const void *bufs[] /* data_in */)
+{
+ herr_t ret_value = SUCCEED;
+ hssize_t status = 0, k = 0;
+ hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid);
+ subfiling_context_t *sf_context = NULL;
+ io_req_t ** sf_async_reqs = NULL;
+ MPI_Request * active_reqs = NULL;
+ struct __mpi_req {
+ int n_reqs;
+ MPI_Request *active_reqs;
+ } *mpi_reqs = NULL;
+
+ sf_context = get__subfiling_object(sf_context_id);
+ assert(sf_context != NULL);
- /* Here, we should have queued 'count' async requests.
- * We can can now try to complete those before returning
- * to the caller for the next set of IO operations.
- */
- if (sf_async_reqs[0]->completion_func.io_function)
- ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs);
+ active_reqs = (MPI_Request *)calloc((size_t)(count + 2), sizeof(struct __mpi_req));
+ assert(active_reqs);
- if (active_reqs)
- free(active_reqs);
+ sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *));
+ assert(sf_async_reqs);
- if (sf_async_reqs) {
+ /*
+ * Note: We allocated extra space in the active_requests (above).
+ * The extra should be enough for an integer plus a pointer.
+ */
+ mpi_reqs = (struct __mpi_req *)&active_reqs[count];
+ mpi_reqs->n_reqs = (int)count;
+ mpi_reqs->active_reqs = active_reqs;
+
+ /* Each pass thru the following should queue an MPI write
+ * to a new IOC. Both the IOC selection and offset within the
+ * particular subfile are based on the combination of striping
+ * factors and the virtual file offset (addrs[k]).
+ */
for (k = 0; k < count; k++) {
- if (sf_async_reqs[k]) {
- free(sf_async_reqs[k]);
- }
+ if (sizes[k] == 0) {
+ puts("Something wrong with the size argument: size is 0!");
+ fflush(stdout);
+ }
+ status =
+ write__independent_async(sf_context->topology->n_io_concentrators, sf_context_id,
+ (int64_t)addrs[k], (int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]);
+ if (status < 0) {
+ printf("%s - encountered an internal error!\n", __func__);
+ goto errors;
+ }
+ else {
+ mpi_reqs->active_reqs[k] = sf_async_reqs[k]->completion_func.io_args.io_req;
+ }
+ }
+
+ /* Here, we should have queued 'count' async requests.
+ * We can now try to complete those before returning
+ * to the caller for the next set of IO operations.
+ */
+ if (sf_async_reqs[0]->completion_func.io_function)
+ ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs);
+
+ if (active_reqs)
+ free(active_reqs);
+
+ if (sf_async_reqs) {
+ for (k = 0; k < count; k++) {
+ if (sf_async_reqs[k]) {
+ free(sf_async_reqs[k]);
+ }
+ }
+ free(sf_async_reqs);
}
- free(sf_async_reqs);
- }
- return ret_value;
+ return ret_value;
errors:
- return FAIL;
+ return FAIL;
}
/*
@@ -1190,91 +1188,95 @@ errors:
* The H5FD__ioc_read_vector VFD call included additional 'hid_t dxpl'
* and 'H5FD_mem_t types[]'. These are now removed.
*/
-herr_t H5FD__read_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[],
- size_t sizes[],
- void *bufs[] /* data_out */) {
- herr_t ret_value = SUCCEED;
- hssize_t status = 0, k = 0;
- hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid);
- subfiling_context_t *sf_context = NULL;
- io_req_t **sf_async_reqs = NULL;
- MPI_Request *active_reqs = NULL;
- struct __mpi_req {
- int n_reqs;
- MPI_Request *active_reqs;
- } *mpi_reqs = NULL;
-
- sf_context = get__subfiling_object(sf_context_id);
- assert(sf_context != NULL);
-
- active_reqs = (MPI_Request *)calloc((size_t)(count+2), sizeof(MPI_Request));
- assert(active_reqs);
-
- sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *));
- assert(sf_async_reqs);
-
- /*
- * Note: We allocated extra space in the active_requests (above).
- * The extra should be enough for an integer plus a pointer.
- */
- mpi_reqs = (struct __mpi_req *)&active_reqs[count];
- mpi_reqs->n_reqs = (int)count;
- mpi_reqs->active_reqs = active_reqs;
-
- for (k = 0; k < count; k++) {
- status = read__independent_async(
- sf_context->topology->n_io_concentrators, sf_context_id,
- (int64_t)addrs[k], (int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]);
- if (status < 0) {
- printf("%s - encountered an internal error!\n", __func__);
- goto errors;
- } else {
- mpi_reqs->active_reqs[k] =
- sf_async_reqs[k]->completion_func.io_args.io_req;
- }
- }
- /* Here, we should have queued 'count' async requests
- * (one to each required IOC).
- *
- * We can can now try to complete those before returning
- * to the caller for the next set of IO operations.
- */
- if (sf_async_reqs[0]->completion_func.io_function)
- ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs);
-
- if (active_reqs)
- free(active_reqs);
-
- if (sf_async_reqs) {
+herr_t
+H5FD__read_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_t sizes[],
+ void *bufs[] /* data_out */)
+{
+ herr_t ret_value = SUCCEED;
+ hssize_t status = 0, k = 0;
+ hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid);
+ subfiling_context_t *sf_context = NULL;
+ io_req_t ** sf_async_reqs = NULL;
+ MPI_Request * active_reqs = NULL;
+ struct __mpi_req {
+ int n_reqs;
+ MPI_Request *active_reqs;
+ } *mpi_reqs = NULL;
+
+ sf_context = get__subfiling_object(sf_context_id);
+ assert(sf_context != NULL);
+
+ active_reqs = (MPI_Request *)calloc((size_t)(count + 2), sizeof(struct __mpi_req));
+ assert(active_reqs);
+
+ sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *));
+ assert(sf_async_reqs);
+
+ /*
+ * Note: We allocated extra space in the active_requests (above).
+ * The extra should be enough for an integer plus a pointer.
+ */
+ mpi_reqs = (struct __mpi_req *)&active_reqs[count];
+ mpi_reqs->n_reqs = (int)count;
+ mpi_reqs->active_reqs = active_reqs;
+
for (k = 0; k < count; k++) {
- if (sf_async_reqs[k]) {
- free(sf_async_reqs[k]);
- }
+ status = read__independent_async(sf_context->topology->n_io_concentrators, sf_context_id,
+ (int64_t)addrs[k], (int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]);
+ if (status < 0) {
+ printf("%s - encountered an internal error!\n", __func__);
+ goto errors;
+ }
+ else {
+ mpi_reqs->active_reqs[k] = sf_async_reqs[k]->completion_func.io_args.io_req;
+ }
}
- free(sf_async_reqs);
- }
- return ret_value;
+ /* Here, we should have queued 'count' async requests
+ * (one to each required IOC).
+ *
+ * We can now try to complete those before returning
+ * to the caller for the next set of IO operations.
+ */
+ if (sf_async_reqs[0]->completion_func.io_function)
+ ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs);
+
+ if (active_reqs)
+ free(active_reqs);
+
+ if (sf_async_reqs) {
+ for (k = 0; k < count; k++) {
+ if (sf_async_reqs[k]) {
+ free(sf_async_reqs[k]);
+ }
+ }
+ free(sf_async_reqs);
+ }
+ return ret_value;
errors:
- return FAIL;
+ return FAIL;
}
-int sf_truncate(hid_t h5_fid, haddr_t H5_ATTR_PARALLEL_UNUSED addr) {
- hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid);
- subfiling_context_t *sf_context = get__subfiling_object(sf_context_id);
+int
+sf_truncate(hid_t h5_fid, haddr_t H5_ATTR_PARALLEL_UNUSED addr)
+{
+ hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid);
+ subfiling_context_t *sf_context = get__subfiling_object(sf_context_id);
- assert(sf_context != NULL);
- return 0;
+ assert(sf_context != NULL);
+ return 0;
}
-int sf_shutdown_local_ioc(hid_t fid) {
- hid_t context_id = fid_map_to_context((uint64_t)fid);
- subfiling_context_t *sf_context = get__subfiling_object(context_id);
- assert(sf_context != NULL);
- if (sf_context->topology->rank_is_ioc) {
- sf_shutdown_flag = 1;
- }
- return 0;
+int
+sf_shutdown_local_ioc(hid_t fid)
+{
+ hid_t context_id = fid_map_to_context((uint64_t)fid);
+ subfiling_context_t *sf_context = get__subfiling_object(context_id);
+ assert(sf_context != NULL);
+ if (sf_context->topology->rank_is_ioc) {
+ atomic_fetch_add(&sf_shutdown_flag, 1);
+ }
+ return 0;
}
/*-------------------------------------------------------------------------
@@ -1339,119 +1341,121 @@ int sf_shutdown_local_ioc(hid_t fid) {
* Changes: Initial Version/None.
*-------------------------------------------------------------------------
*/
-int ioc_main(int64_t context_id) {
- int subfile_rank;
- int flag, ret;
- int max_work_depth;
- int shutdown_requested;
- MPI_Status status, msg_status;
- sf_work_request_t *incoming_requests = NULL;
- useconds_t delay = 20;
- subfiling_context_t *context = get__subfiling_object(context_id);
- double queue_start_time;
-
- assert(context != NULL);
- /* We can't have opened any files at this point..
- * The file open approach has changed so that the normal
- * application rank (hosting this thread) does the file open.
- * We can simply utilize the file descriptor (which should now
- * represent an open file).
- */
-
- subfile_rank = context->sf_group_rank;
-
- if (request_count_per_rank == NULL) {
- request_count_per_rank = (int *)calloc((size_t)WORLD_SIZE(context), sizeof(int));
- assert(request_count_per_rank != NULL);
- }
-
- max_work_depth = MAX(8, WORLD_SIZE(context) * MAX_WORK_PER_RANK);
- incoming_requests = (sf_work_request_t *)calloc((size_t)(max_work_depth + 1),
- sizeof(sf_work_request_t));
-
- /* Validate that the allocation succeeded */
- assert(incoming_requests != NULL);
-
- /* Initialize atomic vars */
- atomic_init(&sf_workinprogress, 0);
- atomic_init(&sf_work_pending, 0);
- atomic_init(&sf_file_close_count, 0);
- atomic_init(&sf_file_refcount, 0);
- atomic_init(&sf_ioc_fini_refcount, 0);
- atomic_init(&sf_ioc_ready, 1);
- shutdown_requested = 0;
-
- while (!shutdown_requested || sf_work_pending) {
- flag = 0;
- ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, context->sf_msg_comm, &flag,
- &status);
- if ((ret == MPI_SUCCESS) && (flag != 0)) {
- sf_work_request_t *msg = NULL;
- int count;
- int index = 0;
- int request_size = (int)sizeof(sf_work_request_t);
- int source = status.MPI_SOURCE;
- int tag = status.MPI_TAG;
-
- MPI_Get_count(&status, MPI_BYTE, &count);
- if (count > request_size) {
- msg = (sf_work_request_t *)malloc((size_t)count);
- ret = MPI_Recv(msg, count, MPI_BYTE, source, tag, context->sf_msg_comm,
- &msg_status);
- } else {
- index = atomic_load(&sf_workinprogress);
- ret = MPI_Recv(&incoming_requests[index], count, MPI_BYTE, source, tag,
- context->sf_msg_comm, &msg_status);
- if (MPI_SUCCESS == ret) {
- int howmany = 0;
- MPI_Get_count(&msg_status, MPI_BYTE, &howmany);
- if (howmany != count) {
- printf("%s: MPI_Recv completed %d bytes of %d\n", __func__, howmany,
- count);
- fflush(stdout);
- }
+int
+ioc_main(int64_t context_id)
+{
+ int subfile_rank;
+ int flag, ret;
+ int max_work_depth;
+ int shutdown_requested;
+ MPI_Status status, msg_status;
+ sf_work_request_t * incoming_requests = NULL;
+ useconds_t delay = 20;
+ subfiling_context_t *context = get__subfiling_object(context_id);
+ double queue_start_time;
+
+ assert(context != NULL);
+ /* We can't have opened any files at this point..
+ * The file open approach has changed so that the normal
+ * application rank (hosting this thread) does the file open.
+ * We can simply utilize the file descriptor (which should now
+ * represent an open file).
+ */
+
+ subfile_rank = context->sf_group_rank;
+
+ if (request_count_per_rank == NULL) {
+ request_count_per_rank = (int *)calloc((size_t)WORLD_SIZE(context), sizeof(int));
+ assert(request_count_per_rank != NULL);
+ }
+
+ max_work_depth = MAX(8, WORLD_SIZE(context) * MAX_WORK_PER_RANK);
+ incoming_requests = (sf_work_request_t *)calloc((size_t)(max_work_depth + 1), sizeof(sf_work_request_t));
+
+ /* Validate that the allocation succeeded */
+ assert(incoming_requests != NULL);
+
+ /* Initialize atomic vars */
+ atomic_init(&sf_workinprogress, 0);
+ atomic_init(&sf_work_pending, 0);
+ atomic_init(&sf_file_close_count, 0);
+ atomic_init(&sf_file_refcount, 0);
+ atomic_init(&sf_ioc_fini_refcount, 0);
+ atomic_init(&sf_shutdown_flag, 0);
+ atomic_init(&sf_ioc_ready, 1);
+ shutdown_requested = 0;
+
+ while (!shutdown_requested || sf_work_pending) {
+ flag = 0;
+ ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, context->sf_msg_comm, &flag, &status);
+ if ((ret == MPI_SUCCESS) && (flag != 0)) {
+ sf_work_request_t *msg = NULL;
+ int count;
+ int index = 0;
+ int request_size = (int)sizeof(sf_work_request_t);
+ int source = status.MPI_SOURCE;
+ int tag = status.MPI_TAG;
+
+ MPI_Get_count(&status, MPI_BYTE, &count);
+ if (count > request_size) {
+ msg = (sf_work_request_t *)malloc((size_t)count);
+ ret = MPI_Recv(msg, count, MPI_BYTE, source, tag, context->sf_msg_comm, &msg_status);
+ }
+ else {
+ index = atomic_load(&sf_workinprogress);
+ ret = MPI_Recv(&incoming_requests[index], count, MPI_BYTE, source, tag, context->sf_msg_comm,
+ &msg_status);
+ if (MPI_SUCCESS == ret) {
+ int howmany = 0;
+ MPI_Get_count(&msg_status, MPI_BYTE, &howmany);
+ if (howmany != count) {
+ printf("%s: MPI_Recv completed %d bytes of %d\n", __func__, howmany, count);
+ fflush(stdout);
+ }
+ }
+ }
+ queue_start_time = MPI_Wtime();
+ if (ret == MPI_SUCCESS) {
+ if (msg) {
+ printf("%s: non-std msg=(%p) from %d\n", __func__, (void *)msg, source);
+ fflush(stdout);
+
+ msg->source = source;
+ msg->subfile_rank = subfile_rank;
+ msg->context_id = context->sf_context_id;
+ msg->start_time = queue_start_time;
+ tpool_add_work(msg);
+ }
+ else {
+ incoming_requests[index].tag = tag;
+ incoming_requests[index].source = source;
+ incoming_requests[index].subfile_rank = subfile_rank;
+ incoming_requests[index].start_time = queue_start_time;
+ incoming_requests[index].buffer = NULL;
+ tpool_add_work(&incoming_requests[index]);
+ if (index == max_work_depth - 1) {
+ atomic_init(&sf_workinprogress, 0);
+ }
+ else {
+ atomic_fetch_add(&sf_workinprogress, 1); // atomic
+ }
+ }
+ }
}
- }
- queue_start_time = MPI_Wtime();
- if (ret == MPI_SUCCESS) {
- if (msg) {
- printf("%s: non-std msg=(%p) from %d\n", __func__, (void *)msg, source);
- fflush(stdout);
-
- msg->source = source;
- msg->subfile_rank = subfile_rank;
- msg->context_id = context->sf_context_id;
- msg->start_time = queue_start_time;
- tpool_add_work(msg);
- } else {
- incoming_requests[index].tag = tag;
- incoming_requests[index].source = source;
- incoming_requests[index].subfile_rank = subfile_rank;
- incoming_requests[index].start_time = queue_start_time;
- incoming_requests[index].buffer = NULL;
- incoming_requests[index].completed = 0;
- tpool_add_work(&incoming_requests[index]);
- if (index == max_work_depth - 1) {
- atomic_init(&sf_workinprogress, 0);
- } else {
- atomic_fetch_add(&sf_workinprogress, 1); // atomic
- }
+ else {
+ usleep(delay);
}
- }
- } else {
- usleep(delay);
+ shutdown_requested = atomic_load(&sf_shutdown_flag);
}
- shutdown_requested = sf_shutdown_flag;
- }
- if (incoming_requests) {
- free(incoming_requests);
- }
+ if (incoming_requests) {
+ free(incoming_requests);
+ }
- /* Reset the shutdown flag */
- sf_shutdown_flag = 0;
+ /* Reset the shutdown flag */
+ atomic_init(&sf_shutdown_flag, 0);
- return 0;
+ return 0;
}
/*
@@ -1460,33 +1464,35 @@ Private helper functions
=========================================
*/
-static int send_ack__(int target, int subfile_rank, int tag, MPI_Comm comm) {
- int ack = 1;
- int ret = MPI_Send(&ack, 1, MPI_INT, target, tag, comm);
+static int
+send_ack__(int target, int subfile_rank, int tag, MPI_Comm comm)
+{
+ int ack = 1;
+ int ret = MPI_Send(&ack, 1, MPI_INT, target, tag, comm);
#ifndef NDEBUG
- if (sf_verbose_flag) {
- if (sf_logfile) {
- fprintf(sf_logfile, "[ioc(%d): Sending ACK to MPI_rank(%d)\n",
- subfile_rank, target);
+ if (sf_verbose_flag) {
+ if (sf_logfile) {
+ fprintf(sf_logfile, "[ioc(%d): Sending ACK to MPI_rank(%d)\n", subfile_rank, target);
+ }
}
- }
#endif
- return ret;
+ return ret;
}
-static int send_nack__(int target, int subfile_rank, int tag, MPI_Comm comm) {
- int nack = 0;
- int ret = MPI_Send(&nack, 1, MPI_INT, target, tag, comm);
+static int
+send_nack__(int target, int subfile_rank, int tag, MPI_Comm comm)
+{
+ int nack = 0;
+ int ret = MPI_Send(&nack, 1, MPI_INT, target, tag, comm);
#ifndef NDEBUG
- if (sf_verbose_flag) {
- if (sf_logfile) {
- fprintf(sf_logfile, "[ioc(%d): Sending NACK to MPI_rank(%d)\n",
- subfile_rank, target);
+ if (sf_verbose_flag) {
+ if (sf_logfile) {
+ fprintf(sf_logfile, "[ioc(%d): Sending NACK to MPI_rank(%d)\n", subfile_rank, target);
+ }
}
- }
#endif
- return ret;
+ return ret;
}
/*
@@ -1517,106 +1523,118 @@ from the thread pool threads...
*
*-------------------------------------------------------------------------
*/
-int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
- MPI_Comm comm) {
- int fd;
- char *recv_buffer = NULL;
- int ret = MPI_SUCCESS;
- MPI_Status msg_status;
- int64_t data_size = msg->header[0];
- int64_t file_offset = msg->header[1];
- int64_t file_context_id = msg->header[2];
- double t_start, t_end;
- double t_write, t_wait, t_queue_delay;
- subfiling_context_t *sf_context = get__subfiling_object(file_context_id);
- assert(sf_context != NULL);
-
- /* flag that we've attempted to write data to the file */
- sf_context->sf_write_count++;
- /* For debugging performance */
- sf_write_ops++;
-
- t_start = MPI_Wtime();
- t_queue_delay = t_start - msg->start_time;
+int
+queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
+{
+ int fd;
+ char * recv_buffer = NULL;
+ int ret = MPI_SUCCESS;
+ MPI_Status msg_status;
+ int64_t data_size = msg->header[0];
+ int64_t file_offset = msg->header[1];
+ int64_t file_context_id = msg->header[2];
+ double t_start, t_end;
+ double t_write, t_wait, t_queue_delay;
+ subfiling_context_t *sf_context = get__subfiling_object(file_context_id);
+ int64_t stripe_id = file_offset + data_size;
+ haddr_t sf_eof;
+ assert(sf_context != NULL);
+
+ sf_eof = (haddr_t)(stripe_id % sf_context->sf_stripe_size);
+ stripe_id /= sf_context->sf_stripe_size;
+ sf_eof += (haddr_t)((stripe_id * sf_context->sf_blocksize_per_stripe) + sf_context->sf_base_addr);
+
+ /* flag that we've attempted to write data to the file */
+ sf_context->sf_write_count++;
+ /* For debugging performance */
+ sf_write_ops++;
+
+ t_start = MPI_Wtime();
+ t_queue_delay = t_start - msg->start_time;
#ifndef NDEBUG
- if (sf_verbose_flag) {
- if (sf_logfile) {
- fprintf(sf_logfile,
- "[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld, "
- "queue_delay = %lf seconds\n",
- subfile_rank, __func__, source, data_size, file_offset,
- t_queue_delay);
+ if (sf_verbose_flag) {
+ if (sf_logfile) {
+ fprintf(sf_logfile,
+ "[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld, "
+ "queue_delay = %lf seconds\n",
+ subfile_rank, __func__, source, data_size, file_offset, t_queue_delay);
+ }
}
- }
#endif
- if (recv_buffer == NULL) {
- if ((recv_buffer = (char *)malloc((size_t)data_size)) == NULL) {
- perror("malloc");
- send_nack__(source, subfile_rank, WRITE_INDEP_ACK, comm);
- return -1;
+ if (recv_buffer == NULL) {
+ if ((recv_buffer = (char *)malloc((size_t)data_size)) == NULL) {
+ perror("malloc");
+ send_nack__(source, subfile_rank, WRITE_INDEP_ACK, comm);
+ return -1;
+ }
}
- }
- send_ack__(source, subfile_rank, WRITE_INDEP_ACK, comm);
- ret = MPI_Recv(recv_buffer, (int)data_size, MPI_BYTE, source,
- WRITE_INDEP_DATA, comm, &msg_status);
+ send_ack__(source, subfile_rank, WRITE_INDEP_ACK, comm);
+ ret = MPI_Recv(recv_buffer, (int)data_size, MPI_BYTE, source, WRITE_INDEP_DATA, comm, &msg_status);
- t_end = MPI_Wtime();
- t_wait = t_end - t_start;
- sf_write_wait_time += t_wait;
- t_start = t_end;
+ t_end = MPI_Wtime();
+ t_wait = t_end - t_start;
+ sf_write_wait_time += t_wait;
+ t_start = t_end;
#ifndef NDEBUG
- if (sf_verbose_flag) {
- if (sf_logfile) {
- fprintf(sf_logfile,
- "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n",
- subfile_rank, __func__, data_size, source, ret);
+ if (sf_verbose_flag) {
+ if (sf_logfile) {
+ fprintf(sf_logfile, "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n", subfile_rank,
+ __func__, data_size, source, ret);
+ }
}
- }
#endif
- if (ret != MPI_SUCCESS) {
- int len;
- char estring[MPI_MAX_ERROR_STRING];
- MPI_Error_string(ret, estring, &len);
- printf("[ioc(%d) %s] MPI_ERROR(%d)! MPI_Recv of %ld bytes from %d "
- "returned an error(%s)\n",
- subfile_rank, __func__, msg_status.MPI_ERROR, data_size, source,
- estring);
- fflush(stdout);
- return ret;
- }
+ if (ret != MPI_SUCCESS) {
+ int len;
+ char estring[MPI_MAX_ERROR_STRING];
+ MPI_Error_string(ret, estring, &len);
+ printf("[ioc(%d) %s] MPI_ERROR(%d)! MPI_Recv of %ld bytes from %d "
+ "returned an error(%s)\n",
+ subfile_rank, __func__, msg_status.MPI_ERROR, data_size, source, estring);
+ fflush(stdout);
+ return ret;
+ }
+
+ if (msg->serialize)
+ ioc__wait_for_serialize(msg);
+
+ fd = sf_context->sf_fid;
+
+ if (fd < 0) {
+ printf("[ioc(%d)] WARNING: %s called while subfile_fid = %d (closed)\n", subfile_rank, __func__, fd);
+ fflush(stdout);
+ }
+ else {
+ if (sf_write_data(fd, file_offset, recv_buffer, data_size, subfile_rank) < 0) {
+ free(recv_buffer);
+ recv_buffer = NULL;
+ printf("[ioc(%d) %s] sf_write_data returned an error!\n", subfile_rank, __func__);
+ fflush(stdout);
+ return -1;
+ }
+ t_end = MPI_Wtime();
+ t_write = t_end - t_start;
+ sf_pwrite_time += t_write;
+ }
+
+ sf_queue_delay_time += t_queue_delay;
+
+ /* Done... */
+ if (sf_eof > sf_context->sf_eof)
+ sf_context->sf_eof = sf_eof;
- fd = sf_context->sf_fid;
- if (fd < 0) {
- printf("[ioc(%d)] WARNING: %s called while subfile_fid = %d (closed)\n",
- subfile_rank, __func__, fd);
+#ifdef VERBOSE
+ printf("[ioc(%d)] %s local sf_eof = %ld sf_context=%p\n", subfile_rank, __func__, sf_context->sf_eof,
+ (void *)sf_context);
fflush(stdout);
- } else {
- if (sf_write_data(fd, file_offset, recv_buffer, data_size, subfile_rank) <
- 0) {
- free(recv_buffer);
- recv_buffer = NULL;
- printf("[ioc(%d) %s] sf_write_data returned an error!\n", subfile_rank,
- __func__);
- fflush(stdout);
- return -1;
+#endif
+ if (recv_buffer) {
+ free(recv_buffer);
}
- t_end = MPI_Wtime();
- t_write = t_end - t_start;
- sf_pwrite_time += t_write;
- }
-
- sf_queue_delay_time += t_queue_delay;
-
- /* Done... */
- msg->completed = 1;
- if (recv_buffer) {
- free(recv_buffer);
- }
- return 0;
+ return 0;
}
/*-------------------------------------------------------------------------
@@ -1639,146 +1657,146 @@ int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
*
*-------------------------------------------------------------------------
*/
-int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source,
- MPI_Comm comm) {
- int fd;
- char *send_buffer = NULL;
- int ret = MPI_SUCCESS;
- int64_t data_size = msg->header[0];
- int64_t file_offset = msg->header[1];
- int64_t file_context_id = msg->header[2];
- double t_start, t_end;
- double t_read, t_queue_delay;
-
- subfiling_context_t *sf_context = get__subfiling_object(file_context_id);
- assert(sf_context != NULL);
-
- sf_context->sf_read_count++;
- /* For debugging performance */
- sf_read_ops++;
-
- t_start = MPI_Wtime();
- t_queue_delay = t_start - msg->start_time;
-
- fd = sf_context->sf_fid;
- if (fd < 0) {
- printf("[ioc(%d) %s] subfile(%d) file descriptor not valid\n", subfile_rank,
- __func__, fd);
- return -1;
- }
+int
+queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
+{
+ int fd;
+ char * send_buffer = NULL;
+ int ret = MPI_SUCCESS;
+ int64_t data_size = msg->header[0];
+ int64_t file_offset = msg->header[1];
+ int64_t file_context_id = msg->header[2];
+ double t_start, t_end;
+ double t_read, t_queue_delay;
+
+ subfiling_context_t *sf_context = get__subfiling_object(file_context_id);
+ assert(sf_context != NULL);
+
+ sf_context->sf_read_count++;
+ /* For debugging performance */
+ sf_read_ops++;
+
+ t_start = MPI_Wtime();
+ t_queue_delay = t_start - msg->start_time;
+
+ fd = sf_context->sf_fid;
+ if (fd < 0) {
+ printf("[ioc(%d) %s] subfile(%d) file descriptor not valid\n", subfile_rank, __func__, fd);
+ return -1;
+ }
#ifndef NDEBUG
- if (sf_verbose_flag && (sf_logfile != NULL)) {
- fprintf(sf_logfile,
- "[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld "
- "queue_delay=%lf seconds\n",
- subfile_rank, __func__, source, data_size, file_offset,
- t_queue_delay);
- }
+ if (sf_verbose_flag && (sf_logfile != NULL)) {
+ fprintf(sf_logfile,
+ "[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld "
+ "queue_delay=%lf seconds\n",
+ subfile_rank, __func__, source, data_size, file_offset, t_queue_delay);
+ }
#endif
- if ((send_buffer = (char *)malloc((size_t)data_size)) == NULL) {
- perror("malloc");
- return -1;
- }
-
- if (sf_read_data(fd, file_offset, send_buffer, data_size, subfile_rank) < 0) {
- printf("[%d] %s - sf_read_data fd=%d for source(%d) returned an error!\n",
- subfile_rank, __func__, fd, source );
- fflush(stdout);
- /*
- * Should send a zero(0) byte message to the client to prevent
- * it from hanging...
- */
- MPI_Send(send_buffer, 0, MPI_BYTE, source, READ_INDEP_DATA, comm);
- free(send_buffer);
- return -1;
- }
-
- ret = MPI_Send(send_buffer, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA,
- comm);
- if (ret != MPI_SUCCESS) {
- int len;
- char estring[MPI_MAX_ERROR_STRING];
- MPI_Error_string(ret, estring, &len);
- printf("[ioc(%d)] ERROR! MPI_Send of %ld bytes to %d returned an "
- "error(%s)\n",
- subfile_rank, data_size, source, estring);
- fflush(stdout);
- return ret;
- }
- t_end = MPI_Wtime();
- t_read = t_end - t_start;
- sf_pread_time += t_read;
- sf_queue_delay_time += t_queue_delay;
+ if ((send_buffer = (char *)malloc((size_t)data_size)) == NULL) {
+ perror("malloc");
+ return -1;
+ }
+
+ if (sf_read_data(fd, file_offset, send_buffer, data_size, subfile_rank) < 0) {
+ printf("[%d] %s - sf_read_data fd=%d for source(%d) returned an error!\n", subfile_rank, __func__, fd,
+ source);
+ fflush(stdout);
+ /*
+ * Should send a zero(0) byte message to the client to prevent
+ * it from hanging...
+ */
+ MPI_Send(send_buffer, 0, MPI_BYTE, source, READ_INDEP_DATA, comm);
+ free(send_buffer);
+ return -1;
+ }
+
+ ret = MPI_Send(send_buffer, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, comm);
+ if (ret != MPI_SUCCESS) {
+ int len;
+ char estring[MPI_MAX_ERROR_STRING];
+ MPI_Error_string(ret, estring, &len);
+ printf("[ioc(%d)] ERROR! MPI_Send of %ld bytes to %d returned an "
+ "error(%s)\n",
+ subfile_rank, data_size, source, estring);
+ fflush(stdout);
+ return ret;
+ }
+ t_end = MPI_Wtime();
+ t_read = t_end - t_start;
+ sf_pread_time += t_read;
+ sf_queue_delay_time += t_queue_delay;
#ifndef NDEBUG
- if (sf_verbose_flag && (sf_logfile != NULL)) {
- fprintf(sf_logfile, "[ioc(%d)] MPI_Send to source(%d) completed\n",
- subfile_rank, source);
- }
+ if (sf_verbose_flag && (sf_logfile != NULL)) {
+ fprintf(sf_logfile, "[ioc(%d)] MPI_Send to source(%d) completed\n", subfile_rank, source);
+ }
#endif
- if (send_buffer) {
- free(send_buffer);
- send_buffer = NULL;
- }
+ if (send_buffer) {
+ free(send_buffer);
+ send_buffer = NULL;
+ }
- return 0;
+ return 0;
} /* end queue_read_indep() */
/* ---------------------------------------------------
* Helper function for subfiling_open_file() see below
+ * Subfiles should be located in the same directory
+ * as the HDF5 file unless the user has provided
+ * an alternate directory name as indicated by the
+ * sf_context->subfile_prefix field.
* ---------------------------------------------------*/
-static
-void get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int subfile_rank,
- char **_prefix, char **_subfile_dir, char *filepath)
+static void
+get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int subfile_rank, char **_basename,
+ char **_subfile_dir, char *filepath)
{
- const char slash = '/';
- char workdir[PATH_MAX];
- char configdir[PATH_MAX];
-
- char *prefix = NULL, *subfile_dir = NULL;
- int n_io_concentrators = sf_context->topology->n_io_concentrators;
-
- memset(workdir, 0, PATH_MAX);
- getcwd(workdir,PATH_MAX);
-
- if ((prefix = sf_context->subfile_prefix) == NULL) {
- memset(configdir, 0, PATH_MAX);
- strncpy(configdir, sf_context->filename, strlen(sf_context->filename));
- prefix = dirname(configdir);
- }
-
- size_t prefix_len = strlen(prefix);
- if (strcmp(prefix, workdir)) {
- if (prefix[prefix_len-1] == slash) {
- if (sf_context->subfile_prefix)
- sprintf(filepath, "%s" SF_NODE_LOCAL_TEMPLATE, prefix, h5_file_id,
- subfile_rank, n_io_concentrators);
- else
- sprintf(filepath, "%s" SF_FILENAME_TEMPLATE, prefix, h5_file_id,
- subfile_rank, n_io_concentrators);
- }
- else {
- if (sf_context->subfile_prefix)
- sprintf(filepath, "%s/" SF_NODE_LOCAL_TEMPLATE, prefix, h5_file_id,
- subfile_rank, n_io_concentrators);
- else
- sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, prefix, h5_file_id,
- subfile_rank, n_io_concentrators);
- }
- } else {
- memset(configdir, 0, PATH_MAX);
- strcpy(configdir, sf_context->filename);
- subfile_dir = dirname(configdir);
- sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, subfile_dir, h5_file_id,
- subfile_rank, n_io_concentrators);
- }
-
- *_prefix = prefix;
- *_subfile_dir = subfile_dir;
+ char *prefix = NULL, *subfile_dir = NULL;
+ char *base = NULL;
+ int n_io_concentrators = sf_context->topology->n_io_concentrators;
+
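+ /* We require this to be non-null.
+ * NOTE(review): sf_context is already dereferenced by the
+ * n_io_concentrators initializer above, so this assert comes
+ * too late to guard that access.
+ */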
+ HDassert(sf_context);
+
+ prefix = (char *)malloc(PATH_MAX);
+ HDassert(prefix);
+
+ /* Under normal operation, we co-locate subfiles
+ * with the HDF5 file
+ */
+ strcpy(prefix, sf_context->h5_filename);
+ base = basename(prefix);
+ *_basename = strdup(base);
+
+ if (sf_context->subfile_prefix == NULL) {
+ subfile_dir = dirname(prefix);
+ *_subfile_dir = strdup(subfile_dir);
+ }
+ else {
+ /* Note: Users may specify a directory name which is inaccessible
+ * from where the current process is running. In particular, "node-local"
+ * storage is not uniformly available to all processes.
+ * We would like to check whether the user-supplied pathname is
+ * unavailable and, if so, we could default to creating the subfiles
+ * in the current directory. (?)
+ */
+ *_subfile_dir = strdup(sf_context->subfile_prefix);
+ }
+
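+ /* The subfile naming should produce files of the following form:
+ * If we assume the HDF5 file is named ABC.h5, then subfiles
+ * will have names:
+ * ABC.h5.subfile_<file-number>_0_of_2,
+ * ABC.h5.subfile_<file-number>_1_of_2, and
+ * ABC.h5.subfile_<file-number>.config
+ * NOTE(review): the local subfile_dir is assigned only in the
+ * subfile_prefix == NULL branch above; when a user prefix is set
+ * it is still NULL when passed to the "%s" conversion below.
+ */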
+ sprintf(filepath, "%s/%s" SF_FILENAME_TEMPLATE, subfile_dir, base, h5_file_id, subfile_rank,
+ n_io_concentrators);
+ if (prefix)
+ HDfree(prefix);
}
-
+
/*-------------------------------------------------------------------------
* Function: Public/IOC subfiling_open_file
*
@@ -1805,153 +1823,165 @@ void get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int
*
*-------------------------------------------------------------------------
*/
-int subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags) {
- int errors = 0;
+int
+subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags)
+{
+ int errors = 0;
+#if 0
char filepath[PATH_MAX];
- char *prefix = NULL;
- char *subfile_dir = NULL;
- mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
-
- double t_start = 0.0, t_end = 0.0;
- /* Only the real IOCs open the subfiles
- * Once a file is opened, all additional file open requests
- * can return immediately.
- */
-
- t_start = MPI_Wtime();
- /* Only allow the actual IO concentrator ranks to create sub-files */
- if (subfile_rank >= 0) {
- char config[PATH_MAX];
- int64_t h5_file_id = msg->header[1];
- int64_t file_context_id = msg->header[2];
- subfiling_context_t *sf_context = get__subfiling_object(file_context_id);
- assert(sf_context != NULL);
+ char config[PATH_MAX];
+ char linebuf[PATH_MAX];
+#else
+ char filepath[PATH_MAX];
+ char linebuf[PATH_MAX];
+#endif
+ char * temp = NULL;
+ char * prefix = NULL;
+ char * subfile_dir = NULL;
+ char * base = NULL;
+ mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+
+ double t_start = 0.0, t_end = 0.0;
+ /* Only the real IOCs open the subfiles
+ * Once a file is opened, all additional file open requests
+ * can return immediately.
+ */
- memset(filepath, 0, PATH_MAX);
+ t_start = MPI_Wtime();
+ /* Only allow the actual IO concentrator ranks to create sub-files */
+ if (subfile_rank >= 0) {
+ int k, retries = 2;
+ int64_t h5_file_id = msg->header[1];
+ int64_t file_context_id = msg->header[2];
+ subfiling_context_t *sf_context = get__subfiling_object(file_context_id);
+ assert(sf_context != NULL);
- begin_thread_exclusive();
- /* Check to see whether we need to create the subfile
- * and possibly (IFF our subfile_rank is 0) a config file.
- */
+ memset(filepath, 0, PATH_MAX);
- get__subfile_name(sf_context, h5_file_id, subfile_rank, &prefix, &subfile_dir, filepath);
-
- if (sf_context->sf_fid == -2) {
- const char *dotconfig = ".subfile_config";
- int n_io_concentrators = sf_context->topology->n_io_concentrators;
- int *io_concentrator = sf_context->topology->io_concentrator;
- hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
- void *fptr = H5FD_open(filepath, H5F_ACC_CREAT|H5F_ACC_RDWR, fapl, HADDR_UNDEF);
- if (fptr) {
- H5FD_close(fptr);
- }
- if ((sf_context->sf_fid = HDopen(filepath, flags|O_CREAT , mode)) < 0) {
- end_thread_exclusive();
- HDprintf("[%d %s] file create(%s) failed!\n", subfile_rank, __func__,
- filepath);
- HDfflush(stdout);
+ begin_thread_exclusive();
+ /* Check to see whether we need to create the subfile
+ * and possibly (IFF our subfile_rank is 0) a config file.
+ */
+
+ get__subfile_name(sf_context, h5_file_id, subfile_rank, &base, &subfile_dir, filepath);
+ sf_context->sf_filename = strdup(filepath);
+
+ assert(sf_context->sf_filename);
+
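+ /* Check if we need to create the subfiles.
+ * NOTE(review): the open loop below accepts only fd > 0, so a
+ * valid descriptor 0 would be treated as a failure.
+ */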
+ if (sf_context->sf_fid == -2) {
+ int n_io_concentrators = sf_context->topology->n_io_concentrators;
+ int *io_concentrator = sf_context->topology->io_concentrator;
+ for (k = 0; k < retries; k++) {
+ int fd;
+ if ((fd = HDopen(filepath, O_CREAT | O_RDWR | O_TRUNC, mode)) > 0) {
+ sf_context->sf_fid = fd;
+ sf_context->sf_eof = 0;
+ break;
+ }
+ }
+ if (sf_context->sf_fid < 0) {
+ end_thread_exclusive();
+ perror("subfiling_open_file/open");
+ HDprintf("[%d %s] file create(%s) failed!\n", subfile_rank, __func__, filepath);
+ HDfflush(stdout);
#ifndef NDEBUG
- if (sf_verbose_flag) {
- printf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__,
- filepath);
- fflush(stdout);
- }
+ if (sf_verbose_flag) {
+ printf("[%d %s] file create(%s) failed!\n", subfile_rank, __func__, filepath);
+ fflush(stdout);
+ }
#endif
- errors++;
- goto done;
- }
-
- memset(filepath, 0, sizeof(filepath));
- strcpy(filepath, sf_context->filename);
- subfile_dir = strrchr(filepath, '/');
- if (subfile_dir) {
- size_t remaining = strlen(subfile_dir);
- memset(subfile_dir, 0, remaining);
- sprintf(subfile_dir, "/%ld%s", h5_file_id, dotconfig);
- strcpy(config, filepath);
- }
-
- if ((subfile_rank == 0) && (flags & O_CREAT)) {
- FILE *f = NULL;
- /* If a config file already exists, AND
- * the user wants to truncate subfiles (if they exist),
- * then we should also truncate an existing config file.
- */
- if (access(config, flags) == 0) {
- truncate(config, 0);
- }
- f = HDfopen(config, "w+");
- if (f != NULL) {
- int k;
- char linebuf[PATH_MAX];
- sprintf(linebuf, "stripe_size=%ld\n", sf_context->sf_stripe_size);
- HDfwrite(linebuf, strlen(linebuf), 1, f);
- sprintf(linebuf, "aggregator_count=%d\n", n_io_concentrators);
- HDfwrite(linebuf, strlen(linebuf), 1, f);
- sprintf(linebuf, "hdf5_file=%s\n", sf_context->filename);
- HDfwrite(linebuf, strlen(linebuf), 1, f);
-
- for (k = 0; k < n_io_concentrators; k++) {
- if (prefix)
- sprintf(linebuf, "%s/%ld_node_local_temp_%d_of_%d:%d\n", prefix,
- h5_file_id, subfile_rank, n_io_concentrators,
- io_concentrator[k]);
- else
- sprintf(linebuf, "%ld_node_local_temp_%d_of_%d:%d\n", h5_file_id,
- subfile_rank, n_io_concentrators, io_concentrator[k]);
-
- HDfwrite(linebuf, strlen(linebuf), 1, f);
- }
-
- fclose(f);
- } else {
- perror("fopen(config)");
- errors++;
- goto done;
- }
- }
+ errors++;
+ goto done;
+ }
+ sprintf(filepath, "%s/%s.subfile_%ld.config", subfile_dir, base, h5_file_id);
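+ /* Subfile rank 0 does the work of creating the config file.
+ * NOTE(review): the "subfile_dir=" line formatted below is
+ * overwritten before any HDfwrite, so it never reaches the file.
+ * The fopen failure path jumps to done without calling
+ * end_thread_exclusive(). The per-IOC lines print subfile_rank
+ * rather than the loop index k -- confirm that is intended.
+ */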
+ if ((subfile_rank == 0) && (flags & O_CREAT)) {
+ FILE *f = NULL;
+ /* If a config file already exists, AND
+ * the user wants to truncate subfiles (if they exist),
+ * then we should also truncate an existing config file.
+ */
+ if (access(filepath, flags) == 0) {
+ truncate(filepath, 0);
+ }
+ f = HDfopen(filepath, "w+");
+ if (f != NULL) {
+ sprintf(linebuf, "stripe_size=%ld\n", sf_context->sf_stripe_size);
+ HDfwrite(linebuf, 1, strlen(linebuf), f);
+ sprintf(linebuf, "aggregator_count=%d\n", n_io_concentrators);
+ HDfwrite(linebuf, 1, strlen(linebuf), f);
+ sprintf(linebuf, "hdf5_file=%s\n", sf_context->h5_filename);
+ HDfwrite(linebuf, 1, strlen(linebuf), f);
+ sprintf(linebuf, "subfile_dir=%s\n", subfile_dir);
+
+ for (k = 0; k < n_io_concentrators; k++) {
+ sprintf(linebuf, "%s.subfile_%ld_%d_of_%d:%d\n", base, h5_file_id, subfile_rank,
+ n_io_concentrators, io_concentrator[k]);
+ HDfwrite(linebuf, 1, strlen(linebuf), f);
+ }
+
+ fclose(f);
+ }
+ else {
+ perror("fopen(config)");
+ errors++;
+ goto done;
+ }
+ }
+
#ifndef NDEBUG
- if (sf_verbose_flag) {
- if (sf_logfile) {
- HDfprintf(sf_logfile, "[ioc:%d] Opened subfile %s\n", subfile_rank,
- filepath);
- }
- }
+ if (sf_verbose_flag) {
+ if (sf_logfile) {
+ HDfprintf(sf_logfile, "[ioc:%d] Opened subfile %s\n", subfile_rank, filepath);
+ }
+ }
#endif
- }
- else {
- if ((sf_context->sf_fid = HDopen(filepath, flags, mode)) < 0) {
- end_thread_exclusive();
- HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__,
- filepath);
- HDfflush(stdout);
+ }
+ else {
+ for (k = 0; k < retries; k++) {
+ int fd;
+ if ((fd = HDopen(filepath, O_CREAT | O_RDWR, mode)) > 0) {
+ sf_context->sf_fid = fd;
+ break;
+ }
+ }
+ if (sf_context->sf_fid < 0) {
+ end_thread_exclusive();
+ perror("subfiling_open_file/open");
+ HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, filepath);
+ HDfflush(stdout);
#ifndef NDEBUG
- if (sf_verbose_flag) {
- HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__,
- filepath);
- HDfflush(stdout);
- }
+ if (sf_verbose_flag) {
+ HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, filepath);
+ HDfflush(stdout);
+ }
#endif
- errors++;
- goto done;
- }
- }
- end_thread_exclusive();
- }
+ errors++;
+ goto done;
+ }
+ }
+ end_thread_exclusive();
+ }
done:
- t_end = MPI_Wtime();
+ t_end = MPI_Wtime();
+
+ if (base)
+ HDfree(base);
+ if (subfile_dir)
+ HDfree(subfile_dir);
#ifndef NDEBUG
- if (sf_verbose_flag) {
- printf("[%s %d] open completed in %lf seconds with %d errors\n", __func__,
- subfile_rank, (t_end - t_start), errors);
- fflush(stdout);
- }
+ if (sf_verbose_flag) {
+ printf("[%s %d] open completed in %lf seconds with %d errors\n", __func__, subfile_rank,
+ (t_end - t_start), errors);
+ fflush(stdout);
+ }
#endif
- return errors;
+ return errors;
} /* end subfiling_open_file() */
/*-------------------------------------------------------------------------
@@ -1983,57 +2013,65 @@ done:
*-------------------------------------------------------------------------
*/
-int sf_get_mpi_rank(hid_t fid, int *rank) {
- hid_t context_id = fid_map_to_context((uint64_t)fid);
- subfiling_context_t *sf_context = get__subfiling_object(context_id);
- assert(sf_context != NULL);
- assert(rank != NULL);
- *rank = sf_context->sf_group_rank;
- return 0;
+int
+sf_get_mpi_rank(hid_t fid, int *rank)
+{
+ hid_t context_id = fid_map_to_context((uint64_t)fid);
+ subfiling_context_t *sf_context = get__subfiling_object(context_id);
+ assert(sf_context != NULL);
+ assert(rank != NULL);
+ *rank = sf_context->sf_group_rank;
+ return 0;
}
-int sf_get_mpi_size(hid_t fid, int *size) {
- hid_t context_id = fid_map_to_context((uint64_t)fid);
- subfiling_context_t *sf_context = get__subfiling_object(context_id);
- assert(sf_context != NULL);
- assert(size != NULL);
- *size = sf_context->sf_group_size;
- return 0;
+int
+sf_get_mpi_size(hid_t fid, int *size)
+{
+ hid_t context_id = fid_map_to_context((uint64_t)fid);
+ subfiling_context_t *sf_context = get__subfiling_object(context_id);
+ assert(sf_context != NULL);
+ assert(size != NULL);
+ *size = sf_context->sf_group_size;
+ return 0;
}
-int sf_get_group_comm(hid_t fid, MPI_Comm *comm) {
- hid_t context_id = fid_map_to_context((uint64_t)fid);
- subfiling_context_t *sf_context = get__subfiling_object(context_id);
- assert(sf_context != NULL);
- assert(comm != NULL);
- *comm = sf_context->sf_group_comm;
- return 0;
+int
+sf_get_group_comm(hid_t fid, MPI_Comm *comm)
+{
+ hid_t context_id = fid_map_to_context((uint64_t)fid);
+ subfiling_context_t *sf_context = get__subfiling_object(context_id);
+ assert(sf_context != NULL);
+ assert(comm != NULL);
+ *comm = sf_context->sf_group_comm;
+ return 0;
}
-int sf_subfile_set_logging(hid_t sf_fid, int ioc_rank, int flag) {
- int ioc;
- int status = 0;
- hid_t context_id = fid_map_to_context((uint64_t)sf_fid);
- subfiling_context_t *sf_context = get__subfiling_object(context_id);
- int n_io_concentrators;
- int *io_concentrator = NULL;
- int64_t lflag = (int64_t)(flag & 0xFF);
- int64_t msg[3];
-
- assert(sf_context != NULL);
-
- msg[0] = lflag;
- msg[1] = 0;
- msg[2] = sf_context->sf_context_id;
-
- n_io_concentrators = sf_context->topology->n_io_concentrators;
- io_concentrator = sf_context->topology->io_concentrator;
-
- for (ioc = 0; ioc < n_io_concentrators; ioc++) {
- if ((flag < 0) || (flag == ioc_rank)) {
- status = MPI_Ssend(msg, 3, MPI_INT64_T, io_concentrator[ioc], LOGGING_OP,
- sf_context->sf_msg_comm);
+int
+sf_subfile_set_logging(hid_t sf_fid, int ioc_rank, int flag)
+{
+ int ioc;
+ int status = 0;
+ hid_t context_id = fid_map_to_context((uint64_t)sf_fid);
+ subfiling_context_t *sf_context = get__subfiling_object(context_id);
+ int n_io_concentrators;
+ int * io_concentrator = NULL;
+ int64_t lflag = (int64_t)(flag & 0xFF);
+ int64_t msg[3];
+
+ assert(sf_context != NULL);
+
+ msg[0] = lflag;
+ msg[1] = 0;
+ msg[2] = sf_context->sf_context_id;
+
+ n_io_concentrators = sf_context->topology->n_io_concentrators;
+ io_concentrator = sf_context->topology->io_concentrator;
+
+ for (ioc = 0; ioc < n_io_concentrators; ioc++) {
+ if ((flag < 0) || (flag == ioc_rank)) {
+ status =
+ MPI_Ssend(msg, 3, MPI_INT64_T, io_concentrator[ioc], LOGGING_OP, sf_context->sf_msg_comm);
+ }
}
- }
- return status;
+ return status;
}