diff options
Diffstat (limited to 'src/H5FDsubfile_mpi.c')
-rw-r--r-- | src/H5FDsubfile_mpi.c | 2628 |
1 files changed, 1333 insertions, 1295 deletions
diff --git a/src/H5FDsubfile_mpi.c b/src/H5FDsubfile_mpi.c index 64a3959..5963870 100644 --- a/src/H5FDsubfile_mpi.c +++ b/src/H5FDsubfile_mpi.c @@ -13,17 +13,17 @@ #include "H5FDsubfiling.h" -static int sf_close_file_count = 0; +static int sf_close_file_count = 0; static int sf_ops_after_first_close = 0; -static int sf_enable_directIO = 0; +static int sf_enable_directIO = 0; -static int sf_write_ops = 0; -static double sf_pwrite_time = 0.0; +static int sf_write_ops = 0; +static double sf_pwrite_time = 0.0; static double sf_write_wait_time = 0.0; -static int sf_read_ops = 0; -static double sf_pread_time = 0.0; -static double sf_read_wait_time = 0.0; +static int sf_read_ops = 0; +static double sf_pread_time = 0.0; +static double sf_read_wait_time = 0.0; static double sf_queue_delay_time = 0.0; /* The following is our basic template for a subfile filename. @@ -31,19 +31,19 @@ static double sf_queue_delay_time = 0.0; * intend to use the user defined HDF5 filename for a * zeroth subfile as well as for all metadata. */ -#define SF_NODE_LOCAL_TEMPLATE "%ld_node_local_%d_of_%d" -#define SF_FILENAME_TEMPLATE "%ld_subfile_%d_of_%d" +#define SF_FILENAME_TEMPLATE ".subfile_%ld_%d_of_%d" static int *request_count_per_rank = NULL; -atomic_int sf_workinprogress = 0; -atomic_int sf_work_pending = 0; -atomic_int sf_file_open_count = 0; -atomic_int sf_file_close_count = 0; -atomic_int sf_file_refcount = 0; +atomic_int sf_workinprogress = 0; +atomic_int sf_work_pending = 0; +atomic_int sf_file_open_count = 0; +atomic_int sf_file_close_count = 0; +atomic_int sf_file_refcount = 0; atomic_int sf_ioc_fini_refcount = 0; -atomic_int sf_ioc_ready = 0; -volatile int sf_shutdown_flag = 0; +atomic_int sf_ioc_ready = 0; +atomic_int sf_shutdown_flag = 0; +// volatile int sf_shutdown_flag = 0; /* * Structure definitions to enable async io completions @@ -52,69 +52,68 @@ volatile int sf_shutdown_flag = 0; * invoked. See below. 
*/ typedef struct _client_io_args { - int ioc; /* ID of the IO Concentrator handling this IO. */ - hid_t context_id; /* The context id provided for the read or write */ - int64_t offset; /* The file offset for the IO operation */ - int64_t elements; /* How many bytes */ - void *data; /* A pointer to the (contiguous) data segment */ - MPI_Request io_req; /* An MPI request to allow the code to loop while */ - /* making progress on multiple IOs */ + int ioc; /* ID of the IO Concentrator handling this IO. */ + hid_t context_id; /* The context id provided for the read or write */ + int64_t offset; /* The file offset for the IO operation */ + int64_t elements; /* How many bytes */ + void * data; /* A pointer to the (contiguous) data segment */ + MPI_Request io_req; /* An MPI request to allow the code to loop while */ + /* making progress on multiple IOs */ } io_args_t; /* pre-define */ typedef struct _client_io_func io_func_t; struct _client_io_func { - int (*io_function)(void *this_io); /* pointer to a completion function */ - io_args_t io_args; /* arguments passed to the completion function */ - int pending; /* The function is complete (0) or pending (1)? */ + int (*io_function)(void *this_io); /* pointer to a completion function */ + io_args_t io_args; /* arguments passed to the completion function */ + int pending; /* The function is complete (0) or pending (1)? */ }; typedef struct _io_req { - struct _io_req *prev; /* A simple list structure containing completion */ - struct _io_req *next; /* functions. These should get removed as IO ops */ - io_func_t completion_func; /* are completed */ + struct _io_req *prev; /* A simple list structure containing completion */ + struct _io_req *next; /* functions. 
These should get removed as IO ops */ + io_func_t completion_func; /* are completed */ } io_req_t; -int n_io_pending = 0; +int n_io_pending = 0; io_req_t pending_io_requests; typedef struct _client_xfer_info { - int64_t offset; - int64_t length; - int ioc_targets; - io_op_t op; + int64_t offset; + int64_t length; + int ioc_targets; + io_op_t op; } client_xfer_info_t; typedef struct _xfer_info { - int64_t offset; - int64_t length; + int64_t offset; + int64_t length; } xfer_info_t; #define STAT_BLOCKSIZE 1024 typedef struct _ioc_stats { - int read_index; - int read_size; - xfer_info_t *read_info; - int write_index; - int write_size; - xfer_info_t *write_info; + int read_index; + int read_size; + xfer_info_t *read_info; + int write_index; + int write_size; + xfer_info_t *write_info; } ioc_stats_t; static ioc_stats_t ioc_xfer_records; -int client_op_index = 0; -int client_op_size = 0; -client_xfer_info_t *client_ops = NULL; +int client_op_index = 0; +int client_op_size = 0; +client_xfer_info_t *client_ops = NULL; /* const char *sf_subfile_prefix = "."; */ -#define MAX_WORK_PER_RANK 2 -#define K(n) ((n)*1024) -#define M(n) ((n) * (1024 * 1024)) +#define MAX_WORK_PER_RANK 2 +#define K(n) ((n)*1024) +#define M(n) ((n) * (1024 * 1024)) #define DEFAULT_STRIPE_SIZE M(32) -#define MAX_DEPTH 1024 - +#define MAX_DEPTH 1024 /* ========================================= @@ -122,17 +121,18 @@ Private functions ========================================= */ -static inline void *cast_to_void(const void *data) { - union { - const void *const_ptr_to_data; - void *ptr_to_data; - } eliminate_const_warning; - eliminate_const_warning.const_ptr_to_data = data; - return eliminate_const_warning.ptr_to_data; +static inline void * +cast_to_void(const void *data) +{ + union { + const void *const_ptr_to_data; + void * ptr_to_data; + } eliminate_const_warning; + eliminate_const_warning.const_ptr_to_data = data; + return eliminate_const_warning.ptr_to_data; } -static char *get_ioc_subfile_path(int 
ioc, int ioc_count, - subfiling_context_t *sf_context); -static int async_completion(void *arg); +static char *get_ioc_subfile_path(int ioc, int ioc_count, subfiling_context_t *sf_context); +static int async_completion(void *arg); /* ===================================================================== */ /* MPI_Datatype Creation functions. @@ -176,49 +176,49 @@ static int async_completion(void *arg); /* Fill the output vectors 'io_offset', 'io_datasize' and 'io_f_offset' * All calculations are in terms of bytes. */ -static void H5FD__create_first_mpi_type( - subfiling_context_t *context, int ioc_depth, int64_t src_offset, - int64_t target_datasize, int64_t f_offset, int64_t *io_offset, - int64_t *io_datasize, int64_t *io_f_offset, int64_t first_io) { - int64_t stripe_size = context->sf_stripe_size; - int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; - int64_t offset_in_stripe = f_offset % stripe_size; - int64_t next_offset = blocksize_per_stripe - offset_in_stripe; - int64_t total_bytes = first_io; - - io_offset[0] = src_offset; - io_datasize[0] = first_io; - io_f_offset[0] = f_offset; +static void +H5FD__create_first_mpi_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset, + int64_t target_datasize, int64_t f_offset, int64_t *io_offset, + int64_t *io_datasize, int64_t *io_f_offset, int64_t first_io) +{ + int64_t stripe_size = context->sf_stripe_size; + int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; + int64_t offset_in_stripe = f_offset % stripe_size; + int64_t next_offset = blocksize_per_stripe - offset_in_stripe; + int64_t total_bytes = first_io; + + io_offset[0] = src_offset; + io_datasize[0] = first_io; + io_f_offset[0] = f_offset; #ifdef VERBOSE - printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, src_offset, first_io, f_offset); - fflush(stdout); + printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, src_offset, first_io, f_offset); + fflush(stdout); #endif - if 
(first_io == target_datasize) { - return; - } - if (first_io) { - int k; - f_offset += (blocksize_per_stripe - offset_in_stripe); - for (k = 1; k <= ioc_depth; k++) { - io_offset[k] = next_offset; - io_datasize[k] = stripe_size; - io_f_offset[k] = f_offset; - total_bytes += stripe_size; + if (first_io == target_datasize) { + return; + } + if (first_io) { + int k; + f_offset += (blocksize_per_stripe - offset_in_stripe); + for (k = 1; k <= ioc_depth; k++) { + io_offset[k] = next_offset; + io_datasize[k] = stripe_size; + io_f_offset[k] = f_offset; + total_bytes += stripe_size; #ifdef VERBOSE - printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, k, next_offset, stripe_size, f_offset); - fflush(stdout); + printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset, + stripe_size, f_offset); + fflush(stdout); #endif - f_offset += context->sf_blocksize_per_stripe; - next_offset += context->sf_blocksize_per_stripe; - } - if (total_bytes != target_datasize) { - printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, - total_bytes, target_datasize); + f_offset += context->sf_blocksize_per_stripe; + next_offset += context->sf_blocksize_per_stripe; + } + if (total_bytes != target_datasize) { + printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes, + target_datasize); + } } - } - return; + return; } /* end H5FD__create_first_mpi_type() */ /*------------------------------------------------------------------------- @@ -250,57 +250,56 @@ static void H5FD__create_first_mpi_type( /* Fill the output vectors 'io_offset', 'io_datasize' and 'io_f_offset' * All calculations are in terms of bytes. 
*/ -static void H5FD__create_final_mpi_type(subfiling_context_t *context, - int ioc_depth, int64_t src_offset, - int64_t target_datasize, - int64_t f_offset, int64_t *io_offset, - int64_t *io_datasize, - int64_t *io_f_offset, int64_t last_io) { - int64_t stripe_size = context->sf_stripe_size; - int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; - int64_t next_offset = src_offset; - int64_t total_bytes = 0; - - if (last_io == target_datasize) { - io_offset[0] = src_offset; - io_f_offset[0] = f_offset; - io_datasize[0] = last_io; +static void +H5FD__create_final_mpi_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset, + int64_t target_datasize, int64_t f_offset, int64_t *io_offset, + int64_t *io_datasize, int64_t *io_f_offset, int64_t last_io) +{ + int64_t stripe_size = context->sf_stripe_size; + int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; + int64_t next_offset = src_offset; + int64_t total_bytes = 0; + + if (last_io == target_datasize) { + io_offset[0] = src_offset; + io_f_offset[0] = f_offset; + io_datasize[0] = last_io; #ifdef VERBOSE - printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, src_offset, last_io, f_offset); - fflush(stdout); + printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, src_offset, last_io, + f_offset); + fflush(stdout); #endif - return; - } - - if (last_io) { - int i, k; - for (k = 0, i = 1; i < ioc_depth; i++) { - io_offset[k] = next_offset; - io_datasize[k] = stripe_size; - io_f_offset[k] = f_offset; + return; + } + + if (last_io) { + int i, k; + for (k = 0, i = 1; i < ioc_depth; i++) { + io_offset[k] = next_offset; + io_datasize[k] = stripe_size; + io_f_offset[k] = f_offset; #ifdef VERBOSE - printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, k, next_offset, stripe_size, f_offset); - fflush(stdout); + printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset, + stripe_size, f_offset); + 
fflush(stdout); #endif - k++; - total_bytes += stripe_size; - f_offset += blocksize_per_stripe; - next_offset += context->sf_blocksize_per_stripe; - } + k++; + total_bytes += stripe_size; + f_offset += blocksize_per_stripe; + next_offset += context->sf_blocksize_per_stripe; + } - io_datasize[k] = last_io; - io_offset[k] = next_offset; - io_f_offset[k] = f_offset; - total_bytes += last_io; + io_datasize[k] = last_io; + io_offset[k] = next_offset; + io_f_offset[k] = f_offset; + total_bytes += last_io; - if (total_bytes != target_datasize) { - printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, - total_bytes, target_datasize); + if (total_bytes != target_datasize) { + printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes, + target_datasize); + } } - } - return; + return; } /* end H5FD__create_final_mpi_type() */ /*------------------------------------------------------------------------- @@ -325,63 +324,61 @@ static void H5FD__create_final_mpi_type(subfiling_context_t *context, *------------------------------------------------------------------------- */ -static void H5FD__create_f_l_mpi_type(subfiling_context_t *context, - int ioc_depth, int64_t src_offset, - int64_t target_datasize, int64_t f_offset, - int64_t *io_offset, int64_t *io_datasize, - int64_t *io_f_offset, int64_t first_io, - int64_t last_io) { - int64_t stripe_size = context->sf_stripe_size; - int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; - int64_t offset_in_stripe = f_offset % stripe_size; - int64_t next_offset = blocksize_per_stripe - offset_in_stripe; - int64_t total_bytes = first_io; - - io_offset[0] = src_offset; - io_datasize[0] = first_io; - io_f_offset[0] = f_offset; +static void +H5FD__create_f_l_mpi_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset, + int64_t target_datasize, int64_t f_offset, int64_t *io_offset, int64_t *io_datasize, + int64_t *io_f_offset, int64_t first_io, int64_t last_io) +{ + int64_t 
stripe_size = context->sf_stripe_size; + int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; + int64_t offset_in_stripe = f_offset % stripe_size; + int64_t next_offset = blocksize_per_stripe - offset_in_stripe; + int64_t total_bytes = first_io; + + io_offset[0] = src_offset; + io_datasize[0] = first_io; + io_f_offset[0] = f_offset; #ifdef VERBOSE - printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, src_offset, first_io, f_offset); - fflush(stdout); + printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, src_offset, first_io, f_offset); + fflush(stdout); #endif - if (total_bytes == target_datasize) { - return; - } - - if (total_bytes) { - int k; - f_offset += (blocksize_per_stripe - offset_in_stripe); - for (k = 1; k < ioc_depth; k++) { - io_offset[k] = next_offset; - io_datasize[k] = stripe_size; - io_f_offset[k] = f_offset; - total_bytes += stripe_size; + if (total_bytes == target_datasize) { + return; + } + + if (total_bytes) { + int k; + f_offset += (blocksize_per_stripe - offset_in_stripe); + for (k = 1; k < ioc_depth; k++) { + io_offset[k] = next_offset; + io_datasize[k] = stripe_size; + io_f_offset[k] = f_offset; + total_bytes += stripe_size; #ifdef VERBOSE - printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, k, next_offset, stripe_size, f_offset); - fflush(stdout); + printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset, + stripe_size, f_offset); + fflush(stdout); #endif - f_offset += blocksize_per_stripe; - next_offset += blocksize_per_stripe; - } - io_datasize[ioc_depth] = last_io; - io_f_offset[ioc_depth] = f_offset; - io_offset[ioc_depth] = next_offset; + f_offset += blocksize_per_stripe; + next_offset += blocksize_per_stripe; + } + io_datasize[ioc_depth] = last_io; + io_f_offset[ioc_depth] = f_offset; + io_offset[ioc_depth] = next_offset; #ifdef VERBOSE - printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, k, 
next_offset, last_io, f_offset); - fflush(stdout); + printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset, last_io, + f_offset); + fflush(stdout); #endif - total_bytes += last_io; + total_bytes += last_io; - if (total_bytes != target_datasize) { - printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", - __func__, total_bytes, target_datasize); + if (total_bytes != target_datasize) { + printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes, + target_datasize); + } } - } - return; + return; } /* end H5FD__create_f_l_mpi_type() */ /*------------------------------------------------------------------------- @@ -406,28 +403,27 @@ static void H5FD__create_f_l_mpi_type(subfiling_context_t *context, * *------------------------------------------------------------------------- */ -static void H5FD__create_mpi_uniform_type(subfiling_context_t *context, - int ioc_depth, int64_t src_offset, - int64_t target_datasize, - int64_t f_offset, int64_t *io_offset, - int64_t *io_datasize, - int64_t *io_f_offset) { - int64_t stripe_size = context->sf_stripe_size; - int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; - int64_t next_offset = src_offset + blocksize_per_stripe; - int64_t total_bytes = 0; - - io_offset[0] = src_offset; - io_datasize[0] = stripe_size; - io_f_offset[0] = f_offset; - if (target_datasize == 0) { +static void +H5FD__create_mpi_uniform_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset, + int64_t target_datasize, int64_t f_offset, int64_t *io_offset, + int64_t *io_datasize, int64_t *io_f_offset) +{ + int64_t stripe_size = context->sf_stripe_size; + int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; + int64_t next_offset = src_offset + blocksize_per_stripe; + int64_t total_bytes = 0; + + io_offset[0] = src_offset; + io_datasize[0] = stripe_size; + io_f_offset[0] = f_offset; + if (target_datasize == 0) { #if 0 printf("[%s] 0: datasize=0\n", __func__); 
fflush(stdout); #endif - io_datasize[0] = 0; - return; - } + io_datasize[0] = 0; + return; + } #if 0 printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", @@ -435,31 +431,31 @@ static void H5FD__create_mpi_uniform_type(subfiling_context_t *context, fflush(stdout); #endif - f_offset += blocksize_per_stripe; - total_bytes = stripe_size; + f_offset += blocksize_per_stripe; + total_bytes = stripe_size; - if (target_datasize > stripe_size) { - int k; - for (k = 1; k < ioc_depth; k++) { - io_offset[k] = next_offset; - io_datasize[k] = stripe_size; - io_f_offset[k] = f_offset; + if (target_datasize > stripe_size) { + int k; + for (k = 1; k < ioc_depth; k++) { + io_offset[k] = next_offset; + io_datasize[k] = stripe_size; + io_f_offset[k] = f_offset; #if 0 printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset, stripe_size, f_offset); fflush(stdout); #endif - total_bytes += stripe_size; - f_offset += blocksize_per_stripe; - next_offset += blocksize_per_stripe; - } + total_bytes += stripe_size; + f_offset += blocksize_per_stripe; + next_offset += blocksize_per_stripe; + } - if (total_bytes != target_datasize) { - printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, - total_bytes, target_datasize); + if (total_bytes != target_datasize) { + printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes, + target_datasize); + } } - } - return; + return; } /* end H5FD__create_mpi_uniform_type() */ /*------------------------------------------------------------------------- @@ -504,153 +500,149 @@ static void H5FD__create_mpi_uniform_type(subfiling_context_t *context, *------------------------------------------------------------------------- */ -int init__indep_io(void *_sf_context, size_t maxdepth, int H5_ATTR_PARALLEL_UNUSED ioc_total, - int64_t *sf_source_data_offset, int64_t *sf_datasize, - int64_t *sf_offset, int *first_index, int *n_containers, - int64_t offset, int64_t elements, - int 
dtype_extent) +int +init__indep_io(void *_sf_context, size_t maxdepth, int H5_ATTR_PARALLEL_UNUSED ioc_total, + int64_t *sf_source_data_offset, int64_t *sf_datasize, int64_t *sf_offset, int *first_index, + int *n_containers, int64_t offset, int64_t elements, int dtype_extent) { - subfiling_context_t *sf_context = _sf_context; - int container_count = sf_context->topology->n_io_concentrators; - int64_t stripe_size = sf_context->sf_stripe_size; - int64_t data_size = elements * dtype_extent; - - int64_t start_id = offset / stripe_size; - int64_t offset_in_stripe = offset % sf_context->sf_blocksize_per_stripe; - int64_t container_offset = offset % stripe_size; - int64_t start_length = MIN(data_size, (stripe_size - container_offset)); - int64_t start_row = start_id / container_count; - int64_t ioc_start = start_id % container_count; - int64_t final_offset = offset + data_size; - int64_t final_id = final_offset / stripe_size; - int64_t final_length = - (start_length == data_size ? 0 : final_offset % stripe_size); - int64_t ioc_final = final_id % container_count; - int64_t container_bytes = 0, total_bytes = 0; - int64_t source_offset = 0; - - int row_id_start = (int)(start_id - ioc_start); - int row_id_final = (int)(final_id - ioc_final); - int i, k, depth = ((row_id_final - row_id_start) / container_count) + 1; - int container_id = (int)start_id; - int64_t row_offset = (int64_t)(start_row * stripe_size); - - *first_index = (int)ioc_start; - - /* Given the IO parameters, we loop thru the set of IOCs - * to determine the various vector components for each. - * Those IOCs whose datasize is zero (0), will not have - * IO requests passed to them. - */ - - for (i = 0, k = (int)ioc_start; i < container_count; i++) { - /* We use 'output_offset' as an index into a linear - * version of a 2D array. In 'C' the last subscript - * is the one that varies most rapidly. 
- * In our case, the 2D array is represented as - * array[ container_count ][ maxdepth ] + subfiling_context_t *sf_context = _sf_context; + int container_count = sf_context->topology->n_io_concentrators; + int64_t stripe_size = sf_context->sf_stripe_size; + int64_t data_size = elements * dtype_extent; + + int64_t start_id = offset / stripe_size; + int64_t offset_in_stripe = offset % sf_context->sf_blocksize_per_stripe; + int64_t container_offset = offset % stripe_size; + int64_t start_length = MIN(data_size, (stripe_size - container_offset)); + int64_t start_row = start_id / container_count; + int64_t ioc_start = start_id % container_count; + int64_t final_offset = offset + data_size; + int64_t final_id = final_offset / stripe_size; + int64_t final_length = (start_length == data_size ? 0 : final_offset % stripe_size); + int64_t ioc_final = final_id % container_count; + int64_t container_bytes = 0, total_bytes = 0; + int64_t source_offset = 0; + + int row_id_start = (int)(start_id - ioc_start); + int row_id_final = (int)(final_id - ioc_final); + int i, k, depth = ((row_id_final - row_id_start) / container_count) + 1; + int container_id = (int)start_id; + int64_t row_offset = (int64_t)(start_row * stripe_size); + + *first_index = (int)ioc_start; + + /* Given the IO parameters, we loop thru the set of IOCs + * to determine the various vector components for each. + * Those IOCs whose datasize is zero (0), will not have + * IO requests passed to them. 
*/ - size_t depthsize = maxdepth * sizeof(int64_t); /* ONLY used for memset */ - size_t output_offset = (size_t)(k) * maxdepth; - int container_depth = depth; - hbool_t is_first = false, is_last = false; - int64_t *__sf_source_data_offset = sf_source_data_offset + output_offset; - int64_t *__sf_datasize = sf_datasize + output_offset; - int64_t *__sf_offset = sf_offset + output_offset; + for (i = 0, k = (int)ioc_start; i < container_count; i++) { + /* We use 'output_offset' as an index into a linear + * version of a 2D array. In 'C' the last subscript + * is the one that varies most rapidly. + * In our case, the 2D array is represented as + * array[ container_count ][ maxdepth ] + */ + size_t depthsize = maxdepth * sizeof(int64_t); /* ONLY used for memset */ + size_t output_offset = (size_t)(k)*maxdepth; + int container_depth = depth; - memset(__sf_source_data_offset, 0, depthsize); - memset(__sf_datasize, 0, depthsize); - memset(__sf_offset, 0, depthsize); + hbool_t is_first = false, is_last = false; + int64_t *__sf_source_data_offset = sf_source_data_offset + output_offset; + int64_t *__sf_datasize = sf_datasize + output_offset; + int64_t *__sf_offset = sf_offset + output_offset; - container_bytes = 0; + memset(__sf_source_data_offset, 0, depthsize); + memset(__sf_datasize, 0, depthsize); + memset(__sf_offset, 0, depthsize); - if (total_bytes == data_size) { - *n_containers = i; - return depth + 1; - } - if (total_bytes < data_size) { - if (k == ioc_start) { - is_first = true; - container_bytes = start_length; - container_depth--; /* Account for the start_length */ - if (ioc_final < ioc_start) { - container_depth--; - depth--; + container_bytes = 0; + + if (total_bytes == data_size) { + *n_containers = i; + return depth + 1; } - } - if (k == ioc_final) { - is_last = true; - container_bytes += final_length; - if (container_depth) - container_depth--; /* Account for the final_length */ - if (depth) - depth--; - } - container_bytes += container_depth * stripe_size; - 
total_bytes += container_bytes; - } + if (total_bytes < data_size) { + if (k == ioc_start) { + is_first = true; + container_bytes = start_length; + container_depth--; /* Account for the start_length */ + if (ioc_final < ioc_start) { + container_depth--; + depth--; + } + } + if (k == ioc_final) { + is_last = true; + container_bytes += final_length; + if (container_depth) + container_depth--; /* Account for the final_length */ + if (depth) + depth--; + } + container_bytes += container_depth * stripe_size; + total_bytes += container_bytes; + } + + __sf_source_data_offset[0] = source_offset; + __sf_datasize[0] = container_bytes; + __sf_offset[0] = row_offset + offset_in_stripe; - __sf_source_data_offset[0] = source_offset; - __sf_datasize[0] = container_bytes; - __sf_offset[0] = row_offset + offset_in_stripe; - - if (container_count == 1) { - - } else { - /* Fill the IO datatypes */ - if (is_first) { - if (is_last) { /* First + Last */ - H5FD__create_f_l_mpi_type( - sf_context, container_depth + 1, source_offset, container_bytes, - row_offset + offset_in_stripe, __sf_source_data_offset, - __sf_datasize, __sf_offset, start_length, final_length); - } else { /* First ONLY */ - H5FD__create_first_mpi_type( - sf_context, container_depth, source_offset, container_bytes, - row_offset + offset_in_stripe, __sf_source_data_offset, - __sf_datasize, __sf_offset, start_length); + if (container_count == 1) { + } + else { + /* Fill the IO datatypes */ + if (is_first) { + if (is_last) { /* First + Last */ + H5FD__create_f_l_mpi_type(sf_context, container_depth + 1, source_offset, container_bytes, + row_offset + offset_in_stripe, __sf_source_data_offset, + __sf_datasize, __sf_offset, start_length, final_length); + } + else { /* First ONLY */ + H5FD__create_first_mpi_type(sf_context, container_depth, source_offset, container_bytes, + row_offset + offset_in_stripe, __sf_source_data_offset, + __sf_datasize, __sf_offset, start_length); + } + /* Move the memory pointer to the starting 
location + * for next IOC request. + */ + source_offset += start_length; + } + else if (is_last) { /* Last ONLY */ + H5FD__create_final_mpi_type(sf_context, container_depth, source_offset, container_bytes, + row_offset + offset_in_stripe, __sf_source_data_offset, + __sf_datasize, __sf_offset, final_length); + /* Probably not needed... */ + source_offset += stripe_size; + } + else { /* Everything else (uniform) */ + H5FD__create_mpi_uniform_type(sf_context, container_depth, source_offset, container_bytes, + row_offset + offset_in_stripe, __sf_source_data_offset, + __sf_datasize, __sf_offset); + source_offset += stripe_size; + } } - /* Move the memory pointer to the starting location - * for next IOC request. - */ - source_offset += start_length; - } else if (is_last) { /* Last ONLY */ - H5FD__create_final_mpi_type( - sf_context, container_depth, source_offset, container_bytes, - row_offset + offset_in_stripe, __sf_source_data_offset, - __sf_datasize, __sf_offset, final_length); - /* Probably not needed... 
*/ - source_offset += stripe_size; - } else { /* Everything else (uniform) */ - H5FD__create_mpi_uniform_type( - sf_context, container_depth, source_offset, container_bytes, - row_offset + offset_in_stripe, __sf_source_data_offset, - __sf_datasize, __sf_offset); - source_offset += stripe_size; - } - } - k++; - offset_in_stripe += __sf_datasize[0]; - container_id++; + k++; + offset_in_stripe += __sf_datasize[0]; + container_id++; - if (k == container_count) { - k = 0; - offset_in_stripe = 0; - depth = ((row_id_final - container_id) / container_count) + 1; - row_offset += sf_context->sf_blocksize_per_stripe; + if (k == container_count) { + k = 0; + offset_in_stripe = 0; + depth = ((row_id_final - container_id) / container_count) + 1; + row_offset += sf_context->sf_blocksize_per_stripe; + } + } + if (total_bytes != data_size) { + printf("Error: total_bytes != data_size\n"); } - } - if (total_bytes != data_size) { - printf("Error: total_bytes != data_size\n"); - } - *n_containers = container_count; - return depth + 1; + *n_containers = container_count; + return depth + 1; } /* end init__indep_io() */ - /*------------------------------------------------------------------------- * Function: Internal read__independent_async * @@ -685,97 +677,97 @@ int init__indep_io(void *_sf_context, size_t maxdepth, int H5_ATTR_PARALLEL_UNUS #define WORLD_SIZE(ctx) ((ctx)->topology->app_layout->world_size) #define WORLD_RANK(ctx) ((ctx)->topology->app_layout->world_size) -static int read__independent_async(int n_io_concentrators, hid_t context_id, - int64_t offset, int64_t elements, - int H5_ATTR_PARALLEL_UNUSED dtype_extent, - void *data,io_req_t **io_req) { - int status = 0; - int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset; - int *io_concentrator = NULL; - io_req_t *sf_io_request = NULL; - int64_t msg[3] = {0, }; - - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - - /* Calculate the IOC that we'll send the IO request 
to */ - stripe_size = sf_context->sf_stripe_size; - - start_id = offset / stripe_size; - ioc_row = start_id / n_io_concentrators; - ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size); - - ioc_start = start_id % n_io_concentrators; - - io_concentrator = sf_context->topology->io_concentrator; - assert(io_concentrator != NULL); - - /* Make sure that we can return a request structure - * if everything is working correctly - */ - assert(io_req); - - /* Prepare an IO request. - * This gets sent to the ioc identified by the file offset - */ - msg[0] = elements; - msg[1] = ioc_offset; - msg[2] = context_id; +static int +read__independent_async(int n_io_concentrators, hid_t context_id, int64_t offset, int64_t elements, + int H5_ATTR_PARALLEL_UNUSED dtype_extent, void *data, io_req_t **io_req) +{ + int status = 0; + int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset; + int * io_concentrator = NULL; + io_req_t *sf_io_request = NULL; + int64_t msg[3] = { + 0, + }; + + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + + /* Calculate the IOC that we'll send the IO request to */ + stripe_size = sf_context->sf_stripe_size; + + start_id = offset / stripe_size; + ioc_row = start_id / n_io_concentrators; + ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size); + + ioc_start = start_id % n_io_concentrators; + + io_concentrator = sf_context->topology->io_concentrator; + assert(io_concentrator != NULL); + + /* Make sure that we can return a request structure + * if everything is working correctly + */ + assert(io_req); + + /* Prepare an IO request. 
+ * This gets sent to the ioc identified by the file offset + */ + msg[0] = elements; + msg[1] = ioc_offset; + msg[2] = context_id; #ifdef VERBOSE - printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n", - __func__, ioc_start, elements, offset, ioc_offset); - fflush(stdout); -#endif - status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], READ_INDEP, - sf_context->sf_msg_comm); - - if (status != MPI_SUCCESS) { - int len; - char estring[MPI_MAX_ERROR_STRING]; - MPI_Error_string(status, estring, &len); - printf("[%d] ERROR! MPI_Send request header (%ld) " - "bytes to %d returned an error(%s)\n", - WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring); + printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n", __func__, ioc_start, elements, offset, + ioc_offset); fflush(stdout); - return -1; - } - - /* At this point in the new implementation, we should queue - * the async recv so that when the top level VFD tells us - * to complete all pending IO requests, we have all the info - * we need to accomplish that. 
- */ - sf_io_request = (io_req_t *)malloc(sizeof(io_req_t)); - assert(sf_io_request); - - sf_io_request->completion_func.io_args.ioc = (int)ioc_start; - sf_io_request->completion_func.io_args.context_id = context_id; - sf_io_request->completion_func.io_args.offset = offset; - sf_io_request->completion_func.io_args.elements = elements; - sf_io_request->completion_func.io_args.data = data; - sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL; - sf_io_request->completion_func.io_function = async_completion; - sf_io_request->completion_func.pending = 0; - - sf_io_request->prev = sf_io_request->next = NULL; - /* Start the actual data transfer */ - - status = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start], - READ_INDEP_DATA, sf_context->sf_data_comm, - &sf_io_request->completion_func.io_args.io_req); - - if (status == MPI_SUCCESS) { - sf_io_request->completion_func.pending = 1; - *io_req = sf_io_request; - } else { - puts("MPI_Irecv must have failed!"); - free(sf_io_request); - *io_req = NULL; - } - - return status; -} /* end read__independent_async() */ +#endif + status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], READ_INDEP, sf_context->sf_msg_comm); + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%d] ERROR! MPI_Send request header (%ld) " + "bytes to %d returned an error(%s)\n", + WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring); + fflush(stdout); + return -1; + } + + /* At this point in the new implementation, we should queue + * the async recv so that when the top level VFD tells us + * to complete all pending IO requests, we have all the info + * we need to accomplish that. 
+ */ + sf_io_request = (io_req_t *)malloc(sizeof(io_req_t)); + assert(sf_io_request); + + sf_io_request->completion_func.io_args.ioc = (int)ioc_start; + sf_io_request->completion_func.io_args.context_id = context_id; + sf_io_request->completion_func.io_args.offset = offset; + sf_io_request->completion_func.io_args.elements = elements; + sf_io_request->completion_func.io_args.data = data; + sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL; + sf_io_request->completion_func.io_function = async_completion; + sf_io_request->completion_func.pending = 0; + + sf_io_request->prev = sf_io_request->next = NULL; + /* Start the actual data transfer */ + + status = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start], READ_INDEP_DATA, + sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req); + + if (status == MPI_SUCCESS) { + sf_io_request->completion_func.pending = 1; + *io_req = sf_io_request; + } + else { + puts("MPI_Irecv must have failed!"); + free(sf_io_request); + *io_req = NULL; + } + + return status; +} /* end read__independent_async() */ /*------------------------------------------------------------------------- * Function: get_ioc_subfile_path @@ -793,23 +785,23 @@ static int read__independent_async(int n_io_concentrators, hid_t context_id, * Return: A full filepath which should be copied, e.g. 
using strdup *------------------------------------------------------------------------- */ -static char *get_ioc_subfile_path(int ioc, int ioc_count, - subfiling_context_t *sf_context) { - static char filepath[PATH_MAX]; - char *subfile_dir = NULL; - char *prefix = sf_context->subfile_prefix; - - if (prefix != NULL) { - sprintf(filepath, "%s/" SF_NODE_LOCAL_TEMPLATE, prefix, - sf_context->h5_file_id, ioc, ioc_count); - } else { - strcpy(filepath, sf_context->filename); - subfile_dir = strrchr(filepath, '/'); - assert(subfile_dir); - sprintf(subfile_dir + 1, SF_FILENAME_TEMPLATE, sf_context->h5_file_id, ioc, - ioc_count); - } - return filepath; +static char * +get_ioc_subfile_path(int ioc, int ioc_count, subfiling_context_t *sf_context) +{ + static char filepath[PATH_MAX]; + char * subfile_dir = NULL; + char * prefix = sf_context->subfile_prefix; + + if (prefix != NULL) { + sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, prefix, sf_context->h5_file_id, ioc, ioc_count); + } + else { + strcpy(filepath, sf_context->h5_filename); + subfile_dir = strrchr(filepath, '/'); + assert(subfile_dir); + sprintf(subfile_dir + 1, SF_FILENAME_TEMPLATE, sf_context->h5_file_id, ioc, ioc_count); + } + return filepath; } /* end get_ioc_subfile_path() */ /*------------------------------------------------------------------------- @@ -831,10 +823,12 @@ static char *get_ioc_subfile_path(int ioc, int ioc_count, * values (-1) indicates an error. 
*------------------------------------------------------------------------- */ -static int progress_this_pending_io(io_req_t *this_req) { - assert(this_req); - assert(this_req->completion_func.io_function); - return (*this_req->completion_func.io_function)(&this_req->completion_func); +static int +progress_this_pending_io(io_req_t *this_req) +{ + assert(this_req); + assert(this_req->completion_func.io_function); + return (*this_req->completion_func.io_function)(&this_req->completion_func); } /*------------------------------------------------------------------------- @@ -848,25 +842,26 @@ static int progress_this_pending_io(io_req_t *this_req) { * values (-1) indicates an error. *------------------------------------------------------------------------- */ -static int write_data(io_func_t *this_func) { - int ioc, status; - int64_t elements; - void *data; - int *io_concentrator = NULL; - subfiling_context_t *sf_context = NULL; - assert(this_func); +static int +write_data(io_func_t *this_func) +{ + int ioc, status; + int64_t elements; + void * data; + int * io_concentrator = NULL; + subfiling_context_t *sf_context = NULL; + assert(this_func); - sf_context = get__subfiling_object(this_func->io_args.context_id); + sf_context = get__subfiling_object(this_func->io_args.context_id); - assert(sf_context); + assert(sf_context); - io_concentrator = sf_context->topology->io_concentrator; - ioc = this_func->io_args.ioc; + io_concentrator = sf_context->topology->io_concentrator; + ioc = this_func->io_args.ioc; - status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc], - WRITE_INDEP_DATA, sf_context->sf_data_comm, - &this_func->io_args.io_req); - return status; + status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc], WRITE_INDEP_DATA, + sf_context->sf_data_comm, &this_func->io_args.io_req); + return status; } /*------------------------------------------------------------------------- @@ -888,43 +883,44 @@ static int write_data(io_func_t *this_func) { 
* values (-1) indicates an error. *------------------------------------------------------------------------- */ -static int async_completion(void *arg) { - struct async_arg { - int n_reqs; - MPI_Request *sf_reqs; - } *in_progress = (struct async_arg *)arg; - - assert(arg); - int status, errors = 0; - int count = in_progress->n_reqs; - int n_waiting = count; - int indices[count]; - MPI_Status stats[count]; - useconds_t delay = 5; - - while (n_waiting) { - int i, ready = 0; - status = MPI_Testsome(count, in_progress->sf_reqs, &ready, indices, stats); - if (status != MPI_SUCCESS) { - int len; - char estring[MPI_MAX_ERROR_STRING]; - MPI_Error_string(status, estring, &len); - printf("[%s] MPI_ERROR! MPI_Testsome returned an error(%s)\n", __func__, - estring); - fflush(stdout); - errors++; - return -1; - } +static int +async_completion(void *arg) +{ + struct async_arg { + int n_reqs; + MPI_Request *sf_reqs; + } *in_progress = (struct async_arg *)arg; + + assert(arg); + int status, errors = 0; + int count = in_progress->n_reqs; + int n_waiting = count; + int indices[count]; + MPI_Status stats[count]; + useconds_t delay = 5; + + while (n_waiting) { + int i, ready = 0; + status = MPI_Testsome(count, in_progress->sf_reqs, &ready, indices, stats); + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%s] MPI_ERROR! MPI_Testsome returned an error(%s)\n", __func__, estring); + fflush(stdout); + errors++; + return -1; + } - if (ready == 0) { - usleep(delay); - } + if (ready == 0) { + usleep(delay); + } - for (i = 0; i < ready; i++) { - n_waiting--; + for (i = 0; i < ready; i++) { + n_waiting--; + } } - } - return errors; + return errors; } /*------------------------------------------------------------------------- @@ -960,137 +956,138 @@ static int async_completion(void *arg) { * Changes: Initial Version/None. 
*------------------------------------------------------------------------- */ -static int write__independent_async(int n_io_concentrators, hid_t context_id, - int64_t offset, int64_t elements, - int H5_ATTR_PARALLEL_UNUSED dtype_extent, - const void *data, io_req_t **io_req) { - - int ack = 0, active_sends = 0, n_waiting = 0, status = 0; - int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset; - int *io_concentrator = NULL; - io_req_t *sf_io_request = NULL; - MPI_Request ackrequest; - int64_t msg[3] = {0, }; - - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - - /* Calculate the IOC that we'll send the IO request to */ - stripe_size = sf_context->sf_stripe_size; - - start_id = offset / stripe_size; - ioc_row = start_id / n_io_concentrators; - ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size); - ioc_start = start_id % n_io_concentrators; - - io_concentrator = sf_context->topology->io_concentrator; - assert(io_concentrator != NULL); - - /* Make sure that we can return a request structure - * if everything is working correctly - */ - assert(io_req); - - - /* Prepare an IO request. - * This gets sent to the ioc identified by the file offset. 
- * (see above: Calculate the IOC)) - */ - msg[0] = elements; - msg[1] = ioc_offset; - msg[2] = context_id; +static int +write__independent_async(int n_io_concentrators, hid_t context_id, int64_t offset, int64_t elements, + int H5_ATTR_PARALLEL_UNUSED dtype_extent, const void *data, io_req_t **io_req) +{ + + int ack = 0, active_sends = 0, n_waiting = 0, status = 0; + int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset; + int * io_concentrator = NULL; + io_req_t * sf_io_request = NULL; + MPI_Request ackrequest; + int64_t msg[3] = { + 0, + }; + + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + + /* Calculate the IOC that we'll send the IO request to */ + stripe_size = sf_context->sf_stripe_size; + + start_id = offset / stripe_size; + ioc_row = start_id / n_io_concentrators; + ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size); + ioc_start = start_id % n_io_concentrators; + + io_concentrator = sf_context->topology->io_concentrator; + assert(io_concentrator != NULL); + + /* Make sure that we can return a request structure + * if everything is working correctly + */ + assert(io_req); + + /* Prepare an IO request. + * This gets sent to the ioc identified by the file offset. + * (see above: Calculate the IOC)) + */ + msg[0] = elements; + msg[1] = ioc_offset; + msg[2] = context_id; #ifdef VERBOSE - printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n", - __func__, ioc_start, elements, offset, ioc_offset); - fflush(stdout); -#endif - status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], - WRITE_INDEP, sf_context->sf_msg_comm); - - if (status != MPI_SUCCESS) { - int len; - char estring[MPI_MAX_ERROR_STRING]; - MPI_Error_string(status, estring, &len); - printf("[%d] ERROR! 
MPI_Send of %ld bytes to %d returned an " - "error(%s)\n", - WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring); + printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n", __func__, ioc_start, elements, offset, + ioc_offset); fflush(stdout); - return -1; - } else - active_sends++; - /* - * We wait for memory to be allocated on the target IOC so that we can - * start sending user data. Once memory is allocated, we will receive - * an ACK (or NACK) message from the IOC to allow us to proceed. - */ - status = MPI_Irecv(&ack, 1, MPI_INT, io_concentrator[ioc_start], - WRITE_INDEP_ACK, sf_context->sf_data_comm, &ackrequest); - - if (status != MPI_SUCCESS) { - printf("[%d %s] MPI_Irecv failed\n", WORLD_RANK(sf_context), __func__); - fflush(stdout); - return -1; - } +#endif + status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], WRITE_INDEP, sf_context->sf_msg_comm); - n_waiting = active_sends; + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%d] ERROR! MPI_Send of %ld bytes to %d returned an " + "error(%s)\n", + WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring); + fflush(stdout); + return -1; + } + else + active_sends++; + /* + * We wait for memory to be allocated on the target IOC so that we can + * start sending user data. Once memory is allocated, we will receive + * an ACK (or NACK) message from the IOC to allow us to proceed. 
+ */ + status = MPI_Irecv(&ack, 1, MPI_INT, io_concentrator[ioc_start], WRITE_INDEP_ACK, + sf_context->sf_data_comm, &ackrequest); - while (n_waiting) { - int flag = 0; - status = MPI_Test(&ackrequest, &flag, MPI_STATUS_IGNORE); - if (status == MPI_SUCCESS) { - if (flag == 0) - usleep(0); - else { - n_waiting--; - if (ack == 0) { /* NACK */ - printf("%s - Received NACK!\n", __func__); + if (status != MPI_SUCCESS) { + printf("[%d %s] MPI_Irecv failed\n", WORLD_RANK(sf_context), __func__); + fflush(stdout); + return -1; + } + + n_waiting = active_sends; + + while (n_waiting) { + int flag = 0; + status = MPI_Test(&ackrequest, &flag, MPI_STATUS_IGNORE); + if (status == MPI_SUCCESS) { + if (flag == 0) + usleep(0); + else { + n_waiting--; + if (ack == 0) { /* NACK */ + printf("%s - Received NACK!\n", __func__); + } + } } - } } - } - - /* At this point in the new implementation, we should queue - * the async write so that when the top level VFD tells us - * to complete all pending IO requests, we have all the info - * we need to accomplish that. 
- */ - sf_io_request = (io_req_t *)malloc(sizeof(io_req_t)); - assert(sf_io_request); - - sf_io_request->completion_func.io_args.ioc = (int)ioc_start; - sf_io_request->completion_func.io_args.context_id = context_id; - sf_io_request->completion_func.io_args.offset = offset; - sf_io_request->completion_func.io_args.elements = elements; - sf_io_request->completion_func.io_args.data = cast_to_void(data); - sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL; - sf_io_request->completion_func.io_function = async_completion; - sf_io_request->completion_func.pending = 0; - - sf_io_request->prev = sf_io_request->next = NULL; - /* Start the actual data transfer */ - - status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start], - WRITE_INDEP_DATA, sf_context->sf_data_comm, - &sf_io_request->completion_func.io_args.io_req); - - /* When we actually have the async IO support, - * the request should be queued before we - * return to the caller. - * Having queued the IO operation, we might want to - * get additional work started before allowing the - * queued IO requests to make further progress and/or - * to complete, so we just return to the caller. - */ - - if (status == MPI_SUCCESS) { - sf_io_request->completion_func.pending = 1; - *io_req = sf_io_request; - } else { - puts("MPI_Isend must have failed!"); - free(sf_io_request); - *io_req = NULL; - } - return status; + + /* At this point in the new implementation, we should queue + * the async write so that when the top level VFD tells us + * to complete all pending IO requests, we have all the info + * we need to accomplish that. 
+ */ + sf_io_request = (io_req_t *)malloc(sizeof(io_req_t)); + assert(sf_io_request); + + sf_io_request->completion_func.io_args.ioc = (int)ioc_start; + sf_io_request->completion_func.io_args.context_id = context_id; + sf_io_request->completion_func.io_args.offset = offset; + sf_io_request->completion_func.io_args.elements = elements; + sf_io_request->completion_func.io_args.data = cast_to_void(data); + sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL; + sf_io_request->completion_func.io_function = async_completion; + sf_io_request->completion_func.pending = 0; + + sf_io_request->prev = sf_io_request->next = NULL; + /* Start the actual data transfer */ + + status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start], WRITE_INDEP_DATA, + sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req); + + /* When we actually have the async IO support, + * the request should be queued before we + * return to the caller. + * Having queued the IO operation, we might want to + * get additional work started before allowing the + * queued IO requests to make further progress and/or + * to complete, so we just return to the caller. + */ + + if (status == MPI_SUCCESS) { + sf_io_request->completion_func.pending = 1; + *io_req = sf_io_request; + } + else { + puts("MPI_Isend must have failed!"); + free(sf_io_request); + *io_req = NULL; + } + return status; } /* end write__independent_async() */ /* @@ -1108,81 +1105,82 @@ static int write__independent_async(int n_io_concentrators, hid_t context_id, * * Return: SUCCEED if no errors, FAIL otherwise. 
*/ -herr_t H5FD__write_vector_internal(hid_t h5_fid, hssize_t count, - haddr_t addrs[], size_t sizes[], - const void *bufs[] /* data_in */) { - herr_t ret_value = SUCCEED; - hssize_t status = 0, k = 0; - hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); - subfiling_context_t *sf_context = NULL; - io_req_t **sf_async_reqs = NULL; - MPI_Request *active_reqs = NULL; - struct __mpi_req { - int n_reqs; - MPI_Request *active_reqs; - } *mpi_reqs = NULL; - - sf_context = get__subfiling_object(sf_context_id); - assert(sf_context != NULL); - - active_reqs = (MPI_Request *)calloc((size_t)(count+2), sizeof(MPI_Request)); - assert(active_reqs); - - sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *)); - assert(sf_async_reqs); - - /* - * Note: We allocated extra space in the active_requests (above). - * The extra should be enough for an integer plus a pointer. - */ - mpi_reqs = (struct __mpi_req *)&active_reqs[count]; - mpi_reqs->n_reqs = (int)count; - mpi_reqs->active_reqs = active_reqs; - - /* Each pass thru the following should queue an MPI write - * to a new IOC. Both the IOC selection and offset within the - * particular subfile are based on the combinatation of striping - * factors and the virtual file offset (addrs[k]). 
- */ - for (k = 0; k < count; k++) { - if (sizes[k] == 0) { - puts("Something wrong with the size argument: size is 0!"); - fflush(stdout); - } - status = write__independent_async( - sf_context->topology->n_io_concentrators, sf_context_id, - (int64_t)addrs[k],(int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]); - if (status < 0) { - printf("%s - encountered an internal error!\n", __func__); - goto errors; - } else { - mpi_reqs->active_reqs[k] = - sf_async_reqs[k]->completion_func.io_args.io_req; - } - } +herr_t +H5FD__write_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_t sizes[], + const void *bufs[] /* data_in */) +{ + herr_t ret_value = SUCCEED; + hssize_t status = 0, k = 0; + hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); + subfiling_context_t *sf_context = NULL; + io_req_t ** sf_async_reqs = NULL; + MPI_Request * active_reqs = NULL; + struct __mpi_req { + int n_reqs; + MPI_Request *active_reqs; + } *mpi_reqs = NULL; + + sf_context = get__subfiling_object(sf_context_id); + assert(sf_context != NULL); - /* Here, we should have queued 'count' async requests. - * We can can now try to complete those before returning - * to the caller for the next set of IO operations. - */ - if (sf_async_reqs[0]->completion_func.io_function) - ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs); + active_reqs = (MPI_Request *)calloc((size_t)(count + 2), sizeof(struct __mpi_req)); + assert(active_reqs); - if (active_reqs) - free(active_reqs); + sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *)); + assert(sf_async_reqs); - if (sf_async_reqs) { + /* + * Note: We allocated extra space in the active_requests (above). + * The extra should be enough for an integer plus a pointer. + */ + mpi_reqs = (struct __mpi_req *)&active_reqs[count]; + mpi_reqs->n_reqs = (int)count; + mpi_reqs->active_reqs = active_reqs; + + /* Each pass thru the following should queue an MPI write + * to a new IOC. 
Both the IOC selection and offset within the + * particular subfile are based on the combinatation of striping + * factors and the virtual file offset (addrs[k]). + */ for (k = 0; k < count; k++) { - if (sf_async_reqs[k]) { - free(sf_async_reqs[k]); - } + if (sizes[k] == 0) { + puts("Something wrong with the size argument: size is 0!"); + fflush(stdout); + } + status = + write__independent_async(sf_context->topology->n_io_concentrators, sf_context_id, + (int64_t)addrs[k], (int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]); + if (status < 0) { + printf("%s - encountered an internal error!\n", __func__); + goto errors; + } + else { + mpi_reqs->active_reqs[k] = sf_async_reqs[k]->completion_func.io_args.io_req; + } + } + + /* Here, we should have queued 'count' async requests. + * We can can now try to complete those before returning + * to the caller for the next set of IO operations. + */ + if (sf_async_reqs[0]->completion_func.io_function) + ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs); + + if (active_reqs) + free(active_reqs); + + if (sf_async_reqs) { + for (k = 0; k < count; k++) { + if (sf_async_reqs[k]) { + free(sf_async_reqs[k]); + } + } + free(sf_async_reqs); } - free(sf_async_reqs); - } - return ret_value; + return ret_value; errors: - return FAIL; + return FAIL; } /* @@ -1190,91 +1188,95 @@ errors: * The H5FD__ioc_read_vector VFD call included additional 'hid_t dxpl' * and 'H5FD_mem_t types[]'. These are now removed. 
*/ -herr_t H5FD__read_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], - size_t sizes[], - void *bufs[] /* data_out */) { - herr_t ret_value = SUCCEED; - hssize_t status = 0, k = 0; - hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); - subfiling_context_t *sf_context = NULL; - io_req_t **sf_async_reqs = NULL; - MPI_Request *active_reqs = NULL; - struct __mpi_req { - int n_reqs; - MPI_Request *active_reqs; - } *mpi_reqs = NULL; - - sf_context = get__subfiling_object(sf_context_id); - assert(sf_context != NULL); - - active_reqs = (MPI_Request *)calloc((size_t)(count+2), sizeof(MPI_Request)); - assert(active_reqs); - - sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *)); - assert(sf_async_reqs); - - /* - * Note: We allocated extra space in the active_requests (above). - * The extra should be enough for an integer plus a pointer. - */ - mpi_reqs = (struct __mpi_req *)&active_reqs[count]; - mpi_reqs->n_reqs = (int)count; - mpi_reqs->active_reqs = active_reqs; - - for (k = 0; k < count; k++) { - status = read__independent_async( - sf_context->topology->n_io_concentrators, sf_context_id, - (int64_t)addrs[k], (int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]); - if (status < 0) { - printf("%s - encountered an internal error!\n", __func__); - goto errors; - } else { - mpi_reqs->active_reqs[k] = - sf_async_reqs[k]->completion_func.io_args.io_req; - } - } - /* Here, we should have queued 'count' async requests - * (one to each required IOC). - * - * We can can now try to complete those before returning - * to the caller for the next set of IO operations. 
- */ - if (sf_async_reqs[0]->completion_func.io_function) - ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs); - - if (active_reqs) - free(active_reqs); - - if (sf_async_reqs) { +herr_t +H5FD__read_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_t sizes[], + void *bufs[] /* data_out */) +{ + herr_t ret_value = SUCCEED; + hssize_t status = 0, k = 0; + hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); + subfiling_context_t *sf_context = NULL; + io_req_t ** sf_async_reqs = NULL; + MPI_Request * active_reqs = NULL; + struct __mpi_req { + int n_reqs; + MPI_Request *active_reqs; + } *mpi_reqs = NULL; + + sf_context = get__subfiling_object(sf_context_id); + assert(sf_context != NULL); + + active_reqs = (MPI_Request *)calloc((size_t)(count + 2), sizeof(struct __mpi_req)); + assert(active_reqs); + + sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *)); + assert(sf_async_reqs); + + /* + * Note: We allocated extra space in the active_requests (above). + * The extra should be enough for an integer plus a pointer. + */ + mpi_reqs = (struct __mpi_req *)&active_reqs[count]; + mpi_reqs->n_reqs = (int)count; + mpi_reqs->active_reqs = active_reqs; + for (k = 0; k < count; k++) { - if (sf_async_reqs[k]) { - free(sf_async_reqs[k]); - } + status = read__independent_async(sf_context->topology->n_io_concentrators, sf_context_id, + (int64_t)addrs[k], (int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]); + if (status < 0) { + printf("%s - encountered an internal error!\n", __func__); + goto errors; + } + else { + mpi_reqs->active_reqs[k] = sf_async_reqs[k]->completion_func.io_args.io_req; + } } - free(sf_async_reqs); - } - return ret_value; + /* Here, we should have queued 'count' async requests + * (one to each required IOC). + * + * We can can now try to complete those before returning + * to the caller for the next set of IO operations. 
+ */ + if (sf_async_reqs[0]->completion_func.io_function) + ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs); + + if (active_reqs) + free(active_reqs); + + if (sf_async_reqs) { + for (k = 0; k < count; k++) { + if (sf_async_reqs[k]) { + free(sf_async_reqs[k]); + } + } + free(sf_async_reqs); + } + return ret_value; errors: - return FAIL; + return FAIL; } -int sf_truncate(hid_t h5_fid, haddr_t H5_ATTR_PARALLEL_UNUSED addr) { - hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); - subfiling_context_t *sf_context = get__subfiling_object(sf_context_id); +int +sf_truncate(hid_t h5_fid, haddr_t H5_ATTR_PARALLEL_UNUSED addr) +{ + hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); + subfiling_context_t *sf_context = get__subfiling_object(sf_context_id); - assert(sf_context != NULL); - return 0; + assert(sf_context != NULL); + return 0; } -int sf_shutdown_local_ioc(hid_t fid) { - hid_t context_id = fid_map_to_context((uint64_t)fid); - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - if (sf_context->topology->rank_is_ioc) { - sf_shutdown_flag = 1; - } - return 0; +int +sf_shutdown_local_ioc(hid_t fid) +{ + hid_t context_id = fid_map_to_context((uint64_t)fid); + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + if (sf_context->topology->rank_is_ioc) { + atomic_fetch_add(&sf_shutdown_flag, 1); + } + return 0; } /*------------------------------------------------------------------------- @@ -1339,119 +1341,121 @@ int sf_shutdown_local_ioc(hid_t fid) { * Changes: Initial Version/None. 
*------------------------------------------------------------------------- */ -int ioc_main(int64_t context_id) { - int subfile_rank; - int flag, ret; - int max_work_depth; - int shutdown_requested; - MPI_Status status, msg_status; - sf_work_request_t *incoming_requests = NULL; - useconds_t delay = 20; - subfiling_context_t *context = get__subfiling_object(context_id); - double queue_start_time; - - assert(context != NULL); - /* We can't have opened any files at this point.. - * The file open approach has changed so that the normal - * application rank (hosting this thread) does the file open. - * We can simply utilize the file descriptor (which should now - * represent an open file). - */ - - subfile_rank = context->sf_group_rank; - - if (request_count_per_rank == NULL) { - request_count_per_rank = (int *)calloc((size_t)WORLD_SIZE(context), sizeof(int)); - assert(request_count_per_rank != NULL); - } - - max_work_depth = MAX(8, WORLD_SIZE(context) * MAX_WORK_PER_RANK); - incoming_requests = (sf_work_request_t *)calloc((size_t)(max_work_depth + 1), - sizeof(sf_work_request_t)); - - /* Validate that the allocation succeeded */ - assert(incoming_requests != NULL); - - /* Initialize atomic vars */ - atomic_init(&sf_workinprogress, 0); - atomic_init(&sf_work_pending, 0); - atomic_init(&sf_file_close_count, 0); - atomic_init(&sf_file_refcount, 0); - atomic_init(&sf_ioc_fini_refcount, 0); - atomic_init(&sf_ioc_ready, 1); - shutdown_requested = 0; - - while (!shutdown_requested || sf_work_pending) { - flag = 0; - ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, context->sf_msg_comm, &flag, - &status); - if ((ret == MPI_SUCCESS) && (flag != 0)) { - sf_work_request_t *msg = NULL; - int count; - int index = 0; - int request_size = (int)sizeof(sf_work_request_t); - int source = status.MPI_SOURCE; - int tag = status.MPI_TAG; - - MPI_Get_count(&status, MPI_BYTE, &count); - if (count > request_size) { - msg = (sf_work_request_t *)malloc((size_t)count); - ret = MPI_Recv(msg, count, 
MPI_BYTE, source, tag, context->sf_msg_comm, - &msg_status); - } else { - index = atomic_load(&sf_workinprogress); - ret = MPI_Recv(&incoming_requests[index], count, MPI_BYTE, source, tag, - context->sf_msg_comm, &msg_status); - if (MPI_SUCCESS == ret) { - int howmany = 0; - MPI_Get_count(&msg_status, MPI_BYTE, &howmany); - if (howmany != count) { - printf("%s: MPI_Recv completed %d bytes of %d\n", __func__, howmany, - count); - fflush(stdout); - } +int +ioc_main(int64_t context_id) +{ + int subfile_rank; + int flag, ret; + int max_work_depth; + int shutdown_requested; + MPI_Status status, msg_status; + sf_work_request_t * incoming_requests = NULL; + useconds_t delay = 20; + subfiling_context_t *context = get__subfiling_object(context_id); + double queue_start_time; + + assert(context != NULL); + /* We can't have opened any files at this point.. + * The file open approach has changed so that the normal + * application rank (hosting this thread) does the file open. + * We can simply utilize the file descriptor (which should now + * represent an open file). 
+ */ + + subfile_rank = context->sf_group_rank; + + if (request_count_per_rank == NULL) { + request_count_per_rank = (int *)calloc((size_t)WORLD_SIZE(context), sizeof(int)); + assert(request_count_per_rank != NULL); + } + + max_work_depth = MAX(8, WORLD_SIZE(context) * MAX_WORK_PER_RANK); + incoming_requests = (sf_work_request_t *)calloc((size_t)(max_work_depth + 1), sizeof(sf_work_request_t)); + + /* Validate that the allocation succeeded */ + assert(incoming_requests != NULL); + + /* Initialize atomic vars */ + atomic_init(&sf_workinprogress, 0); + atomic_init(&sf_work_pending, 0); + atomic_init(&sf_file_close_count, 0); + atomic_init(&sf_file_refcount, 0); + atomic_init(&sf_ioc_fini_refcount, 0); + atomic_init(&sf_shutdown_flag, 0); + atomic_init(&sf_ioc_ready, 1); + shutdown_requested = 0; + + while (!shutdown_requested || sf_work_pending) { + flag = 0; + ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, context->sf_msg_comm, &flag, &status); + if ((ret == MPI_SUCCESS) && (flag != 0)) { + sf_work_request_t *msg = NULL; + int count; + int index = 0; + int request_size = (int)sizeof(sf_work_request_t); + int source = status.MPI_SOURCE; + int tag = status.MPI_TAG; + + MPI_Get_count(&status, MPI_BYTE, &count); + if (count > request_size) { + msg = (sf_work_request_t *)malloc((size_t)count); + ret = MPI_Recv(msg, count, MPI_BYTE, source, tag, context->sf_msg_comm, &msg_status); + } + else { + index = atomic_load(&sf_workinprogress); + ret = MPI_Recv(&incoming_requests[index], count, MPI_BYTE, source, tag, context->sf_msg_comm, + &msg_status); + if (MPI_SUCCESS == ret) { + int howmany = 0; + MPI_Get_count(&msg_status, MPI_BYTE, &howmany); + if (howmany != count) { + printf("%s: MPI_Recv completed %d bytes of %d\n", __func__, howmany, count); + fflush(stdout); + } + } + } + queue_start_time = MPI_Wtime(); + if (ret == MPI_SUCCESS) { + if (msg) { + printf("%s: non-std msg=(%p) from %d\n", __func__, (void *)msg, source); + fflush(stdout); + + msg->source = source; + 
msg->subfile_rank = subfile_rank; + msg->context_id = context->sf_context_id; + msg->start_time = queue_start_time; + tpool_add_work(msg); + } + else { + incoming_requests[index].tag = tag; + incoming_requests[index].source = source; + incoming_requests[index].subfile_rank = subfile_rank; + incoming_requests[index].start_time = queue_start_time; + incoming_requests[index].buffer = NULL; + tpool_add_work(&incoming_requests[index]); + if (index == max_work_depth - 1) { + atomic_init(&sf_workinprogress, 0); + } + else { + atomic_fetch_add(&sf_workinprogress, 1); // atomic + } + } + } } - } - queue_start_time = MPI_Wtime(); - if (ret == MPI_SUCCESS) { - if (msg) { - printf("%s: non-std msg=(%p) from %d\n", __func__, (void *)msg, source); - fflush(stdout); - - msg->source = source; - msg->subfile_rank = subfile_rank; - msg->context_id = context->sf_context_id; - msg->start_time = queue_start_time; - tpool_add_work(msg); - } else { - incoming_requests[index].tag = tag; - incoming_requests[index].source = source; - incoming_requests[index].subfile_rank = subfile_rank; - incoming_requests[index].start_time = queue_start_time; - incoming_requests[index].buffer = NULL; - incoming_requests[index].completed = 0; - tpool_add_work(&incoming_requests[index]); - if (index == max_work_depth - 1) { - atomic_init(&sf_workinprogress, 0); - } else { - atomic_fetch_add(&sf_workinprogress, 1); // atomic - } + else { + usleep(delay); } - } - } else { - usleep(delay); + shutdown_requested = atomic_load(&sf_shutdown_flag); } - shutdown_requested = sf_shutdown_flag; - } - if (incoming_requests) { - free(incoming_requests); - } + if (incoming_requests) { + free(incoming_requests); + } - /* Reset the shutdown flag */ - sf_shutdown_flag = 0; + /* Reset the shutdown flag */ + atomic_init(&sf_shutdown_flag, 0); - return 0; + return 0; } /* @@ -1460,33 +1464,35 @@ Private helper functions ========================================= */ -static int send_ack__(int target, int subfile_rank, int tag, 
/*
 * send_ack__
 *
 * Send a positive acknowledgement (the integer 1) to rank 'target' using
 * 'tag' on 'comm'.  In builds without NDEBUG, and when verbose logging is
 * on and a log file is open, the event is also written to the log.
 *
 * Returns the MPI_Send status.
 */
static int
send_ack__(int target, int subfile_rank, int tag, MPI_Comm comm)
{
    int reply = 1;
    int mpi_status;

    mpi_status = MPI_Send(&reply, 1, MPI_INT, target, tag, comm);
#ifndef NDEBUG
    if (sf_verbose_flag && sf_logfile) {
        fprintf(sf_logfile, "[ioc(%d): Sending ACK to MPI_rank(%d)\n", subfile_rank, target);
    }
#endif
    return mpi_status;
}

/*
 * send_nack__
 *
 * Send a negative acknowledgement (the integer 0) to rank 'target' using
 * 'tag' on 'comm'.  Logging follows the same rules as send_ack__.
 *
 * Returns the MPI_Send status.
 */
static int
send_nack__(int target, int subfile_rank, int tag, MPI_Comm comm)
{
    int reply = 0;
    int mpi_status;

    mpi_status = MPI_Send(&reply, 1, MPI_INT, target, tag, comm);
#ifndef NDEBUG
    if (sf_verbose_flag && sf_logfile) {
        fprintf(sf_logfile, "[ioc(%d): Sending NACK to MPI_rank(%d)\n", subfile_rank, target);
    }
#endif
    return mpi_status;
}
* *------------------------------------------------------------------------- */ -int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, - MPI_Comm comm) { - int fd; - char *recv_buffer = NULL; - int ret = MPI_SUCCESS; - MPI_Status msg_status; - int64_t data_size = msg->header[0]; - int64_t file_offset = msg->header[1]; - int64_t file_context_id = msg->header[2]; - double t_start, t_end; - double t_write, t_wait, t_queue_delay; - subfiling_context_t *sf_context = get__subfiling_object(file_context_id); - assert(sf_context != NULL); - - /* flag that we've attempted to write data to the file */ - sf_context->sf_write_count++; - /* For debugging performance */ - sf_write_ops++; - - t_start = MPI_Wtime(); - t_queue_delay = t_start - msg->start_time; +int +queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +{ + int fd; + char * recv_buffer = NULL; + int ret = MPI_SUCCESS; + MPI_Status msg_status; + int64_t data_size = msg->header[0]; + int64_t file_offset = msg->header[1]; + int64_t file_context_id = msg->header[2]; + double t_start, t_end; + double t_write, t_wait, t_queue_delay; + subfiling_context_t *sf_context = get__subfiling_object(file_context_id); + int64_t stripe_id = file_offset + data_size; + haddr_t sf_eof; + assert(sf_context != NULL); + + sf_eof = (haddr_t)(stripe_id % sf_context->sf_stripe_size); + stripe_id /= sf_context->sf_stripe_size; + sf_eof += (haddr_t)((stripe_id * sf_context->sf_blocksize_per_stripe) + sf_context->sf_base_addr); + + /* flag that we've attempted to write data to the file */ + sf_context->sf_write_count++; + /* For debugging performance */ + sf_write_ops++; + + t_start = MPI_Wtime(); + t_queue_delay = t_start - msg->start_time; #ifndef NDEBUG - if (sf_verbose_flag) { - if (sf_logfile) { - fprintf(sf_logfile, - "[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld, " - "queue_delay = %lf seconds\n", - subfile_rank, __func__, source, data_size, file_offset, - t_queue_delay); + if 
(sf_verbose_flag) { + if (sf_logfile) { + fprintf(sf_logfile, + "[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld, " + "queue_delay = %lf seconds\n", + subfile_rank, __func__, source, data_size, file_offset, t_queue_delay); + } } - } #endif - if (recv_buffer == NULL) { - if ((recv_buffer = (char *)malloc((size_t)data_size)) == NULL) { - perror("malloc"); - send_nack__(source, subfile_rank, WRITE_INDEP_ACK, comm); - return -1; + if (recv_buffer == NULL) { + if ((recv_buffer = (char *)malloc((size_t)data_size)) == NULL) { + perror("malloc"); + send_nack__(source, subfile_rank, WRITE_INDEP_ACK, comm); + return -1; + } } - } - send_ack__(source, subfile_rank, WRITE_INDEP_ACK, comm); - ret = MPI_Recv(recv_buffer, (int)data_size, MPI_BYTE, source, - WRITE_INDEP_DATA, comm, &msg_status); + send_ack__(source, subfile_rank, WRITE_INDEP_ACK, comm); + ret = MPI_Recv(recv_buffer, (int)data_size, MPI_BYTE, source, WRITE_INDEP_DATA, comm, &msg_status); - t_end = MPI_Wtime(); - t_wait = t_end - t_start; - sf_write_wait_time += t_wait; - t_start = t_end; + t_end = MPI_Wtime(); + t_wait = t_end - t_start; + sf_write_wait_time += t_wait; + t_start = t_end; #ifndef NDEBUG - if (sf_verbose_flag) { - if (sf_logfile) { - fprintf(sf_logfile, - "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n", - subfile_rank, __func__, data_size, source, ret); + if (sf_verbose_flag) { + if (sf_logfile) { + fprintf(sf_logfile, "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n", subfile_rank, + __func__, data_size, source, ret); + } } - } #endif - if (ret != MPI_SUCCESS) { - int len; - char estring[MPI_MAX_ERROR_STRING]; - MPI_Error_string(ret, estring, &len); - printf("[ioc(%d) %s] MPI_ERROR(%d)! 
MPI_Recv of %ld bytes from %d " - "returned an error(%s)\n", - subfile_rank, __func__, msg_status.MPI_ERROR, data_size, source, - estring); - fflush(stdout); - return ret; - } + if (ret != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(ret, estring, &len); + printf("[ioc(%d) %s] MPI_ERROR(%d)! MPI_Recv of %ld bytes from %d " + "returned an error(%s)\n", + subfile_rank, __func__, msg_status.MPI_ERROR, data_size, source, estring); + fflush(stdout); + return ret; + } + + if (msg->serialize) + ioc__wait_for_serialize(msg); + + fd = sf_context->sf_fid; + + if (fd < 0) { + printf("[ioc(%d)] WARNING: %s called while subfile_fid = %d (closed)\n", subfile_rank, __func__, fd); + fflush(stdout); + } + else { + if (sf_write_data(fd, file_offset, recv_buffer, data_size, subfile_rank) < 0) { + free(recv_buffer); + recv_buffer = NULL; + printf("[ioc(%d) %s] sf_write_data returned an error!\n", subfile_rank, __func__); + fflush(stdout); + return -1; + } + t_end = MPI_Wtime(); + t_write = t_end - t_start; + sf_pwrite_time += t_write; + } + + sf_queue_delay_time += t_queue_delay; + + /* Done... */ + if (sf_eof > sf_context->sf_eof) + sf_context->sf_eof = sf_eof; - fd = sf_context->sf_fid; - if (fd < 0) { - printf("[ioc(%d)] WARNING: %s called while subfile_fid = %d (closed)\n", - subfile_rank, __func__, fd); +#ifdef VERBOSE + printf("[ioc(%d)] %s local sf_eof = %ld sf_context=%p\n", subfile_rank, __func__, sf_context->sf_eof, + (void *)sf_context); fflush(stdout); - } else { - if (sf_write_data(fd, file_offset, recv_buffer, data_size, subfile_rank) < - 0) { - free(recv_buffer); - recv_buffer = NULL; - printf("[ioc(%d) %s] sf_write_data returned an error!\n", subfile_rank, - __func__); - fflush(stdout); - return -1; +#endif + if (recv_buffer) { + free(recv_buffer); } - t_end = MPI_Wtime(); - t_write = t_end - t_start; - sf_pwrite_time += t_write; - } - - sf_queue_delay_time += t_queue_delay; - - /* Done... 
*/ - msg->completed = 1; - if (recv_buffer) { - free(recv_buffer); - } - return 0; + return 0; } /*------------------------------------------------------------------------- @@ -1639,146 +1657,146 @@ int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, * *------------------------------------------------------------------------- */ -int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, - MPI_Comm comm) { - int fd; - char *send_buffer = NULL; - int ret = MPI_SUCCESS; - int64_t data_size = msg->header[0]; - int64_t file_offset = msg->header[1]; - int64_t file_context_id = msg->header[2]; - double t_start, t_end; - double t_read, t_queue_delay; - - subfiling_context_t *sf_context = get__subfiling_object(file_context_id); - assert(sf_context != NULL); - - sf_context->sf_read_count++; - /* For debugging performance */ - sf_read_ops++; - - t_start = MPI_Wtime(); - t_queue_delay = t_start - msg->start_time; - - fd = sf_context->sf_fid; - if (fd < 0) { - printf("[ioc(%d) %s] subfile(%d) file descriptor not valid\n", subfile_rank, - __func__, fd); - return -1; - } +int +queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +{ + int fd; + char * send_buffer = NULL; + int ret = MPI_SUCCESS; + int64_t data_size = msg->header[0]; + int64_t file_offset = msg->header[1]; + int64_t file_context_id = msg->header[2]; + double t_start, t_end; + double t_read, t_queue_delay; + + subfiling_context_t *sf_context = get__subfiling_object(file_context_id); + assert(sf_context != NULL); + + sf_context->sf_read_count++; + /* For debugging performance */ + sf_read_ops++; + + t_start = MPI_Wtime(); + t_queue_delay = t_start - msg->start_time; + + fd = sf_context->sf_fid; + if (fd < 0) { + printf("[ioc(%d) %s] subfile(%d) file descriptor not valid\n", subfile_rank, __func__, fd); + return -1; + } #ifndef NDEBUG - if (sf_verbose_flag && (sf_logfile != NULL)) { - fprintf(sf_logfile, - "[ioc(%d) %s] msg from %d: 
datasize=%ld\toffset=%ld " - "queue_delay=%lf seconds\n", - subfile_rank, __func__, source, data_size, file_offset, - t_queue_delay); - } + if (sf_verbose_flag && (sf_logfile != NULL)) { + fprintf(sf_logfile, + "[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld " + "queue_delay=%lf seconds\n", + subfile_rank, __func__, source, data_size, file_offset, t_queue_delay); + } #endif - if ((send_buffer = (char *)malloc((size_t)data_size)) == NULL) { - perror("malloc"); - return -1; - } - - if (sf_read_data(fd, file_offset, send_buffer, data_size, subfile_rank) < 0) { - printf("[%d] %s - sf_read_data fd=%d for source(%d) returned an error!\n", - subfile_rank, __func__, fd, source ); - fflush(stdout); - /* - * Should send a zero(0) byte message to the client to prevent - * it from hanging... - */ - MPI_Send(send_buffer, 0, MPI_BYTE, source, READ_INDEP_DATA, comm); - free(send_buffer); - return -1; - } - - ret = MPI_Send(send_buffer, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, - comm); - if (ret != MPI_SUCCESS) { - int len; - char estring[MPI_MAX_ERROR_STRING]; - MPI_Error_string(ret, estring, &len); - printf("[ioc(%d)] ERROR! MPI_Send of %ld bytes to %d returned an " - "error(%s)\n", - subfile_rank, data_size, source, estring); - fflush(stdout); - return ret; - } - t_end = MPI_Wtime(); - t_read = t_end - t_start; - sf_pread_time += t_read; - sf_queue_delay_time += t_queue_delay; + if ((send_buffer = (char *)malloc((size_t)data_size)) == NULL) { + perror("malloc"); + return -1; + } + + if (sf_read_data(fd, file_offset, send_buffer, data_size, subfile_rank) < 0) { + printf("[%d] %s - sf_read_data fd=%d for source(%d) returned an error!\n", subfile_rank, __func__, fd, + source); + fflush(stdout); + /* + * Should send a zero(0) byte message to the client to prevent + * it from hanging... 
+ */ + MPI_Send(send_buffer, 0, MPI_BYTE, source, READ_INDEP_DATA, comm); + free(send_buffer); + return -1; + } + + ret = MPI_Send(send_buffer, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, comm); + if (ret != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(ret, estring, &len); + printf("[ioc(%d)] ERROR! MPI_Send of %ld bytes to %d returned an " + "error(%s)\n", + subfile_rank, data_size, source, estring); + fflush(stdout); + return ret; + } + t_end = MPI_Wtime(); + t_read = t_end - t_start; + sf_pread_time += t_read; + sf_queue_delay_time += t_queue_delay; #ifndef NDEBUG - if (sf_verbose_flag && (sf_logfile != NULL)) { - fprintf(sf_logfile, "[ioc(%d)] MPI_Send to source(%d) completed\n", - subfile_rank, source); - } + if (sf_verbose_flag && (sf_logfile != NULL)) { + fprintf(sf_logfile, "[ioc(%d)] MPI_Send to source(%d) completed\n", subfile_rank, source); + } #endif - if (send_buffer) { - free(send_buffer); - send_buffer = NULL; - } + if (send_buffer) { + free(send_buffer); + send_buffer = NULL; + } - return 0; + return 0; } /* end queue_read_indep() */ /* --------------------------------------------------- * Helper function for subfiling_open_file() see below + * Subfiles should be located in the same directory + * as the HDF5 file unless the user has provided + * an alternate directory name as indicated by the + * sf_context->subfile_prefix argument. 
* ---------------------------------------------------*/ -static -void get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int subfile_rank, - char **_prefix, char **_subfile_dir, char *filepath) +static void +get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int subfile_rank, char **_basename, + char **_subfile_dir, char *filepath) { - const char slash = '/'; - char workdir[PATH_MAX]; - char configdir[PATH_MAX]; - - char *prefix = NULL, *subfile_dir = NULL; - int n_io_concentrators = sf_context->topology->n_io_concentrators; - - memset(workdir, 0, PATH_MAX); - getcwd(workdir,PATH_MAX); - - if ((prefix = sf_context->subfile_prefix) == NULL) { - memset(configdir, 0, PATH_MAX); - strncpy(configdir, sf_context->filename, strlen(sf_context->filename)); - prefix = dirname(configdir); - } - - size_t prefix_len = strlen(prefix); - if (strcmp(prefix, workdir)) { - if (prefix[prefix_len-1] == slash) { - if (sf_context->subfile_prefix) - sprintf(filepath, "%s" SF_NODE_LOCAL_TEMPLATE, prefix, h5_file_id, - subfile_rank, n_io_concentrators); - else - sprintf(filepath, "%s" SF_FILENAME_TEMPLATE, prefix, h5_file_id, - subfile_rank, n_io_concentrators); - } - else { - if (sf_context->subfile_prefix) - sprintf(filepath, "%s/" SF_NODE_LOCAL_TEMPLATE, prefix, h5_file_id, - subfile_rank, n_io_concentrators); - else - sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, prefix, h5_file_id, - subfile_rank, n_io_concentrators); - } - } else { - memset(configdir, 0, PATH_MAX); - strcpy(configdir, sf_context->filename); - subfile_dir = dirname(configdir); - sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, subfile_dir, h5_file_id, - subfile_rank, n_io_concentrators); - } - - *_prefix = prefix; - *_subfile_dir = subfile_dir; + char *prefix = NULL, *subfile_dir = NULL; + char *base = NULL; + int n_io_concentrators = sf_context->topology->n_io_concentrators; + + /* We require this to be non-null */ + HDassert(sf_context); + + prefix = (char *)malloc(PATH_MAX); + 
HDassert(prefix); + + /* Under normal operation, we co-locate subfiles + * with the HDF5 file + */ + strcpy(prefix, sf_context->h5_filename); + base = basename(prefix); + *_basename = strdup(base); + + if (sf_context->subfile_prefix == NULL) { + subfile_dir = dirname(prefix); + *_subfile_dir = strdup(subfile_dir); + } + else { + /* Note: Users may specify a directory name which is inaccessable + * from where the current is running. In particular, "node-local" + * storage is not uniformly available to all processes. + * We would like to check if the user pathname unavailable and + * if so, we could default to creating the subfiles in the + * current directory. (?) + */ + *_subfile_dir = strdup(sf_context->subfile_prefix); + } + + /* The subfile naming should produce files of the following form: + * If we assume the HDF5 file is named ABC.h5, then subfiles + * will have names: + * ABC.h5.subfile_<file-number>_0_of_2, + * ABC.h5.subfile_<file-number>_1_of_2, and + * ABC.h5.subfile_<file-number>.config + */ + sprintf(filepath, "%s/%s" SF_FILENAME_TEMPLATE, subfile_dir, base, h5_file_id, subfile_rank, + n_io_concentrators); + if (prefix) + HDfree(prefix); } - + /*------------------------------------------------------------------------- * Function: Public/IOC subfiling_open_file * @@ -1805,153 +1823,165 @@ void get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int * *------------------------------------------------------------------------- */ -int subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags) { - int errors = 0; +int +subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags) +{ + int errors = 0; +#if 0 char filepath[PATH_MAX]; - char *prefix = NULL; - char *subfile_dir = NULL; - mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; - - double t_start = 0.0, t_end = 0.0; - /* Only the real IOCs open the subfiles - * Once a file is opened, all additional file open requests - * can return immediately. 
- */ - - t_start = MPI_Wtime(); - /* Only allow the actual IO concentrator ranks to create sub-files */ - if (subfile_rank >= 0) { - char config[PATH_MAX]; - int64_t h5_file_id = msg->header[1]; - int64_t file_context_id = msg->header[2]; - subfiling_context_t *sf_context = get__subfiling_object(file_context_id); - assert(sf_context != NULL); + char config[PATH_MAX]; + char linebuf[PATH_MAX]; +#else + char filepath[PATH_MAX]; + char linebuf[PATH_MAX]; +#endif + char * temp = NULL; + char * prefix = NULL; + char * subfile_dir = NULL; + char * base = NULL; + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + + double t_start = 0.0, t_end = 0.0; + /* Only the real IOCs open the subfiles + * Once a file is opened, all additional file open requests + * can return immediately. + */ - memset(filepath, 0, PATH_MAX); + t_start = MPI_Wtime(); + /* Only allow the actual IO concentrator ranks to create sub-files */ + if (subfile_rank >= 0) { + int k, retries = 2; + int64_t h5_file_id = msg->header[1]; + int64_t file_context_id = msg->header[2]; + subfiling_context_t *sf_context = get__subfiling_object(file_context_id); + assert(sf_context != NULL); - begin_thread_exclusive(); - /* Check to see whether we need to create the subfile - * and possibly (IFF our subfile_rank is 0) a config file. 
- */ + memset(filepath, 0, PATH_MAX); - get__subfile_name(sf_context, h5_file_id, subfile_rank, &prefix, &subfile_dir, filepath); - - if (sf_context->sf_fid == -2) { - const char *dotconfig = ".subfile_config"; - int n_io_concentrators = sf_context->topology->n_io_concentrators; - int *io_concentrator = sf_context->topology->io_concentrator; - hid_t fapl = H5Pcreate(H5P_FILE_ACCESS); - void *fptr = H5FD_open(filepath, H5F_ACC_CREAT|H5F_ACC_RDWR, fapl, HADDR_UNDEF); - if (fptr) { - H5FD_close(fptr); - } - if ((sf_context->sf_fid = HDopen(filepath, flags|O_CREAT , mode)) < 0) { - end_thread_exclusive(); - HDprintf("[%d %s] file create(%s) failed!\n", subfile_rank, __func__, - filepath); - HDfflush(stdout); + begin_thread_exclusive(); + /* Check to see whether we need to create the subfile + * and possibly (IFF our subfile_rank is 0) a config file. + */ + + get__subfile_name(sf_context, h5_file_id, subfile_rank, &base, &subfile_dir, filepath); + sf_context->sf_filename = strdup(filepath); + + assert(sf_context->sf_filename); + + /* Check if we need to create the subfiles */ + if (sf_context->sf_fid == -2) { + int n_io_concentrators = sf_context->topology->n_io_concentrators; + int *io_concentrator = sf_context->topology->io_concentrator; + for (k = 0; k < retries; k++) { + int fd; + if ((fd = HDopen(filepath, O_CREAT | O_RDWR | O_TRUNC, mode)) > 0) { + sf_context->sf_fid = fd; + sf_context->sf_eof = 0; + break; + } + } + if (sf_context->sf_fid < 0) { + end_thread_exclusive(); + perror("subfiling_open_file/open"); + HDprintf("[%d %s] file create(%s) failed!\n", subfile_rank, __func__, filepath); + HDfflush(stdout); #ifndef NDEBUG - if (sf_verbose_flag) { - printf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, - filepath); - fflush(stdout); - } + if (sf_verbose_flag) { + printf("[%d %s] file create(%s) failed!\n", subfile_rank, __func__, filepath); + fflush(stdout); + } #endif - errors++; - goto done; - } - - memset(filepath, 0, sizeof(filepath)); - 
strcpy(filepath, sf_context->filename); - subfile_dir = strrchr(filepath, '/'); - if (subfile_dir) { - size_t remaining = strlen(subfile_dir); - memset(subfile_dir, 0, remaining); - sprintf(subfile_dir, "/%ld%s", h5_file_id, dotconfig); - strcpy(config, filepath); - } - - if ((subfile_rank == 0) && (flags & O_CREAT)) { - FILE *f = NULL; - /* If a config file already exists, AND - * the user wants to truncate subfiles (if they exist), - * then we should also truncate an existing config file. - */ - if (access(config, flags) == 0) { - truncate(config, 0); - } - f = HDfopen(config, "w+"); - if (f != NULL) { - int k; - char linebuf[PATH_MAX]; - sprintf(linebuf, "stripe_size=%ld\n", sf_context->sf_stripe_size); - HDfwrite(linebuf, strlen(linebuf), 1, f); - sprintf(linebuf, "aggregator_count=%d\n", n_io_concentrators); - HDfwrite(linebuf, strlen(linebuf), 1, f); - sprintf(linebuf, "hdf5_file=%s\n", sf_context->filename); - HDfwrite(linebuf, strlen(linebuf), 1, f); - - for (k = 0; k < n_io_concentrators; k++) { - if (prefix) - sprintf(linebuf, "%s/%ld_node_local_temp_%d_of_%d:%d\n", prefix, - h5_file_id, subfile_rank, n_io_concentrators, - io_concentrator[k]); - else - sprintf(linebuf, "%ld_node_local_temp_%d_of_%d:%d\n", h5_file_id, - subfile_rank, n_io_concentrators, io_concentrator[k]); - - HDfwrite(linebuf, strlen(linebuf), 1, f); - } - - fclose(f); - } else { - perror("fopen(config)"); - errors++; - goto done; - } - } + errors++; + goto done; + } + sprintf(filepath, "%s/%s.subfile_%ld.config", subfile_dir, base, h5_file_id); + /* SUBFILE rank 0 does the work creating a config file */ + if ((subfile_rank == 0) && (flags & O_CREAT)) { + FILE *f = NULL; + /* If a config file already exists, AND + * the user wants to truncate subfiles (if they exist), + * then we should also truncate an existing config file. 
+ */ + if (access(filepath, flags) == 0) { + truncate(filepath, 0); + } + f = HDfopen(filepath, "w+"); + if (f != NULL) { + sprintf(linebuf, "stripe_size=%ld\n", sf_context->sf_stripe_size); + HDfwrite(linebuf, 1, strlen(linebuf), f); + sprintf(linebuf, "aggregator_count=%d\n", n_io_concentrators); + HDfwrite(linebuf, 1, strlen(linebuf), f); + sprintf(linebuf, "hdf5_file=%s\n", sf_context->h5_filename); + HDfwrite(linebuf, 1, strlen(linebuf), f); + sprintf(linebuf, "subfile_dir=%s\n", subfile_dir); + + for (k = 0; k < n_io_concentrators; k++) { + sprintf(linebuf, "%s.subfile_%ld_%d_of_%d:%d\n", base, h5_file_id, subfile_rank, + n_io_concentrators, io_concentrator[k]); + HDfwrite(linebuf, 1, strlen(linebuf), f); + } + + fclose(f); + } + else { + perror("fopen(config)"); + errors++; + goto done; + } + } + #ifndef NDEBUG - if (sf_verbose_flag) { - if (sf_logfile) { - HDfprintf(sf_logfile, "[ioc:%d] Opened subfile %s\n", subfile_rank, - filepath); - } - } + if (sf_verbose_flag) { + if (sf_logfile) { + HDfprintf(sf_logfile, "[ioc:%d] Opened subfile %s\n", subfile_rank, filepath); + } + } #endif - } - else { - if ((sf_context->sf_fid = HDopen(filepath, flags, mode)) < 0) { - end_thread_exclusive(); - HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, - filepath); - HDfflush(stdout); + } + else { + for (k = 0; k < retries; k++) { + int fd; + if ((fd = HDopen(filepath, O_CREAT | O_RDWR, mode)) > 0) { + sf_context->sf_fid = fd; + break; + } + } + if (sf_context->sf_fid < 0) { + end_thread_exclusive(); + perror("subfiling_open_file/open"); + HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, filepath); + HDfflush(stdout); #ifndef NDEBUG - if (sf_verbose_flag) { - HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, - filepath); - HDfflush(stdout); - } + if (sf_verbose_flag) { + HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, filepath); + HDfflush(stdout); + } #endif - errors++; - goto done; - } - } - 
end_thread_exclusive(); - } + errors++; + goto done; + } + } + end_thread_exclusive(); + } done: - t_end = MPI_Wtime(); + t_end = MPI_Wtime(); + + if (base) + HDfree(base); + if (subfile_dir) + HDfree(subfile_dir); #ifndef NDEBUG - if (sf_verbose_flag) { - printf("[%s %d] open completed in %lf seconds with %d errors\n", __func__, - subfile_rank, (t_end - t_start), errors); - fflush(stdout); - } + if (sf_verbose_flag) { + printf("[%s %d] open completed in %lf seconds with %d errors\n", __func__, subfile_rank, + (t_end - t_start), errors); + fflush(stdout); + } #endif - return errors; + return errors; } /* end subfiling_open_file() */ /*------------------------------------------------------------------------- @@ -1983,57 +2013,65 @@ done: *------------------------------------------------------------------------- */ -int sf_get_mpi_rank(hid_t fid, int *rank) { - hid_t context_id = fid_map_to_context((uint64_t)fid); - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - assert(rank != NULL); - *rank = sf_context->sf_group_rank; - return 0; +int +sf_get_mpi_rank(hid_t fid, int *rank) +{ + hid_t context_id = fid_map_to_context((uint64_t)fid); + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + assert(rank != NULL); + *rank = sf_context->sf_group_rank; + return 0; } -int sf_get_mpi_size(hid_t fid, int *size) { - hid_t context_id = fid_map_to_context((uint64_t)fid); - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - assert(size != NULL); - *size = sf_context->sf_group_size; - return 0; +int +sf_get_mpi_size(hid_t fid, int *size) +{ + hid_t context_id = fid_map_to_context((uint64_t)fid); + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + assert(size != NULL); + *size = sf_context->sf_group_size; + return 0; } -int sf_get_group_comm(hid_t fid, MPI_Comm *comm) { - hid_t 
context_id = fid_map_to_context((uint64_t)fid); - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - assert(comm != NULL); - *comm = sf_context->sf_group_comm; - return 0; +int +sf_get_group_comm(hid_t fid, MPI_Comm *comm) +{ + hid_t context_id = fid_map_to_context((uint64_t)fid); + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + assert(comm != NULL); + *comm = sf_context->sf_group_comm; + return 0; } -int sf_subfile_set_logging(hid_t sf_fid, int ioc_rank, int flag) { - int ioc; - int status = 0; - hid_t context_id = fid_map_to_context((uint64_t)sf_fid); - subfiling_context_t *sf_context = get__subfiling_object(context_id); - int n_io_concentrators; - int *io_concentrator = NULL; - int64_t lflag = (int64_t)(flag & 0xFF); - int64_t msg[3]; - - assert(sf_context != NULL); - - msg[0] = lflag; - msg[1] = 0; - msg[2] = sf_context->sf_context_id; - - n_io_concentrators = sf_context->topology->n_io_concentrators; - io_concentrator = sf_context->topology->io_concentrator; - - for (ioc = 0; ioc < n_io_concentrators; ioc++) { - if ((flag < 0) || (flag == ioc_rank)) { - status = MPI_Ssend(msg, 3, MPI_INT64_T, io_concentrator[ioc], LOGGING_OP, - sf_context->sf_msg_comm); +int +sf_subfile_set_logging(hid_t sf_fid, int ioc_rank, int flag) +{ + int ioc; + int status = 0; + hid_t context_id = fid_map_to_context((uint64_t)sf_fid); + subfiling_context_t *sf_context = get__subfiling_object(context_id); + int n_io_concentrators; + int * io_concentrator = NULL; + int64_t lflag = (int64_t)(flag & 0xFF); + int64_t msg[3]; + + assert(sf_context != NULL); + + msg[0] = lflag; + msg[1] = 0; + msg[2] = sf_context->sf_context_id; + + n_io_concentrators = sf_context->topology->n_io_concentrators; + io_concentrator = sf_context->topology->io_concentrator; + + for (ioc = 0; ioc < n_io_concentrators; ioc++) { + if ((flag < 0) || (flag == ioc_rank)) { + status = + MPI_Ssend(msg, 3, MPI_INT64_T, 
io_concentrator[ioc], LOGGING_OP, sf_context->sf_msg_comm); + } } - } - return status; + return status; } |