diff options
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r-- | src/H5Dmpio.c | 5396 |
1 files changed, 3989 insertions, 1407 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index 448e92d..2cde4d3 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -36,6 +36,7 @@ #include "H5Eprivate.h" /* Error handling */ #include "H5Fprivate.h" /* File access */ #include "H5FDprivate.h" /* File drivers */ +#include "H5FLprivate.h" /* Free Lists */ #include "H5Iprivate.h" /* IDs */ #include "H5MMprivate.h" /* Memory management */ #include "H5Oprivate.h" /* Object headers */ @@ -43,6 +44,15 @@ #include "H5Sprivate.h" /* Dataspaces */ #include "H5VMprivate.h" /* Vector */ +/* uthash is an external, header-only hash table implementation. + * + * We include the file directly in src/ and #define a few functions + * to use our internal memory calls. + */ +#define uthash_malloc(sz) H5MM_malloc(sz) +#define uthash_free(ptr, sz) H5MM_free(ptr) /* Ignoring sz is intentional */ +#include "uthash.h" + #ifdef H5_HAVE_PARALLEL /****************/ @@ -81,9 +91,54 @@ /* Macros to represent the regularity of the selection for multiple chunk IO case. 
*/ #define H5D_CHUNK_SELECT_REG 1 +/* + * Threshold value for redistributing shared filtered chunks + * on all MPI ranks, or just MPI rank 0 + */ +#define H5D_CHUNK_REDISTRIBUTE_THRES ((size_t)((25 * H5_MB) / sizeof(H5D_chunk_redistribute_info_t))) + +/* + * Initial allocation size for the arrays that hold + * buffers for chunk modification data that is sent + * to other ranks and the MPI_Request objects for + * those send operations + */ +#define H5D_CHUNK_NUM_SEND_MSGS_INIT 64 + +/* + * Define a tag value for the MPI messages sent/received for + * chunk modification data + */ +#define H5D_CHUNK_MOD_DATA_TAG 64 + +/* + * Macro to initialize a H5D_chk_idx_info_t + * structure, given a pointer to a H5D_io_info_t + * structure + */ +#define H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, io_info_ptr) \ + do { \ + index_info.f = (io_info_ptr)->dset->oloc.file; \ + index_info.pline = &((io_info_ptr)->dset->shared->dcpl_cache.pline); \ + index_info.layout = &((io_info_ptr)->dset->shared->layout.u.chunk); \ + index_info.storage = &((io_info_ptr)->dset->shared->layout.storage.u.chunk); \ + } while (0) + +/* + * Macro to initialize a H5D_chunk_ud_t structure + * given a pointer to a H5D_chk_idx_info_t structure + */ +#define H5D_MPIO_INIT_CHUNK_UD_INFO(chunk_ud, index_info_ptr) \ + do { \ + HDmemset(&chunk_ud, 0, sizeof(H5D_chunk_ud_t)); \ + chunk_ud.common.layout = (index_info_ptr)->layout; \ + chunk_ud.common.storage = (index_info_ptr)->storage; \ + } while (0) + /******************/ /* Local Typedefs */ /******************/ + /* Combine chunk address and chunk info into a struct for better performance. */ typedef struct H5D_chunk_addr_info_t { haddr_t chunk_addr; @@ -100,115 +155,137 @@ typedef enum H5D_mpio_no_rank0_bcast_cause_t { } H5D_mpio_no_rank0_bcast_cause_t; /* + * Information necessary for re-allocating file space for a chunk + * during a parallel write of a chunked dataset with filters + * applied. 
+ */ +typedef struct H5D_chunk_alloc_info_t { + H5F_block_t chunk_current; + H5F_block_t chunk_new; + hsize_t chunk_idx; +} H5D_chunk_alloc_info_t; + +/* + * Information for a chunk pertaining to the dataset's chunk + * index entry for the chunk + */ +typedef struct H5D_chunk_index_info_t { + hsize_t chunk_idx; + unsigned filter_mask; + hbool_t need_insert; +} H5D_chunk_index_info_t; + +/* * Information about a single chunk when performing collective filtered I/O. All * of the fields of one of these structs are initialized at the start of collective - * filtered I/O in the function H5D__construct_filtered_io_info_list(). - * - * This struct's fields are as follows: - * - * index - The "Index" of the chunk in the dataset. The index of a chunk is used during - * the collective re-insertion of chunks into the chunk index after the collective - * I/O has been performed. + * filtered I/O in the function H5D__mpio_collective_filtered_chunk_io_setup(). This + * struct's fields are as follows: * - * scaled - The scaled coordinates of the chunk in the dataset's file dataspace. The - * coordinates are used in both the collective re-allocation of space in the file - * and the collective re-insertion of chunks into the chunk index after the collective - * I/O has been performed. + * index_info - A structure containing the information needed when collectively + * re-inserting the chunk into the dataset's chunk index. The structure + * is distributed to all ranks during the re-insertion operation. Its fields + * are as follows: * - * full_overwrite - A flag which determines whether or not a chunk needs to be read from the - * file when being updated. If a chunk is being fully overwritten (the entire - * extent is selected in its file dataspace), then it is not necessary to - * read the chunk from the file. 
However, if the chunk is not being fully - * overwritten, it has to be read from the file in order to update the chunk - * without trashing the parts of the chunk that are not selected. + * chunk_idx - The index of the chunk in the dataset's chunk index. * - * num_writers - The total number of processors writing to this chunk. This field is used - * when the new owner of a chunk is receiving messages, which contain selections in - * the chunk and data to update the chunk with, from other processors which have this - * chunk selected in the I/O operation. The new owner must know how many processors it - * should expect messages from so that it can post an equal number of receive calls. + * filter_mask - A bit-mask that indicates which filters are to be applied to the + * chunk. Each filter in a chunk's filter pipeline has a bit position + * that can be masked to disable that particular filter for the chunk. + * This filter mask is saved alongside the chunk in the file. * - * io_size - The total size of I/O to this chunk. This field is an accumulation of the size of - * I/O to the chunk from each processor which has the chunk selected and is used to - * determine the value for the previous full_overwrite flag. + * need_insert - A flag which determines whether or not a chunk needs to be re-inserted into + * the chunk index after the write operation. * - * buf - A pointer which serves the dual purpose of holding either the chunk data which is to be - * written to the file or the chunk data which has been read from the file. + * chunk_info - A pointer to the chunk's H5D_chunk_info_t structure, which contains useful + * information like the dataspaces containing the selection in the chunk. * - * chunk_states - In the case of dataset writes only, this struct is used to track a chunk's size and - * address in the file before and after the filtering operation has occurred. + * chunk_current - The address in the file and size of this chunk before the filtering + * operation. 
When reading a chunk from the file, this field is used to + * read the correct amount of bytes. It is also used when redistributing + * shared chunks among MPI ranks and as a parameter to the chunk file + * space reallocation function. * - * Its fields are as follows: + * chunk_new - The address in the file and size of this chunk after the filtering + * operation. This field is relevant when collectively re-allocating space + * in the file for all of the chunks written to in the I/O operation, as + * their sizes may have changed after their data has been filtered. * - * chunk_current - The address in the file and size of this chunk before the filtering - * operation. When reading a chunk from the file, this field is used to - * read the correct amount of bytes. It is also used when redistributing - * shared chunks among processors and as a parameter to the chunk file - * space reallocation function. + * need_read - A flag which determines whether or not a chunk needs to be read from the + * file. During writes, if a chunk is being fully overwritten (the entire extent + * is selected in its file dataspace), then it is not necessary to read the chunk + * from the file. However, if the chunk is not being fully overwritten, it has to + * be read from the file in order to update the chunk without trashing the parts + * of the chunk that are not selected. During reads, this field should generally + * be true, but may be false if the chunk isn't allocated, for example. * - * new_chunk - The address in the file and size of this chunk after the filtering - * operation. This field is relevant when collectively re-allocating space - * in the file for all of the chunks written to in the I/O operation, as - * their sizes may have changed after their data has been filtered. + * skip_filter_pline - A flag which determines whether to skip calls to the filter pipeline + * for this chunk. 
This flag is mostly useful for correct handling of + * partial edge chunks when the "don't filter partial edge chunks" flag + * is set on the dataset's DCPL. * - * owners - In the case of dataset writes only, this struct is used to manage which single processor - * will ultimately write data out to the chunk. It allows the other processors to act according - * to the decision and send their selection in the chunk, as well as the data they wish - * to update the chunk with, to the processor which is writing to the chunk. + * io_size - The total size of I/O to this chunk. This field is an accumulation of the size of + * I/O to the chunk from each MPI rank which has the chunk selected and is used to + * determine the value for the previous `full_overwrite` flag. * - * Its fields are as follows: + * chunk_buf_size - The size in bytes of the data buffer allocated for the chunk * - * original_owner - The processor which originally had this chunk selected at the beginning of - * the collective filtered I/O operation. This field is currently used when - * redistributing shared chunks among processors. + * orig_owner - The MPI rank which originally had this chunk selected at the beginning of + * the collective filtered I/O operation. This field is currently used when + * redistributing shared chunks among MPI ranks. * - * new_owner - The processor which has been selected to perform the write to this chunk. + * new_owner - The MPI rank which has been selected to perform the modifications to this chunk. * - * async_info - In the case of dataset writes only, this struct is used by the owning processor of the - * chunk in order to manage the MPI send and receive calls made between it and all of - * the other processors which have this chunk selected in the I/O operation. + * num_writers - The total number of MPI ranks writing to this chunk. 
This field is used when + * the new owner of a chunk is receiving messages from other MPI ranks that + * contain their selections in the chunk and the data to update the chunk with. + * The new owner must know how many MPI ranks it should expect messages from so + * that it can post an equal number of receive calls. * - * Its fields are as follows: + * buf - A pointer which serves the dual purpose of holding either the chunk data which is to be + * written to the file or the chunk data which has been read from the file. * - * receive_requests_array - An array containing one MPI_Request for each of the - * asynchronous MPI receive calls the owning processor of this - * chunk makes to another processor in order to receive that - * processor's chunk modification data and selection in the chunk. + * hh - A handle for hash tables provided by the uthash.h header * - * receive_buffer_array - An array of buffers into which the owning processor of this chunk - * will store chunk modification data and the selection in the chunk - * received from another processor. - * - * num_receive_requests - The number of entries in the receive_request_array and - * receive_buffer_array fields. 
*/ typedef struct H5D_filtered_collective_io_info_t { - hsize_t index; - hsize_t scaled[H5O_LAYOUT_NDIMS]; - hbool_t full_overwrite; - size_t num_writers; - size_t io_size; - void * buf; - - struct { - H5F_block_t chunk_current; - H5F_block_t new_chunk; - } chunk_states; - - struct { - int original_owner; - int new_owner; - } owners; - - struct { - MPI_Request * receive_requests_array; - unsigned char **receive_buffer_array; - int num_receive_requests; - } async_info; + H5D_chunk_index_info_t index_info; + + H5D_chunk_info_t *chunk_info; + H5F_block_t chunk_current; + H5F_block_t chunk_new; + hbool_t need_read; + hbool_t skip_filter_pline; + size_t io_size; + size_t chunk_buf_size; + int orig_owner; + int new_owner; + int num_writers; + void * buf; + + UT_hash_handle hh; } H5D_filtered_collective_io_info_t; -/* Function pointer typedef for sort function */ -typedef int (*H5D_mpio_sort_func_cb_t)(const void *, const void *); +/* + * Information necessary for redistributing shared chunks during + * a parallel write of a chunked dataset with filters applied. + */ +typedef struct H5D_chunk_redistribute_info_t { + H5F_block_t chunk_block; + hsize_t chunk_idx; + int orig_owner; + int new_owner; + int num_writers; +} H5D_chunk_redistribute_info_t; + +/* + * Information used when re-inserting a chunk into a dataset's + * chunk index during a parallel write of a chunked dataset with + * filters applied. 
+ */ +typedef struct H5D_chunk_insert_info_t { + H5F_block_t chunk_block; + H5D_chunk_index_info_t index_info; +} H5D_chunk_insert_info_t; /********************/ /* Local Prototypes */ @@ -216,53 +293,98 @@ typedef int (*H5D_mpio_sort_func_cb_t)(const void *, const void *); static herr_t H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, H5D_chunk_map_t *fm); static herr_t H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - H5D_chunk_map_t *fm); + H5D_chunk_map_t *fm, int mpi_rank, int mpi_size); static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t * io_info, - const H5D_type_info_t *type_info, H5D_chunk_map_t *fm); + const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, + int mpi_rank, int mpi_size); static herr_t H5D__link_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - H5D_chunk_map_t *fm, int sum_chunk); + H5D_chunk_map_t *fm, int sum_chunk, int mpi_rank, int mpi_size); static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - H5D_chunk_map_t *fm); + H5D_chunk_map_t *fm, int mpi_rank, int mpi_size); static herr_t H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5S_t *file_space, const H5S_t *mem_space); static herr_t H5D__final_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, hsize_t nelmts, MPI_Datatype mpi_file_type, MPI_Datatype mpi_buf_type); static herr_t H5D__sort_chunk(H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, - H5D_chunk_addr_info_t chunk_addr_info_array[], int many_chunk_opt); + H5D_chunk_addr_info_t chunk_addr_info_array[], int many_chunk_opt, int mpi_rank, + int mpi_size); static herr_t H5D__obtain_mpio_mode(H5D_io_info_t *io_info, H5D_chunk_map_t *fm, uint8_t assign_io_mode[], - haddr_t chunk_addr[]); + haddr_t chunk_addr[], int mpi_rank, int mpi_size); static herr_t H5D__mpio_get_sum_chunk(const 
H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, int *sum_chunkf); -static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t * io_info, - const H5D_type_info_t * type_info, - const H5D_chunk_map_t * fm, - H5D_filtered_collective_io_info_t **chunk_list, - size_t * num_entries); -#if MPI_VERSION >= 3 -static herr_t H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t * io_info, - const H5D_type_info_t * type_info, - const H5D_chunk_map_t * fm, - H5D_filtered_collective_io_info_t *local_chunk_array, - size_t *local_chunk_array_num_entries); -#endif -static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries, - size_t array_entry_size, void **gathered_array, - size_t *gathered_array_num_entries, hbool_t allgather, int root, - MPI_Comm comm, int (*sort_func)(const void *, const void *)); -static herr_t H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chunk_list, - size_t num_entries, MPI_Datatype *new_mem_type, - hbool_t *mem_type_derived, MPI_Datatype *new_file_type, - hbool_t *file_type_derived); -static herr_t H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry, - const H5D_io_info_t * io_info, - const H5D_type_info_t * type_info, - const H5D_chunk_map_t * fm); +static herr_t H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t * io_info, + const H5D_type_info_t * type_info, + const H5D_chunk_map_t * fm, + H5D_filtered_collective_io_info_t **chunk_list, + size_t *num_entries, int mpi_rank); +static herr_t H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, + const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, + int mpi_rank, int mpi_size, + size_t **rank_chunks_assigned_map); +static herr_t H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chunk_list, + size_t * num_chunks_assigned_map, + hbool_t all_ranks_involved, + const H5D_io_info_t * io_info, + 
const H5D_chunk_map_t *fm, int mpi_rank, int mpi_size); +static herr_t H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list, + size_t *chunk_list_num_entries, H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, int mpi_rank, + int mpi_size, + H5D_filtered_collective_io_info_t **chunk_hash_table, + unsigned char *** chunk_msg_bufs, + int * chunk_msg_bufs_len); +static herr_t H5D__mpio_collective_filtered_chunk_common_io(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, + const H5D_io_info_t * io_info, + const H5D_type_info_t *type_info, int mpi_size); +static herr_t H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, + const H5D_io_info_t * io_info, + const H5D_type_info_t *type_info, int mpi_rank, + int mpi_size); +static herr_t H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, + H5D_filtered_collective_io_info_t *chunk_hash_table, + unsigned char ** chunk_msg_bufs, + int chunk_msg_bufs_len, const H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, int mpi_rank, + int mpi_size); +static herr_t H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, + size_t * num_chunks_assigned_map, + H5D_io_info_t * io_info, + H5D_chk_idx_info_t *idx_info, int mpi_rank, + int mpi_size); +static herr_t H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, + size_t * num_chunks_assigned_map, + H5D_io_info_t * io_info, + H5D_chk_idx_info_t *idx_info, int mpi_rank, + int mpi_size); +static herr_t H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, + hbool_t * contig_type_derived, + MPI_Datatype *resized_type, + hbool_t * resized_type_derived); +static herr_t H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype 
*contig_type, hbool_t *contig_type_derived, + MPI_Datatype *resized_type, hbool_t *resized_type_derived); +static herr_t H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, hbool_t *contig_type_derived, + MPI_Datatype *resized_type, + hbool_t * resized_type_derived); +static herr_t H5D__mpio_collective_filtered_io_type(H5D_filtered_collective_io_info_t *chunk_list, + size_t num_entries, H5D_io_op_type_t op_type, + MPI_Datatype *new_mem_type, hbool_t *mem_type_derived, + MPI_Datatype *new_file_type, hbool_t *file_type_derived); static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2); static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2); -#if MPI_VERSION >= 3 -static int H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective_io_info_entry1, - const void *filtered_collective_io_info_entry2); +static int H5D__cmp_chunk_redistribute_info(const void *entry1, const void *entry2); +static int H5D__cmp_chunk_redistribute_info_orig_owner(const void *entry1, const void *entry2); + +#ifdef H5Dmpio_DEBUG +static herr_t H5D__mpio_debug_init(void); +static herr_t H5D__mpio_dump_collective_filtered_chunk_list(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, int mpi_rank); #endif /*********************/ @@ -273,6 +395,188 @@ static int H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered /* Local Variables */ /*******************/ +/* Declare extern free list to manage the H5S_sel_iter_t struct */ +H5FL_EXTERN(H5S_sel_iter_t); + +#ifdef H5Dmpio_DEBUG + +/* Flags to control debug actions in this file. 
+ * (Meant to be indexed by characters) + * + * These flags can be set with either (or both) the environment variable + * "H5D_mpio_Debug" set to a string containing one or more characters + * (flags) or by setting them as a string value for the + * "H5D_mpio_debug_key" MPI Info key. + * + * Supported characters in 'H5D_mpio_Debug' string: + * 't' trace function entry and exit + * 'f' log to file rather than debugging stream + * 'm' show (rough) memory usage statistics + * 'c' show critical timing information + * + * To only show output from a particular MPI rank, specify its rank + * number as a character, e.g.: + * + * '0' only show output from rank 0 + * + * To only show output from a particular range (up to 8 ranks supported + * between 0-9) of MPI ranks, specify the start and end ranks separated + * by a hyphen, e.g.: + * + * '0-7' only show output from ranks 0 through 7 + * + */ +static int H5D_mpio_debug_flags_s[256]; +static int H5D_mpio_debug_rank_s[8] = {-1, -1, -1, -1, -1, -1, -1, -1}; +static hbool_t H5D_mpio_debug_inited = FALSE; +static const char *const trace_in_pre = "-> "; +static const char *const trace_out_pre = "<- "; +static int debug_indent = 0; +static FILE * debug_stream = NULL; + +/* Determine if this rank should output debugging info */ +#define H5D_MPIO_DEBUG_THIS_RANK(rank) \ + (H5D_mpio_debug_rank_s[0] < 0 || rank == H5D_mpio_debug_rank_s[0] || rank == H5D_mpio_debug_rank_s[1] || \ + rank == H5D_mpio_debug_rank_s[2] || rank == H5D_mpio_debug_rank_s[3] || \ + rank == H5D_mpio_debug_rank_s[4] || rank == H5D_mpio_debug_rank_s[5] || \ + rank == H5D_mpio_debug_rank_s[6] || rank == H5D_mpio_debug_rank_s[7]) + +/* Print some debugging string */ +#define H5D_MPIO_DEBUG(rank, string) \ + do { \ + if (debug_stream && H5D_MPIO_DEBUG_THIS_RANK(rank)) { \ + HDfprintf(debug_stream, "%*s(Rank %d) " string "\n", debug_indent, "", rank); \ + HDfflush(debug_stream); \ + } \ + } while (0) + +/* Print some debugging string with printf-style arguments */ 
+#define H5D_MPIO_DEBUG_VA(rank, string, ...) \ + do { \ + if (debug_stream && H5D_MPIO_DEBUG_THIS_RANK(rank)) { \ + HDfprintf(debug_stream, "%*s(Rank %d) " string "\n", debug_indent, "", rank, __VA_ARGS__); \ + HDfflush(debug_stream); \ + } \ + } while (0) + +#define H5D_MPIO_TRACE_ENTER(rank) \ + do { \ + hbool_t trace_flag = H5D_mpio_debug_flags_s[(int)'t']; \ + \ + if (trace_flag) { \ + H5D_MPIO_DEBUG_VA(rank, "%s%s", trace_in_pre, __func__); \ + debug_indent += (int)HDstrlen(trace_in_pre); \ + } \ + } while (0) + +#define H5D_MPIO_TRACE_EXIT(rank) \ + do { \ + hbool_t trace_flag = H5D_mpio_debug_flags_s[(int)'t']; \ + \ + if (trace_flag) { \ + debug_indent -= (int)HDstrlen(trace_out_pre); \ + H5D_MPIO_DEBUG_VA(rank, "%s%s", trace_out_pre, __func__); \ + } \ + } while (0) + +#define H5D_MPIO_TIME_START(rank, op_name) \ + { \ + hbool_t time_flag = H5D_mpio_debug_flags_s[(int)'c']; \ + double start_time = 0.0, end_time = 0.0; \ + const char *const op = op_name; \ + \ + if (time_flag) { \ + start_time = MPI_Wtime(); \ + } + +#define H5D_MPIO_TIME_STOP(rank) \ + if (time_flag) { \ + end_time = MPI_Wtime(); \ + H5D_MPIO_DEBUG_VA(rank, "'%s' took %f seconds", op, (end_time - start_time)); \ + } \ + } + +/*--------------------------------------------------------------------------- + * Function: H5D__mpio_parse_debug_str + * + * Purpose: Parse a string for H5Dmpio-related debugging flags + * + * Returns: N/A + * + *--------------------------------------------------------------------------- + */ +static void +H5D__mpio_parse_debug_str(const char *s) +{ + FUNC_ENTER_STATIC_NOERR + + HDassert(s); + + while (*s) { + int c = (int)(*s); + + if (c >= (int)'0' && c <= (int)'9') { + hbool_t range = FALSE; + + if (*(s + 1) && *(s + 2)) + range = (int)*(s + 1) == '-' && (int)*(s + 2) >= (int)'0' && (int)*(s + 2) <= (int)'9'; + + if (range) { + int start_rank = c - (int)'0'; + int end_rank = (int)*(s + 2) - '0'; + int num_ranks = end_rank - start_rank + 1; + int i; + + if 
(num_ranks > 8) { + end_rank = start_rank + 7; + num_ranks = 8; + } + + for (i = 0; i < num_ranks; i++) + H5D_mpio_debug_rank_s[i] = start_rank++; + + s += 3; + } + else + H5D_mpio_debug_rank_s[0] = c - (int)'0'; + } + else + H5D_mpio_debug_flags_s[c]++; + + s++; + } + + FUNC_LEAVE_NOAPI_VOID +} + +static herr_t +H5D__mpio_debug_init(void) +{ + const char *debug_str; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC_NOERR + + HDassert(!H5D_mpio_debug_inited); + + /* Clear the debug flag buffer */ + HDmemset(H5D_mpio_debug_flags_s, 0, sizeof(H5D_mpio_debug_flags_s)); + + /* Retrieve and parse the H5Dmpio debug string */ + debug_str = HDgetenv("H5D_mpio_Debug"); + if (debug_str) + H5D__mpio_parse_debug_str(debug_str); + + if (H5DEBUG(D)) + debug_stream = H5DEBUG(D); + + H5D_mpio_debug_inited = TRUE; + + FUNC_LEAVE_NOAPI(ret_value) +} + +#endif + /*------------------------------------------------------------------------- * Function: H5D__mpio_opt_possible * @@ -347,14 +651,9 @@ H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space, co * use collective IO will defer until each chunk IO is reached. */ -#if MPI_VERSION < 3 - /* - * Don't allow parallel writes to filtered datasets if the MPI version - * is less than 3. The functions needed (MPI_Mprobe and MPI_Imrecv) will - * not be available. 
- */ - if (io_info->op_type == H5D_IO_OP_WRITE && io_info->dset->shared->layout.type == H5D_CHUNKED && - io_info->dset->shared->dcpl_cache.pline.nused > 0) +#ifndef H5_HAVE_PARALLEL_FILTERED_WRITES + /* Don't allow writes to filtered datasets if the functionality is disabled */ + if (io_info->op_type == H5D_IO_OP_WRITE && io_info->dset->shared->dcpl_cache.pline.nused > 0) local_cause[0] |= H5D_MPIO_PARALLEL_FILTERED_WRITES_DISABLED; #endif @@ -365,7 +664,7 @@ H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space, co /* Check to see if the process is reading the entire dataset */ if (H5S_GET_SELECT_TYPE(file_space) != H5S_SEL_ALL) local_cause[1] |= H5D_MPIO_RANK0_NOT_H5S_ALL; - /* Only perform this optimization for contigous datasets, currently */ + /* Only perform this optimization for contiguous datasets, currently */ else if (H5D_CONTIGUOUS != io_info->dset->shared->layout.type) /* Flag to do a MPI_Bcast of the data from one proc instead of * having all the processes involved in the collective I/O. @@ -437,6 +736,150 @@ done: } /* H5D__mpio_opt_possible() */ /*------------------------------------------------------------------------- + * Function: H5D__mpio_get_no_coll_cause_strings + * + * Purpose: When collective I/O is broken internally, it can be useful + * for users to see a representative string for the reason(s) + * why it was broken. This routine inspects the current + * "cause" flags from the API context and prints strings into + * the caller's buffers for the local and global reasons that + * collective I/O was broken. 
+ * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +herr_t +H5D__mpio_get_no_coll_cause_strings(char *local_cause, size_t local_cause_len, char *global_cause, + size_t global_cause_len) +{ + uint32_t local_no_coll_cause; + uint32_t global_no_coll_cause; + size_t local_cause_bytes_written = 0; + size_t global_cause_bytes_written = 0; + int nbits; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_PACKAGE + + HDassert((local_cause && local_cause_len > 0) || (global_cause && global_cause_len > 0)); + + /* + * Use compile-time assertion so this routine is updated + * when any new "no collective cause" values are added + */ + HDcompile_assert(H5D_MPIO_NO_COLLECTIVE_MAX_CAUSE == (H5D_mpio_no_collective_cause_t)256); + + /* Initialize output buffers */ + if (local_cause) + *local_cause = '\0'; + if (global_cause) + *global_cause = '\0'; + + /* Retrieve the local and global cause flags from the API context */ + if (H5CX_get_mpio_local_no_coll_cause(&local_no_coll_cause) < 0) + HGOTO_ERROR(H5E_CONTEXT, H5E_CANTGET, FAIL, "unable to get local no collective cause value") + if (H5CX_get_mpio_global_no_coll_cause(&global_no_coll_cause) < 0) + HGOTO_ERROR(H5E_CONTEXT, H5E_CANTGET, FAIL, "unable to get global no collective cause value") + + /* + * Append each of the "reason for breaking collective I/O" + * error messages to the local and global cause string buffers + */ + nbits = 8 * sizeof(local_no_coll_cause); + for (int bit_pos = 0; bit_pos < nbits; bit_pos++) { + H5D_mpio_no_collective_cause_t cur_cause; + const char * cause_str; + size_t buf_space_left; + + cur_cause = (H5D_mpio_no_collective_cause_t)(1 << bit_pos); + if (cur_cause == H5D_MPIO_NO_COLLECTIVE_MAX_CAUSE) + break; + + switch (cur_cause) { + case H5D_MPIO_SET_INDEPENDENT: + cause_str = "independent I/O was requested"; + break; + case H5D_MPIO_DATATYPE_CONVERSION: + cause_str = "datatype conversions were required"; + break; + 
case H5D_MPIO_DATA_TRANSFORMS: + cause_str = "data transforms needed to be applied"; + break; + case H5D_MPIO_MPI_OPT_TYPES_ENV_VAR_DISABLED: + cause_str = "optimized MPI types flag wasn't set"; + break; + case H5D_MPIO_NOT_SIMPLE_OR_SCALAR_DATASPACES: + cause_str = "one of the dataspaces was neither simple nor scalar"; + break; + case H5D_MPIO_NOT_CONTIGUOUS_OR_CHUNKED_DATASET: + cause_str = "dataset was not contiguous or chunked"; + break; + case H5D_MPIO_PARALLEL_FILTERED_WRITES_DISABLED: + cause_str = "parallel writes to filtered datasets are disabled"; + break; + case H5D_MPIO_ERROR_WHILE_CHECKING_COLLECTIVE_POSSIBLE: + cause_str = "an error occurred while checking if collective I/O was possible"; + break; + case H5D_MPIO_COLLECTIVE: + case H5D_MPIO_NO_COLLECTIVE_MAX_CAUSE: + default: + HDassert(0 && "invalid no collective cause reason"); + break; + } + + /* + * Determine if the local reasons for breaking collective I/O + * included the current cause + */ + if (local_cause && (cur_cause & local_no_coll_cause)) { + buf_space_left = local_cause_len - local_cause_bytes_written; + + /* + * Check if there were any previous error messages included. If + * so, prepend a semicolon to separate the messages. + */ + if (buf_space_left && local_cause_bytes_written) { + HDstrncat(local_cause, "; ", buf_space_left); + local_cause_bytes_written += MIN(buf_space_left, 2); + buf_space_left -= MIN(buf_space_left, 2); + } + + if (buf_space_left) { + HDstrncat(local_cause, cause_str, buf_space_left); + local_cause_bytes_written += MIN(buf_space_left, HDstrlen(cause_str)); + } + } + + /* + * Determine if the global reasons for breaking collective I/O + * included the current cause + */ + if (global_cause && (cur_cause & global_no_coll_cause)) { + buf_space_left = global_cause_len - global_cause_bytes_written; + + /* + * Check if there were any previous error messages included. If + * so, prepend a semicolon to separate the messages. 
+ */ + if (buf_space_left && global_cause_bytes_written) { + HDstrncat(global_cause, "; ", buf_space_left); + global_cause_bytes_written += MIN(buf_space_left, 2); + buf_space_left -= MIN(buf_space_left, 2); + } + + if (buf_space_left) { + HDstrncat(global_cause, cause_str, buf_space_left); + global_cause_bytes_written += MIN(buf_space_left, HDstrlen(cause_str)); + } + } + } + +done: + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_get_no_coll_cause_strings() */ + +/*------------------------------------------------------------------------- * Function: H5D__mpio_select_read * * Purpose: MPI-IO function to read directly from app buffer to file. @@ -449,8 +892,8 @@ done: */ herr_t H5D__mpio_select_read(const H5D_io_info_t *io_info, const H5D_type_info_t H5_ATTR_UNUSED *type_info, - hsize_t mpi_buf_count, const H5S_t H5_ATTR_UNUSED *file_space, - const H5S_t H5_ATTR_UNUSED *mem_space) + hsize_t mpi_buf_count, H5S_t H5_ATTR_UNUSED *file_space, + H5S_t H5_ATTR_UNUSED *mem_space) { const H5D_contig_storage_t *store_contig = &(io_info->store->contig); /* Contiguous storage info for this I/O operation */ @@ -480,8 +923,8 @@ done: */ herr_t H5D__mpio_select_write(const H5D_io_info_t *io_info, const H5D_type_info_t H5_ATTR_UNUSED *type_info, - hsize_t mpi_buf_count, const H5S_t H5_ATTR_UNUSED *file_space, - const H5S_t H5_ATTR_UNUSED *mem_space) + hsize_t mpi_buf_count, H5S_t H5_ATTR_UNUSED *file_space, + H5S_t H5_ATTR_UNUSED *mem_space) { const H5D_contig_storage_t *store_contig = &(io_info->store->contig); /* Contiguous storage info for this I/O operation */ @@ -500,145 +943,6 @@ done: } /* end H5D__mpio_select_write() */ /*------------------------------------------------------------------------- - * Function: H5D__mpio_array_gatherv - * - * Purpose: Given an array, specified in local_array, by each processor - * calling this function, collects each array into a single - * array which is then either gathered to the processor - * specified by root, when allgather is false, 
or is - * distributed back to all processors when allgather is true. - * - * The number of entries in the array contributed by an - * individual processor and the size of each entry should be - * specified in local_array_num_entries and array_entry_size, - * respectively. - * - * The MPI communicator to use should be specified for comm. - * - * If the sort_func argument is supplied, the array is sorted - * before the function returns. - * - * Note: if allgather is specified as true, root is ignored. - * - * Return: Non-negative on success/Negative on failure - * - * Programmer: Jordan Henderson - * Sunday, April 9th, 2017 - * - *------------------------------------------------------------------------- - */ -static herr_t -H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries, size_t array_entry_size, - void **_gathered_array, size_t *_gathered_array_num_entries, hbool_t allgather, - int root, MPI_Comm comm, H5D_mpio_sort_func_cb_t sort_func) -{ - size_t gathered_array_num_entries = 0; /* The size of the newly-constructed array */ - void * gathered_array = NULL; /* The newly-constructed array returned to the caller */ - int *receive_counts_array = NULL; /* Array containing number of entries each processor is contributing */ - int *displacements_array = - NULL; /* Array of displacements where each processor places its data in the final array */ - int mpi_code, mpi_rank, mpi_size; - int sendcount; - herr_t ret_value = SUCCEED; - - FUNC_ENTER_STATIC - - HDassert(_gathered_array); - HDassert(_gathered_array_num_entries); - - MPI_Comm_size(comm, &mpi_size); - MPI_Comm_rank(comm, &mpi_rank); - - /* Determine the size of the end result array by collecting the number - * of entries contributed by each processor into a single total. 
- */ - if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, - MPI_INT, MPI_SUM, comm))) - HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) - - /* If 0 entries resulted from the collective operation, no processor is contributing anything and there is - * nothing to do */ - if (gathered_array_num_entries > 0) { - /* - * If gathering to all processors, all processors need to allocate space for the resulting array, as - * well as the receive counts and displacements arrays for the collective MPI_Allgatherv call. - * Otherwise, only the root processor needs to allocate the space for an MPI_Gatherv call. - */ - if (allgather || (mpi_rank == root)) { - if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array") - - if (NULL == (receive_counts_array = (int *)H5MM_malloc((size_t)mpi_size * sizeof(int)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array") - - if (NULL == (displacements_array = (int *)H5MM_malloc((size_t)mpi_size * sizeof(int)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array") - } /* end if */ - - /* - * If gathering to all processors, inform each processor of how many entries each other processor is - * contributing to the resulting array by collecting the counts into each processor's "receive counts" - * array. Otherwise, inform only the root processor of how many entries each other processor is - * contributing. 
- */ - if (allgather) { - if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, - receive_counts_array, 1, MPI_INT, comm))) - HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) - } /* end if */ - else { - if (MPI_SUCCESS != (mpi_code = MPI_Gather(&local_array_num_entries, 1, MPI_INT, - receive_counts_array, 1, MPI_INT, root, comm))) - HMPI_GOTO_ERROR(FAIL, "MPI_Gather failed", mpi_code) - } /* end else */ - - if (allgather || (mpi_rank == root)) { - size_t i; - - /* Multiply each receive count by the size of the array entry, since the data is sent as bytes. */ - for (i = 0; i < (size_t)mpi_size; i++) - H5_CHECKED_ASSIGN(receive_counts_array[i], int, - (size_t)receive_counts_array[i] * array_entry_size, size_t); - - /* Set receive buffer offsets for the collective MPI_Allgatherv/MPI_Gatherv call. */ - displacements_array[0] = 0; - for (i = 1; i < (size_t)mpi_size; i++) - displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1]; - } /* end if */ - - /* As the data is sent as bytes, calculate the true sendcount for the data. 
*/ - H5_CHECKED_ASSIGN(sendcount, int, local_array_num_entries *array_entry_size, size_t); - - if (allgather) { - if (MPI_SUCCESS != - (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE, gathered_array, - receive_counts_array, displacements_array, MPI_BYTE, comm))) - HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code) - } /* end if */ - else { - if (MPI_SUCCESS != - (mpi_code = MPI_Gatherv(local_array, sendcount, MPI_BYTE, gathered_array, - receive_counts_array, displacements_array, MPI_BYTE, root, comm))) - HMPI_GOTO_ERROR(FAIL, "MPI_Gatherv failed", mpi_code) - } /* end else */ - - if (sort_func && (allgather || (mpi_rank == root))) - HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func); - } /* end if */ - - *_gathered_array = gathered_array; - *_gathered_array_num_entries = gathered_array_num_entries; - -done: - if (receive_counts_array) - H5MM_free(receive_counts_array); - if (displacements_array) - H5MM_free(displacements_array); - - FUNC_LEAVE_NOAPI(ret_value) -} /* end H5D__mpio_array_gatherv() */ - -/*------------------------------------------------------------------------- * Function: H5D__mpio_get_sum_chunk * * Purpose: Routine for obtaining total number of chunks to cover @@ -690,7 +994,7 @@ done: */ herr_t H5D__contig_collective_read(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - hsize_t H5_ATTR_UNUSED nelmts, const H5S_t *file_space, const H5S_t *mem_space, + hsize_t H5_ATTR_UNUSED nelmts, H5S_t *file_space, H5S_t *mem_space, H5D_chunk_map_t H5_ATTR_UNUSED *fm) { H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CONTIGUOUS_COLLECTIVE; @@ -729,7 +1033,7 @@ done: */ herr_t H5D__contig_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - hsize_t H5_ATTR_UNUSED nelmts, const H5S_t *file_space, const H5S_t *mem_space, + hsize_t H5_ATTR_UNUSED nelmts, H5S_t *file_space, H5S_t *mem_space, H5D_chunk_map_t H5_ATTR_UNUSED *fm) { H5D_mpio_actual_io_mode_t actual_io_mode = 
H5D_MPIO_CONTIGUOUS_COLLECTIVE; @@ -793,11 +1097,17 @@ static herr_t H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, H5D_chunk_map_t *fm) { H5FD_mpio_chunk_opt_t chunk_opt_mode; - int io_option = H5D_MULTI_CHUNK_IO_MORE_OPT; - int sum_chunk = -1; +#ifdef H5Dmpio_DEBUG + hbool_t log_file_flag = FALSE; + FILE * debug_log_file = NULL; +#endif #ifdef H5_HAVE_INSTRUMENTED_LIBRARY htri_t temp_not_link_io = FALSE; #endif + int io_option = H5D_MULTI_CHUNK_IO_MORE_OPT; + int sum_chunk = -1; + int mpi_rank; + int mpi_size; herr_t ret_value = SUCCEED; FUNC_ENTER_STATIC @@ -808,9 +1118,35 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf HDassert(type_info); HDassert(fm); - /* Disable collective metadata reads for chunked dataset I/O operations - * in order to prevent potential hangs */ - H5CX_set_coll_metadata_read(FALSE); + /* Obtain the current rank of the process and the number of ranks */ + if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain MPI rank") + if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain MPI size") + +#ifdef H5Dmpio_DEBUG + /* Initialize file-level debugging if not initialized */ + if (!H5D_mpio_debug_inited && H5D__mpio_debug_init() < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize H5Dmpio debugging") + + /* Open file for debugging if necessary */ + log_file_flag = H5D_mpio_debug_flags_s[(int)'f']; + if (log_file_flag) { + char debug_log_filename[1024]; + time_t time_now; + + HDsnprintf(debug_log_filename, 1024, "H5Dmpio_debug.rank%d", mpi_rank); + + if (NULL == (debug_log_file = HDfopen(debug_log_filename, "a"))) + HGOTO_ERROR(H5E_IO, H5E_OPENERROR, FAIL, "couldn't open debugging log file") + + /* Print a short header for this I/O operation */ + time_now = HDtime(NULL); + HDfprintf(debug_log_file, "##### %s", 
HDasctime(HDlocaltime(&time_now))); + + debug_stream = debug_log_file; + } +#endif /* Check the optional property list for the collective chunk IO optimization option */ if (H5CX_get_mpio_chunk_opt_mode(&chunk_opt_mode) < 0) @@ -824,13 +1160,10 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf /* via default path. branch by num threshold */ else { unsigned one_link_chunk_io_threshold; /* Threshold to use single collective I/O for all chunks */ - int mpi_size; /* Number of processes in MPI job */ if (H5D__mpio_get_sum_chunk(io_info, fm, &sum_chunk) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_CANTSWAP, FAIL, "unable to obtain the total chunk number of all processes"); - if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") /* Get the chunk optimization option threshold */ if (H5CX_get_mpio_chunk_opt_num(&one_link_chunk_io_threshold) < 0) @@ -876,22 +1209,12 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf case H5D_ONE_LINK_CHUNK_IO_MORE_OPT: /* Check if there are any filters in the pipeline */ if (io_info->dset->shared->dcpl_cache.pline.nused > 0) { - /* For now, Multi-chunk IO must be forced for parallel filtered read, - * so that data can be unfiltered as it is received. There is significant - * complexity in unfiltering the data when it is read all at once into a - * single buffer. 
- */ - if (io_info->op_type == H5D_IO_OP_READ) { - if (H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, - "couldn't finish optimized multiple filtered chunk MPI-IO") - } /* end if */ - else if (H5D__link_chunk_filtered_collective_io(io_info, type_info, fm) < 0) + if (H5D__link_chunk_filtered_collective_io(io_info, type_info, fm, mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish filtered linked chunk MPI-IO") } /* end if */ else /* Perform unfiltered link chunk collective IO */ - if (H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk) < 0) + if (H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO") break; @@ -899,18 +1222,28 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf default: /* multiple chunk IO via threshold */ /* Check if there are any filters in the pipeline */ if (io_info->dset->shared->dcpl_cache.pline.nused > 0) { - if (H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm) < 0) + if (H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO") } /* end if */ else /* Perform unfiltered multi chunk collective IO */ - if (H5D__multi_chunk_collective_io(io_info, type_info, fm) < 0) + if (H5D__multi_chunk_collective_io(io_info, type_info, fm, mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO") break; } /* end switch */ done: +#ifdef H5Dmpio_DEBUG + /* Close debugging log file */ + if (debug_log_file) { + HDfprintf(debug_log_file, "##############\n\n"); + if (EOF == HDfclose(debug_log_file)) + HDONE_ERROR(H5E_IO, H5E_CLOSEERROR, FAIL, "couldn't close debugging log file") + debug_stream = H5DEBUG(D); + } +#endif + 
FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__chunk_collective_io */ @@ -929,8 +1262,8 @@ done: */ herr_t H5D__chunk_collective_read(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - hsize_t H5_ATTR_UNUSED nelmts, const H5S_t H5_ATTR_UNUSED *file_space, - const H5S_t H5_ATTR_UNUSED *mem_space, H5D_chunk_map_t *fm) + hsize_t H5_ATTR_UNUSED nelmts, H5S_t H5_ATTR_UNUSED *file_space, + H5S_t H5_ATTR_UNUSED *mem_space, H5D_chunk_map_t *fm) { herr_t ret_value = SUCCEED; /* Return value */ @@ -959,8 +1292,8 @@ done: */ herr_t H5D__chunk_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - hsize_t H5_ATTR_UNUSED nelmts, const H5S_t H5_ATTR_UNUSED *file_space, - const H5S_t H5_ATTR_UNUSED *mem_space, H5D_chunk_map_t *fm) + hsize_t H5_ATTR_UNUSED nelmts, H5S_t H5_ATTR_UNUSED *file_space, + H5S_t H5_ATTR_UNUSED *mem_space, H5D_chunk_map_t *fm) { herr_t ret_value = SUCCEED; /* Return value */ @@ -993,12 +1326,12 @@ done: */ static herr_t H5D__link_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, - int sum_chunk) + int sum_chunk, int mpi_rank, int mpi_size) { H5D_chunk_addr_info_t *chunk_addr_info_array = NULL; - MPI_Datatype chunk_final_mtype; /* Final memory MPI datatype for all chunks with seletion */ + MPI_Datatype chunk_final_mtype; /* Final memory MPI datatype for all chunks with selection */ hbool_t chunk_final_mtype_is_derived = FALSE; - MPI_Datatype chunk_final_ftype; /* Final file MPI datatype for all chunks with seletion */ + MPI_Datatype chunk_final_ftype; /* Final file MPI datatype for all chunks with selection */ hbool_t chunk_final_ftype_is_derived = FALSE; H5D_storage_t ctg_store; /* Storage info for "fake" contiguous dataset */ size_t total_chunks; @@ -1074,9 +1407,8 @@ H5D__link_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *typ /* Set up the base storage address for this chunk */ io_info->store = &ctg_store; -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - 
HDfprintf(H5DEBUG(D), "before inter_collective_io for total chunk = 1 \n"); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG(mpi_rank, "before inter_collective_io for total chunk = 1"); #endif /* Perform I/O */ @@ -1092,9 +1424,8 @@ H5D__link_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *typ num_chunk = H5SL_count(fm->sel_chunks); H5_CHECK_OVERFLOW(num_chunk, size_t, int); -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "total_chunks = %zu, num_chunk = %zu\n", total_chunks, num_chunk); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG_VA(mpi_rank, "total_chunks = %zu, num_chunk = %zu", total_chunks, num_chunk); #endif /* Set up MPI datatype for chunks selected */ @@ -1125,18 +1456,17 @@ H5D__link_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *typ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file is derived datatype flags buffer") -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "before sorting the chunk address \n"); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG(mpi_rank, "before sorting chunk addresses"); #endif + /* Sort the chunk address */ - if (H5D__sort_chunk(io_info, fm, chunk_addr_info_array, sum_chunk) < 0) + if (H5D__sort_chunk(io_info, fm, chunk_addr_info_array, sum_chunk, mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_CANTSWAP, FAIL, "unable to sort chunk address") ctg_store.contig.dset_addr = chunk_addr_info_array[0].chunk_addr; -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "after sorting the chunk address \n"); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG(mpi_rank, "after sorting chunk addresses"); #endif /* Obtain MPI derived datatype from all individual chunks */ @@ -1241,9 +1571,9 @@ H5D__link_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *typ /* No chunks selected for this process */ mpi_buf_count = (hsize_t)0; } /* end else */ -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "before coming to final collective IO\n"); + +#ifdef 
H5Dmpio_DEBUG + H5D_MPIO_DEBUG(mpi_rank, "before coming to final collective I/O"); #endif /* Set up the base storage address for this chunk */ @@ -1256,11 +1586,11 @@ H5D__link_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *typ } /* end else */ done: -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "before freeing memory inside H5D_link_collective_io ret_value = %d\n", - ret_value); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG_VA(mpi_rank, "before freeing memory inside H5D_link_collective_io ret_value = %d", + ret_value); #endif + /* Release resources */ if (chunk_addr_info_array) H5MM_xfree(chunk_addr_info_array); @@ -1293,68 +1623,89 @@ done: /*------------------------------------------------------------------------- * Function: H5D__link_chunk_filtered_collective_io * - * Purpose: Routine for one collective IO with one MPI derived datatype - * to link with all filtered chunks - * - * 1. Construct a list of selected chunks in the collective IO - * operation - * A. If any chunk is being written to by more than 1 - * process, the process writing to the chunk which - * currently has the least amount of chunks assigned - * to it becomes the new owner (in the case of ties, - * the lowest MPI rank becomes the new owner) - * 2. If the operation is a write operation - * A. Loop through each chunk in the operation - * I. If this is not a full overwrite of the chunk - * a) Read the chunk from file and pass the chunk - * through the filter pipeline in reverse order - * (Unfilter the chunk) + * Purpose: Performs collective I/O on filtered chunks by creating a + * single MPI derived datatype to link with all filtered + * chunks. The general algorithm is as follows: + * + * 1. Construct a list of selected chunks in the collective + * I/O operation + * 2. If the operation is a read operation + * A. Ensure that the list of chunks is sorted in + * monotonically non-decreasing order of chunk offset + * in the file + * B. 
Participate in a collective read of chunks from + * the file + * C. Loop through each selected chunk, unfiltering it and + * scattering the data to the application's read buffer + * 3. If the operation is a write operation + * A. Redistribute any chunks being written by more than 1 + * MPI rank, such that the chunk is only owned by 1 MPI + * rank. The rank writing to the chunk which currently + * has the least amount of chunks assigned to it becomes + * the new owner (in the case of ties, the lowest MPI + * rank becomes the new owner) + * B. Participate in a collective read of chunks from the + * file + * C. Loop through each chunk selected in the operation + * and for each chunk: + * I. If we actually read the chunk from the file (if + * a chunk is being fully overwritten, we skip + * reading it), pass the chunk through the filter + * pipeline in reverse order (unfilter the chunk) * II. Update the chunk data with the modifications from - * the owning process + * the owning MPI rank * III. Receive any modification data from other - * processes and update the chunk data with these + * ranks and update the chunk data with those * modifications * IV. Filter the chunk - * B. Contribute the modified chunks to an array gathered - * by all processes which contains the new sizes of - * every chunk modified in the collective IO operation - * C. All processes collectively re-allocate each chunk - * from the gathered array with their new sizes after - * the filter operation - * D. If this process has any chunks selected in the IO - * operation, create an MPI derived type for memory and - * file to write out the process' selected chunks to the - * file - * E. Perform the collective write - * F. All processes collectively re-insert each modified + * D. Contribute the modified chunks to an array gathered + * by all ranks which contains information for + * re-allocating space in the file for every chunk + * modified. 
Then, each rank collectively re-allocates + * each chunk from the gathered array with their new + * sizes after the filter operation + * E. Proceed with the collective write operation for all + * the modified chunks + * F. Contribute the modified chunks to an array gathered + * by all ranks which contains information for + * re-inserting every chunk modified into the chunk + * index. Then, each rank collectively re-inserts each * chunk from the gathered array into the chunk index * + * TODO: Note that steps D. and F. here are both collective + * operations that partially share data from the + * H5D_filtered_collective_io_info_t structure. To + * try to conserve on memory a bit, the distributed + * arrays these operations create are discarded after + * each operation is performed. If memory consumption + * here proves to not be an issue, the necessary data + * for both operations could be combined into a single + * structure so that only one collective MPI operation + * is needed to carry out both operations, rather than + * two. * * Return: Non-negative on success/Negative on failure * - * Programmer: Jordan Henderson - * Friday, Nov. 
4th, 2016 - * *------------------------------------------------------------------------- */ static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - H5D_chunk_map_t *fm) + H5D_chunk_map_t *fm, int mpi_rank, int mpi_size) { - H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ - H5D_filtered_collective_io_info_t *collective_chunk_list = - NULL; /* The list of chunks used during collective operations */ - H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ - MPI_Datatype mem_type = MPI_BYTE; - MPI_Datatype file_type = MPI_BYTE; - hbool_t mem_type_is_derived = FALSE; - hbool_t file_type_is_derived = FALSE; - size_t chunk_list_num_entries; - size_t collective_chunk_list_num_entries; - size_t * num_chunks_selected_array = NULL; /* Array of number of chunks selected on each process */ - size_t i; /* Local index variable */ - int mpi_rank, mpi_size, mpi_code; - herr_t ret_value = SUCCEED; + H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ + H5D_filtered_collective_io_info_t *chunk_hash_table = NULL; + unsigned char ** chunk_msg_bufs = NULL; + H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ + MPI_Datatype mem_type = MPI_BYTE; + MPI_Datatype file_type = MPI_BYTE; + hbool_t mem_type_is_derived = FALSE; + hbool_t file_type_is_derived = FALSE; + size_t * rank_chunks_assigned_map = NULL; + size_t chunk_list_num_entries; + size_t i; + int chunk_msg_bufs_len = 0; + int mpi_code; + herr_t ret_value = SUCCEED; FUNC_ENTER_STATIC @@ -1362,11 +1713,12 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in HDassert(type_info); HDassert(fm); - /* Obtain the current rank of the process and the number of processes */ - if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") - if 
((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_DEBUG_VA(mpi_rank, "Performing Linked-chunk I/O (%s) with MPI Comm size of %d", + io_info->op_type == H5D_IO_OP_WRITE ? "write" : "read", mpi_size); + H5D_MPIO_TIME_START(mpi_rank, "Linked-chunk I/O"); +#endif /* Set the actual-chunk-opt-mode property. */ H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_LINK_CHUNK); @@ -1377,123 +1729,127 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in H5CX_set_mpio_actual_io_mode(H5D_MPIO_CHUNK_COLLECTIVE); /* Build a list of selected chunks in the collective io operation */ - if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < - 0) + if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, type_info, fm, &chunk_list, + &chunk_list_num_entries, mpi_rank) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list") - if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */ + if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */ + if (H5D__mpio_collective_filtered_chunk_read(chunk_list, chunk_list_num_entries, io_info, type_info, + mpi_rank, mpi_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read filtered chunks") + } + else { /* Filtered collective write */ H5D_chk_idx_info_t index_info; - H5D_chunk_ud_t udata; hsize_t mpi_buf_count; - /* Construct chunked index info */ - index_info.f = io_info->dset->oloc.file; - index_info.pline = &(io_info->dset->shared->dcpl_cache.pline); - index_info.layout = &(io_info->dset->shared->layout.u.chunk); - index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk); - - /* Set up chunk information for insertion to chunk index */ - udata.common.layout = index_info.layout; - udata.common.storage = index_info.storage; - 
udata.filter_mask = 0; - - /* Iterate through all the chunks in the collective write operation, - * updating each chunk with the data modifications from other processes, - * then re-filtering the chunk. + H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, io_info); + + if (mpi_size > 1) { + /* Redistribute shared chunks being written to */ + if (H5D__mpio_redistribute_shared_chunks(chunk_list, chunk_list_num_entries, io_info, fm, + mpi_rank, mpi_size, &rank_chunks_assigned_map) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks") + + /* Send any chunk modification messages for chunks this rank no longer owns */ + if (H5D__mpio_share_chunk_modification_data(chunk_list, &chunk_list_num_entries, io_info, + type_info, mpi_rank, mpi_size, &chunk_hash_table, + &chunk_msg_bufs, &chunk_msg_bufs_len) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "unable to send chunk modification data between MPI ranks") + + /* Make sure the local chunk list was updated correctly */ + HDassert(chunk_list_num_entries == rank_chunks_assigned_map[mpi_rank]); + } + + /* Proceed to update all the chunks this rank owns with its own + * modification data and data from other ranks, before re-filtering + * the chunks. As chunk reads are done collectively here, all ranks + * must participate. */ - for (i = 0; i < chunk_list_num_entries; i++) - if (mpi_rank == chunk_list[i].owners.new_owner) - if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry") - - /* Gather the new chunk sizes to all processes for a collective reallocation - * of the chunks in the file. 
- */ - if (H5D__mpio_array_gatherv(chunk_list, chunk_list_num_entries, - sizeof(H5D_filtered_collective_io_info_t), - (void **)&collective_chunk_list, &collective_chunk_list_num_entries, true, - 0, io_info->comm, NULL) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes") - - /* Collectively re-allocate the modified chunks (from each process) in the file */ - for (i = 0; i < collective_chunk_list_num_entries; i++) { - hbool_t insert; - - if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[i].chunk_states.chunk_current, - &collective_chunk_list[i].chunk_states.new_chunk, &insert, - collective_chunk_list[i].scaled) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") - } /* end for */ - - if (NULL == (num_chunks_selected_array = (size_t *)H5MM_malloc((size_t)mpi_size * sizeof(size_t)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array") - - if (MPI_SUCCESS != - (mpi_code = MPI_Allgather(&chunk_list_num_entries, 1, MPI_UNSIGNED_LONG_LONG, - num_chunks_selected_array, 1, MPI_UNSIGNED_LONG_LONG, io_info->comm))) - HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) - - /* If this process has any chunks selected, create a MPI type for collectively - * writing out the chunks to file. 
Otherwise, the process contributes to the + if (H5D__mpio_collective_filtered_chunk_update(chunk_list, chunk_list_num_entries, chunk_hash_table, + chunk_msg_bufs, chunk_msg_bufs_len, io_info, type_info, + mpi_rank, mpi_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't update modified chunks") + + /* Free up resources used by chunk hash table now that we're done updating chunks */ + HASH_CLEAR(hh, chunk_hash_table); + + /* All ranks now collectively re-allocate file space for all chunks */ + if (H5D__mpio_collective_filtered_chunk_reallocate(chunk_list, chunk_list_num_entries, + rank_chunks_assigned_map, io_info, &index_info, + mpi_rank, mpi_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "couldn't collectively re-allocate file space for chunks") + + /* If this rank has any chunks selected, create a MPI type for collectively + * writing out the chunks to file. Otherwise, the rank contributes to the * collective write with a none type. */ - if (chunk_list_num_entries) { - size_t offset; - - /* During the collective re-allocation of chunks in the file, the record for each - * chunk is only updated in the collective array, not in the local copy of chunks on each - * process. However, each process needs the updated chunk records so that they can create - * a MPI type for the collective write that will write to the chunk's possible new locations - * in the file instead of the old ones. This ugly hack seems to be the best solution to - * copy the information back to the local array and avoid having to modify the collective - * write type function in an ugly way so that it will accept the collective array instead - * of the local array. This works correctly because the array gather function guarantees - * that the chunk data in the collective array is ordered in blocks by rank. 
- */ - for (i = 0, offset = 0; i < (size_t)mpi_rank; i++) - offset += num_chunks_selected_array[i]; - - H5MM_memcpy(chunk_list, &collective_chunk_list[offset], - num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t)); + if (H5D__mpio_collective_filtered_io_type(chunk_list, chunk_list_num_entries, io_info->op_type, + &mem_type, &mem_type_is_derived, &file_type, + &file_type_is_derived) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, + "couldn't create MPI type for writing filtered chunks") - /* Create single MPI type encompassing each selection in the dataspace */ - if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries, &mem_type, - &mem_type_is_derived, &file_type, - &file_type_is_derived) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_BADTYPE, FAIL, "couldn't create MPI link chunk I/O type") + mpi_buf_count = (file_type_is_derived || mem_type_is_derived) ? 1 : 0; - /* Override the write buffer to point to the address of the first - * chunk data buffer + /* Setup contig storage info for I/O operation */ + if (chunk_list_num_entries) { + /* + * Override the write buffer to point to the first + * chunk's data buffer */ io_info->u.wbuf = chunk_list[0].buf; - } /* end if */ - /* We have a single, complicated MPI datatype for both memory & file */ - mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? 
(hsize_t)1 : (hsize_t)0; - - /* Set up the base storage address for this operation */ - ctg_store.contig.dset_addr = 0; /* Write address must be set to address 0 */ - io_info->store = &ctg_store; + /* + * Setup the base storage address for this operation + * to be the first chunk's file address + */ + ctg_store.contig.dset_addr = chunk_list[0].chunk_new.offset; + } + else + ctg_store.contig.dset_addr = 0; /* Perform I/O */ + io_info->store = &ctg_store; if (H5D__final_collective_io(io_info, type_info, mpi_buf_count, file_type, mem_type) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") + /* Free up resources in anticipation of following collective operation */ + for (i = 0; i < chunk_list_num_entries; i++) { + if (chunk_list[i].buf) { + H5MM_free(chunk_list[i].buf); + chunk_list[i].buf = NULL; + } + } + /* Participate in the collective re-insertion of all chunks modified - * in this iteration into the chunk index + * into the chunk index */ - for (i = 0; i < collective_chunk_list_num_entries; i++) { - udata.chunk_block = collective_chunk_list[i].chunk_states.new_chunk; - udata.common.scaled = collective_chunk_list[i].scaled; - udata.chunk_idx = collective_chunk_list[i].index; - - if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index") - } /* end for */ - } /* end if */ + if (H5D__mpio_collective_filtered_chunk_reinsert(chunk_list, chunk_list_num_entries, + rank_chunks_assigned_map, io_info, &index_info, + mpi_rank, mpi_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "couldn't collectively re-insert modified chunks into chunk index") + } done: - /* Free resources used by a process which had some selection */ + /* Free the MPI buf and file types, if they were derived */ + if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if 
(file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + + if (chunk_msg_bufs) { + for (i = 0; i < (size_t)chunk_msg_bufs_len; i++) + H5MM_free(chunk_msg_bufs[i]); + + H5MM_free(chunk_msg_bufs); + } + + HASH_CLEAR(hh, chunk_hash_table); + + /* Free resources used by a rank which had some selection */ if (chunk_list) { for (i = 0; i < chunk_list_num_entries; i++) if (chunk_list[i].buf) @@ -1502,16 +1858,13 @@ done: H5MM_free(chunk_list); } /* end if */ - if (num_chunks_selected_array) - H5MM_free(num_chunks_selected_array); - if (collective_chunk_list) - H5MM_free(collective_chunk_list); + if (rank_chunks_assigned_map) + H5MM_free(rank_chunks_assigned_map); - /* Free the MPI buf and file types, if they were derived */ - if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_TRACE_EXIT(mpi_rank); +#endif FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__link_chunk_filtered_collective_io() */ @@ -1534,7 +1887,8 @@ done: *------------------------------------------------------------------------- */ static herr_t -H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, H5D_chunk_map_t *fm) +H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, + int mpi_rank, int mpi_size) { H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */ H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ @@ -1547,11 +1901,8 @@ H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *ty H5FD_mpio_collective_opt_t last_coll_opt_mode = H5FD_MPIO_COLLECTIVE_IO; /* Last parallel transfer with 
independent IO or collective IO with this mode */ - size_t total_chunk; /* Total # of chunks in dataset */ -#ifdef H5Dmpio_DEBUG - int mpi_rank; -#endif - size_t u; /* Local index variable */ + size_t total_chunk; /* Total # of chunks in dataset */ + size_t u; /* Local index variable */ H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_NO_COLLECTIVE; /* Local variable for tracking the I/O mode used. */ herr_t ret_value = SUCCEED; @@ -1561,10 +1912,6 @@ H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *ty /* Set the actual chunk opt mode property */ H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_MULTI_CHUNK); -#ifdef H5Dmpio_DEBUG - mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file); -#endif - /* Retrieve total # of chunks in dataset */ H5_CHECKED_ASSIGN(total_chunk, size_t, fm->layout->u.chunk.nchunks, hsize_t); HDassert(total_chunk != 0); @@ -1572,13 +1919,13 @@ H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *ty /* Allocate memories */ chunk_io_option = (uint8_t *)H5MM_calloc(total_chunk); chunk_addr = (haddr_t *)H5MM_calloc(total_chunk * sizeof(haddr_t)); -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "total_chunk %zu\n", total_chunk); + +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG_VA(mpi_rank, "total_chunk %zu", total_chunk); #endif /* Obtain IO option for each chunk */ - if (H5D__obtain_mpio_mode(io_info, fm, chunk_io_option, chunk_addr) < 0) + if (H5D__obtain_mpio_mode(io_info, fm, chunk_io_option, chunk_addr, mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "unable to obtain MPIO mode") /* Set up contiguous I/O info object */ @@ -1606,9 +1953,8 @@ H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *ty H5S_t * fspace; /* Dataspace describing chunk & selection in it */ H5S_t * mspace; /* Dataspace describing selection in memory corresponding to this chunk */ -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "mpi_rank = %d, chunk index = 
%zu\n", mpi_rank, u); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG_VA(mpi_rank, "mpi_rank = %d, chunk index = %zu", mpi_rank, u); #endif /* Get the chunk info for this chunk, if there are elements selected */ chunk_info = fm->select_chunk[u]; @@ -1626,10 +1972,9 @@ H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *ty * needs to contribute MPI NONE TYPE. */ if (chunk_io_option[u] == H5D_CHUNK_IO_MODE_COL) { -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "inside collective chunk IO mpi_rank = %d, chunk index = %zu\n", - mpi_rank, u); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG_VA(mpi_rank, "inside collective chunk IO mpi_rank = %d, chunk index = %zu", + mpi_rank, u); #endif /* Set the file & memory dataspaces */ @@ -1665,10 +2010,9 @@ H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *ty HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish shared collective MPI-IO") } /* end if */ else { /* possible independent IO for this chunk */ -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "inside independent IO mpi_rank = %d, chunk index = %zu\n", mpi_rank, - u); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG_VA(mpi_rank, "inside independent IO mpi_rank = %d, chunk index = %zu", mpi_rank, + u); #endif HDassert(chunk_io_option[u] == 0); @@ -1698,9 +2042,8 @@ H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *ty /* Perform the I/O */ if (H5D__inter_collective_io(&ctg_io_info, type_info, fspace, mspace) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish shared collective MPI-IO") -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "after inter collective IO\n"); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG(mpi_rank, "after inter collective IO"); #endif } /* end else */ } /* end for */ @@ -1720,80 +2063,101 @@ done: /*------------------------------------------------------------------------- * Function: H5D__multi_chunk_filtered_collective_io * - * Purpose: To do filtered 
collective IO iteratively to save on memory. - * While link_chunk_filtered_collective_io will construct and - * work on a list of all of the chunks selected in the IO - * operation at once, this function works iteratively on a set - * of chunks at a time; at most one chunk per rank per - * iteration. - * - * 1. Construct a list of selected chunks in the collective IO - * operation - * A. If any chunk is being written to by more than 1 - * process, the process writing to the chunk which - * currently has the least amount of chunks assigned - * to it becomes the new owner (in the case of ties, - * the lowest MPI rank becomes the new owner) - * 2. If the operation is a read operation - * A. Loop through each chunk in the operation - * I. Read the chunk from the file - * II. Unfilter the chunk - * III. Scatter the read chunk data to the user's buffer - * 3. If the operation is a write operation - * A. Loop through each chunk in the operation - * I. If this is not a full overwrite of the chunk - * a) Read the chunk from file and pass the chunk - * through the filter pipeline in reverse order - * (Unfilter the chunk) - * II. Update the chunk data with the modifications from - * the owning process - * III. Receive any modification data from other - * processes and update the chunk data with these - * modifications - * IV. Filter the chunk - * V. Contribute the chunk to an array gathered by - * all processes which contains every chunk - * modified in this iteration (up to one chunk - * per process, some processes may not have a - * selection/may have less chunks to work on than - * other processes) - * VI. All processes collectively re-allocate each - * chunk from the gathered array with their new - * sizes after the filter operation - * VII. Proceed with the collective write operation - * for the chunks modified on this iteration - * VIII. 
All processes collectively re-insert each - * chunk from the gathered array into the chunk - * index + * Purpose: Performs collective I/O on filtered chunks iteratively to + * save on memory and potentially get better performance + * depending on the average number of chunks per rank. While + * linked-chunk I/O will construct and work on a list of all + * of the chunks selected in the I/O operation at once, this + * function works iteratively on a set of chunks at a time; at + * most one chunk per rank per iteration. The general + * algorithm is as follows: + * + * 1. Construct a list of selected chunks in the collective + * I/O operation + * 2. If the operation is a read operation, loop an amount of + * times equal to the maximum number of chunks selected on + * any particular rank and on each iteration: + * A. Participate in a collective read of chunks from + * the file (ranks that run out of chunks still need + * to participate) + * B. Unfilter the chunk that was read (if any) + * C. Scatter the read chunk's data to the application's + * read buffer + * 3. If the operation is a write operation, redistribute any + * chunks being written to by more than 1 MPI rank, such + * that the chunk is only owned by 1 MPI rank. The rank + * writing to the chunk which currently has the least + * amount of chunks assigned to it becomes the new owner + * (in the case of ties, the lowest MPI rank becomes the + * new owner). Then, loop an amount of times equal to the + * maximum number of chunks selected on any particular + * rank and on each iteration: + * A. Participate in a collective read of chunks from + * the file (ranks that run out of chunks still need + * to participate) + * I. If we actually read a chunk from the file (if + * a chunk is being fully overwritten, we skip + * reading it), pass the chunk through the filter + * pipeline in reverse order (unfilter the chunk) + * B. Update the chunk data with the modifications from + * the owning rank + * C. 
Receive any modification data from other ranks and + * update the chunk data with those modifications + * D. Filter the chunk + * E. Contribute the chunk to an array gathered by + * all ranks which contains information for + * re-allocating space in the file for every chunk + * modified in this iteration (up to one chunk per + * rank; some ranks may not have a selection/may have + * less chunks to work on than other ranks). Then, + * each rank collectively re-allocates each chunk + * from the gathered array with their new sizes + * after the filter operation + * F. Proceed with the collective write operation + * for the chunks modified on this iteration + * G. Contribute the chunk to an array gathered by + * all ranks which contains information for + * re-inserting every chunk modified on this + * iteration into the chunk index. Then, each rank + * collectively re-inserts each chunk from the + * gathered array into the chunk index + * + * TODO: Note that steps E. and G. here are both collective + * operations that partially share data from the + * H5D_filtered_collective_io_info_t structure. To + * try to conserve on memory a bit, the distributed + * arrays these operations create are discarded after + * each operation is performed. If memory consumption + * here proves to not be an issue, the necessary data + * for both operations could be combined into a single + * structure so that only one collective MPI operation + * is needed to carry out both operations, rather than + * two. * * Return: Non-negative on success/Negative on failure * - * Programmer: Jordan Henderson - * Friday, Dec. 
2nd, 2016 - * *------------------------------------------------------------------------- */ static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - H5D_chunk_map_t *fm) + H5D_chunk_map_t *fm, int mpi_rank, int mpi_size) { - H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ - H5D_filtered_collective_io_info_t *collective_chunk_list = - NULL; /* The list of chunks used during collective operations */ - H5D_storage_t store; /* union of EFL and chunk pointer in file space */ - H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */ - H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ - MPI_Datatype *file_type_array = NULL; - MPI_Datatype *mem_type_array = NULL; - hbool_t * file_type_is_derived_array = NULL; - hbool_t * mem_type_is_derived_array = NULL; - hbool_t * has_chunk_selected_array = - NULL; /* Array of whether or not each process is contributing a chunk to each iteration */ - size_t chunk_list_num_entries; - size_t collective_chunk_list_num_entries; - size_t i, j; /* Local index variable */ - int mpi_rank, mpi_size, mpi_code; - herr_t ret_value = SUCCEED; + H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ + H5D_filtered_collective_io_info_t *chunk_hash_table = NULL; + unsigned char ** chunk_msg_bufs = NULL; + H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */ + H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ + MPI_Datatype mem_type = MPI_BYTE; + MPI_Datatype file_type = MPI_BYTE; + hbool_t mem_type_is_derived = FALSE; + hbool_t file_type_is_derived = FALSE; + hbool_t have_chunk_to_process; + size_t chunk_list_num_entries; + size_t i; + size_t max_num_chunks; + int chunk_msg_bufs_len = 0; + int mpi_code; + herr_t ret_value = SUCCEED; FUNC_ENTER_STATIC @@ -1801,11 +2165,12 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, 
const H5D_type_i HDassert(type_info); HDassert(fm); - /* Obtain the current rank of the process and the number of processes */ - if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") - if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_DEBUG_VA(mpi_rank, "Performing Multi-chunk I/O (%s) with MPI Comm size of %d", + io_info->op_type == H5D_IO_OP_WRITE ? "write" : "read", mpi_size); + H5D_MPIO_TIME_START(mpi_rank, "Multi-chunk I/O"); +#endif /* Set the actual chunk opt mode property */ H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_MULTI_CHUNK); @@ -1816,10 +2181,19 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i H5CX_set_mpio_actual_io_mode(H5D_MPIO_CHUNK_COLLECTIVE); /* Build a list of selected chunks in the collective IO operation */ - if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < - 0) + if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, type_info, fm, &chunk_list, + &chunk_list_num_entries, mpi_rank) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list") + /* Retrieve the maximum number of chunks selected for any rank */ + if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list_num_entries, &max_num_chunks, 1, + MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) + + /* If no one has anything selected at all, end the operation */ + if (0 == max_num_chunks) + HGOTO_DONE(SUCCEED); + /* Set up contiguous I/O info object */ H5MM_memcpy(&ctg_io_info, io_info, sizeof(ctg_io_info)); ctg_io_info.store = &ctg_store; @@ -1827,190 +2201,147 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i /* Initialize temporary contiguous storage 
info */ ctg_store.contig.dset_size = (hsize_t)io_info->dset->shared->layout.u.chunk.size; - ctg_store.contig.dset_addr = 0; - - /* Set dataset storage for I/O info */ - io_info->store = &store; if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */ - for (i = 0; i < chunk_list_num_entries; i++) - if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't process chunk entry") - } /* end if */ + for (i = 0; i < max_num_chunks; i++) { + /* Check if this rank has a chunk to work on for this iteration */ + have_chunk_to_process = (i < chunk_list_num_entries); + + if (H5D__mpio_collective_filtered_chunk_read(have_chunk_to_process ? &chunk_list[i] : NULL, + have_chunk_to_process ? 1 : 0, io_info, type_info, + mpi_rank, mpi_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read filtered chunks") + + if (have_chunk_to_process && chunk_list[i].buf) { + H5MM_free(chunk_list[i].buf); + chunk_list[i].buf = NULL; + } + } + } else { /* Filtered collective write */ H5D_chk_idx_info_t index_info; - H5D_chunk_ud_t udata; - size_t max_num_chunks; hsize_t mpi_buf_count; /* Construct chunked index info */ - index_info.f = io_info->dset->oloc.file; - index_info.pline = &(io_info->dset->shared->dcpl_cache.pline); - index_info.layout = &(io_info->dset->shared->layout.u.chunk); - index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk); - - /* Set up chunk information for insertion to chunk index */ - udata.common.layout = index_info.layout; - udata.common.storage = index_info.storage; - udata.filter_mask = 0; - - /* Retrieve the maximum number of chunks being written among all processes */ - if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list_num_entries, &max_num_chunks, 1, - MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm))) - HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) - - /* If no one is writing anything at all, end the operation */ 
- if (!(max_num_chunks > 0)) - HGOTO_DONE(SUCCEED); - - /* Allocate arrays for storing MPI file and mem types and whether or not the - * types were derived. - */ - if (NULL == (file_type_array = (MPI_Datatype *)H5MM_malloc(max_num_chunks * sizeof(MPI_Datatype)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type array") - - if (NULL == (file_type_is_derived_array = (hbool_t *)H5MM_calloc(max_num_chunks * sizeof(hbool_t)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type is derived array") - - if (NULL == (mem_type_array = (MPI_Datatype *)H5MM_malloc(max_num_chunks * sizeof(MPI_Datatype)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type array") - - if (NULL == (mem_type_is_derived_array = (hbool_t *)H5MM_calloc(max_num_chunks * sizeof(hbool_t)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type is derived array") - - /* Iterate over the max number of chunks among all processes, as this process could - * have no chunks left to work on, but it still needs to participate in the collective - * re-allocation and re-insertion of chunks modified by other processes. 
+ H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, io_info); + + if (mpi_size > 1) { + /* Redistribute shared chunks being written to */ + if (H5D__mpio_redistribute_shared_chunks(chunk_list, chunk_list_num_entries, io_info, fm, + mpi_rank, mpi_size, NULL) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks") + + /* Send any chunk modification messages for chunks this rank no longer owns */ + if (H5D__mpio_share_chunk_modification_data(chunk_list, &chunk_list_num_entries, io_info, + type_info, mpi_rank, mpi_size, &chunk_hash_table, + &chunk_msg_bufs, &chunk_msg_bufs_len) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "unable to send chunk modification data between MPI ranks") + } + + /* Iterate over the max number of chunks among all ranks, as this rank could + * have no chunks left to work on, but it still needs to participate in the + * collective re-allocation and re-insertion of chunks modified by other ranks. */ for (i = 0; i < max_num_chunks; i++) { - /* Check if this process has a chunk to work on for this iteration */ - hbool_t have_chunk_to_process = - (i < chunk_list_num_entries) && (mpi_rank == chunk_list[i].owners.new_owner); - - if (have_chunk_to_process) - if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry") + /* Check if this rank has a chunk to work on for this iteration */ + have_chunk_to_process = (i < chunk_list_num_entries) && (mpi_rank == chunk_list[i].new_owner); - /* Gather the new chunk sizes to all processes for a collective re-allocation - * of the chunks in the file + /* Proceed to update the chunk this rank owns (if any left) with its + * own modification data and data from other ranks, before re-filtering + * the chunks. As chunk reads are done collectively here, all ranks + * must participate. */ - if (H5D__mpio_array_gatherv(&chunk_list[i], have_chunk_to_process ? 
1 : 0, - sizeof(H5D_filtered_collective_io_info_t), - (void **)&collective_chunk_list, &collective_chunk_list_num_entries, - true, 0, io_info->comm, NULL) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes") - - /* Participate in the collective re-allocation of all chunks modified - * in this iteration. + if (H5D__mpio_collective_filtered_chunk_update(have_chunk_to_process ? &chunk_list[i] : NULL, + have_chunk_to_process ? 1 : 0, chunk_hash_table, + chunk_msg_bufs, chunk_msg_bufs_len, io_info, + type_info, mpi_rank, mpi_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't update modified chunks") + + /* All ranks now collectively re-allocate file space for all chunks */ + if (H5D__mpio_collective_filtered_chunk_reallocate(have_chunk_to_process ? &chunk_list[i] : NULL, + have_chunk_to_process ? 1 : 0, NULL, io_info, + &index_info, mpi_rank, mpi_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "couldn't collectively re-allocate file space for chunks") + + /* + * If this rank has a chunk to work on, create a MPI type + * for writing out the chunk. Otherwise, the rank will + * use MPI_BYTE for the file and memory type and specify + * a count of 0. */ - for (j = 0; j < collective_chunk_list_num_entries; j++) { - hbool_t insert = FALSE; + if (H5D__mpio_collective_filtered_io_type( + have_chunk_to_process ? &chunk_list[i] : NULL, have_chunk_to_process ? 
1 : 0, + io_info->op_type, &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, + "couldn't create MPI type for writing filtered chunks") - if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].chunk_states.chunk_current, - &collective_chunk_list[j].chunk_states.new_chunk, &insert, - chunk_list[j].scaled) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") - } /* end for */ - - if (NULL == - (has_chunk_selected_array = (hbool_t *)H5MM_malloc((size_t)mpi_size * sizeof(hbool_t)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array") - - if (MPI_SUCCESS != - (mpi_code = MPI_Allgather(&have_chunk_to_process, 1, MPI_C_BOOL, has_chunk_selected_array, 1, - MPI_C_BOOL, io_info->comm))) - HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + mpi_buf_count = (file_type_is_derived || mem_type_is_derived) ? 1 : 0; - /* If this process has a chunk to work on, create a MPI type for the - * memory and file for writing out the chunk - */ + /* Override the write buffer to point to the chunk data buffer */ if (have_chunk_to_process) { - size_t offset; - int mpi_type_count; - - for (j = 0, offset = 0; j < (size_t)mpi_rank; j++) - offset += has_chunk_selected_array[j]; - - /* Collect the new chunk info back to the local copy, since only the record in the - * collective array gets updated by the chunk re-allocation */ - H5MM_memcpy(&chunk_list[i].chunk_states.new_chunk, - &collective_chunk_list[offset].chunk_states.new_chunk, - sizeof(chunk_list[i].chunk_states.new_chunk)); - - H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].chunk_states.new_chunk.length, hsize_t); - - /* Create MPI memory type for writing to chunk */ - if (MPI_SUCCESS != - (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type_array[i]))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) - if (MPI_SUCCESS != (mpi_code = 
MPI_Type_commit(&mem_type_array[i]))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - mem_type_is_derived_array[i] = TRUE; - - /* Create MPI file type for writing to chunk */ - if (MPI_SUCCESS != - (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &file_type_array[i]))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) - if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type_array[i]))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - file_type_is_derived_array[i] = TRUE; - - mpi_buf_count = 1; - - /* Set up the base storage address for this operation */ - ctg_store.contig.dset_addr = chunk_list[i].chunk_states.new_chunk.offset; - - /* Override the write buffer to point to the address of the - * chunk data buffer + /* + * Override the write buffer to point to the + * chunk's data buffer */ ctg_io_info.u.wbuf = chunk_list[i].buf; - } /* end if */ - else { - mem_type_array[i] = file_type_array[i] = MPI_BYTE; - mpi_buf_count = 0; - } /* end else */ + + /* + * Setup the base storage address for this + * operation to be the chunk's file address + */ + ctg_store.contig.dset_addr = chunk_list[i].chunk_new.offset; + } + else + ctg_store.contig.dset_addr = 0; /* Perform the I/O */ - if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, file_type_array[i], - mem_type_array[i]) < 0) + if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, file_type, mem_type) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") + /* Free up resources in anticipation of following collective operation */ + if (have_chunk_to_process && chunk_list[i].buf) { + H5MM_free(chunk_list[i].buf); + chunk_list[i].buf = NULL; + } + /* Participate in the collective re-insertion of all chunks modified * in this iteration into the chunk index */ - for (j = 0; j < collective_chunk_list_num_entries; j++) { - udata.chunk_block = collective_chunk_list[j].chunk_states.new_chunk; - udata.common.scaled = 
collective_chunk_list[j].scaled; - udata.chunk_idx = collective_chunk_list[j].index; - - if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, - "unable to insert chunk address into index") - } /* end for */ + if (H5D__mpio_collective_filtered_chunk_reinsert(have_chunk_to_process ? &chunk_list[i] : NULL, + have_chunk_to_process ? 1 : 0, NULL, io_info, + &index_info, mpi_rank, mpi_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "couldn't collectively re-insert modified chunks into chunk index") + + /* Free the MPI types, if they were derived */ + if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + mem_type_is_derived = FALSE; + if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + file_type_is_derived = FALSE; + } /* end for */ + } - if (collective_chunk_list) { - H5MM_free(collective_chunk_list); - collective_chunk_list = NULL; - } /* end if */ - if (has_chunk_selected_array) { - H5MM_free(has_chunk_selected_array); - has_chunk_selected_array = NULL; - } /* end if */ - } /* end for */ +done: + /* Free the MPI buf and file types, if they were derived */ + if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - /* Free the MPI file and memory types, if they were derived */ - for (i = 0; i < max_num_chunks; i++) { - if (file_type_is_derived_array[i]) - if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type_array[i]))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if (chunk_msg_bufs) { + for (i = 0; i < (size_t)chunk_msg_bufs_len; i++) + H5MM_free(chunk_msg_bufs[i]); - 
if (mem_type_is_derived_array[i]) - if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type_array[i]))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - } /* end for */ - } /* end else */ + H5MM_free(chunk_msg_bufs); + } -done: + HASH_CLEAR(hh, chunk_hash_table); + + /* Free resources used by a rank which had some selection */ if (chunk_list) { for (i = 0; i < chunk_list_num_entries; i++) if (chunk_list[i].buf) @@ -2019,16 +2350,10 @@ done: H5MM_free(chunk_list); } /* end if */ - if (collective_chunk_list) - H5MM_free(collective_chunk_list); - if (file_type_array) - H5MM_free(file_type_array); - if (mem_type_array) - H5MM_free(mem_type_array); - if (file_type_is_derived_array) - H5MM_free(file_type_is_derived_array); - if (mem_type_is_derived_array) - H5MM_free(mem_type_is_derived_array); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_TRACE_EXIT(mpi_rank); +#endif FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__multi_chunk_filtered_collective_io() */ @@ -2054,11 +2379,22 @@ H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf hbool_t mbt_is_derived = FALSE; hbool_t mft_is_derived = FALSE; MPI_Datatype mpi_file_type, mpi_buf_type; - int mpi_code; /* MPI return code */ - herr_t ret_value = SUCCEED; /* return value */ + int mpi_code; /* MPI return code */ +#ifdef H5Dmpio_DEBUG + int mpi_rank; +#endif + herr_t ret_value = SUCCEED; /* return value */ FUNC_ENTER_STATIC +#ifdef H5Dmpio_DEBUG + mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file); + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_TIME_START(mpi_rank, "Inter collective I/O"); + if (mpi_rank < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain MPI rank") +#endif + if ((file_space != NULL) && (mem_space != NULL)) { int mpi_file_count; /* Number of file "objects" to transfer */ hsize_t *permute_map = NULL; /* array that holds the mapping from the old, @@ -2117,9 +2453,8 @@ H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf 
mft_is_derived = FALSE; } /* end else */ -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "before final collective IO \n"); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG(mpi_rank, "before final collective I/O"); #endif /* Perform final collective I/O operation */ @@ -2133,9 +2468,10 @@ done: if (mft_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mpi_file_type))) HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "before leaving inter_collective_io ret_value = %d\n", ret_value); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_DEBUG_VA(mpi_rank, "before leaving inter_collective_io ret_value = %d", ret_value); + H5D_MPIO_TRACE_EXIT(mpi_rank); #endif FUNC_LEAVE_NOAPI(ret_value) @@ -2157,10 +2493,21 @@ static herr_t H5D__final_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, hsize_t mpi_buf_count, MPI_Datatype mpi_file_type, MPI_Datatype mpi_buf_type) { +#ifdef H5Dmpio_DEBUG + int mpi_rank; +#endif herr_t ret_value = SUCCEED; FUNC_ENTER_STATIC +#ifdef H5Dmpio_DEBUG + mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file); + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_TIME_START(mpi_rank, "Final collective I/O"); + if (mpi_rank < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain MPI rank") +#endif + /* Pass buf type, file type to the file driver. 
*/ if (H5CX_set_mpi_coll_datatypes(mpi_buf_type, mpi_file_type) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O collective I/O datatypes") @@ -2175,10 +2522,12 @@ H5D__final_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf } /* end else */ done: -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "ret_value before leaving final_collective_io=%d\n", ret_value); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_DEBUG_VA(mpi_rank, "ret_value before leaving final_collective_io=%d", ret_value); + H5D_MPIO_TRACE_EXIT(mpi_rank); #endif + FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__final_collective_io */ @@ -2220,62 +2569,149 @@ H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2) * * Return: -1, 0, 1 * - * Programmer: Jordan Henderson - * Wednesday, Nov. 30th, 2016 - * *------------------------------------------------------------------------- */ static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2) { - haddr_t addr1 = HADDR_UNDEF, addr2 = HADDR_UNDEF; + const H5D_filtered_collective_io_info_t *entry1; + const H5D_filtered_collective_io_info_t *entry2; + haddr_t addr1 = HADDR_UNDEF; + haddr_t addr2 = HADDR_UNDEF; + int ret_value; FUNC_ENTER_STATIC_NOERR - addr1 = ((const H5D_filtered_collective_io_info_t *)filtered_collective_io_info_entry1) - ->chunk_states.new_chunk.offset; - addr2 = ((const H5D_filtered_collective_io_info_t *)filtered_collective_io_info_entry2) - ->chunk_states.new_chunk.offset; + entry1 = (const H5D_filtered_collective_io_info_t *)filtered_collective_io_info_entry1; + entry2 = (const H5D_filtered_collective_io_info_t *)filtered_collective_io_info_entry2; - FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2)) -} /* end H5D__cmp_filtered_collective_io_info_entry() */ + addr1 = entry1->chunk_new.offset; + addr2 = entry2->chunk_new.offset; + + /* + * If both chunk addresses are 
defined, H5F_addr_cmp is safe to use. + * Otherwise, if both addresses aren't defined, compare chunk + * entries based on their chunk index. Finally, if only one chunk + * address is defined, return the appropriate value based on which + * is defined. + */ + if (H5F_addr_defined(addr1) && H5F_addr_defined(addr2)) { + ret_value = H5F_addr_cmp(addr1, addr2); + } + else if (!H5F_addr_defined(addr1) && !H5F_addr_defined(addr2)) { + hsize_t chunk_idx1 = entry1->index_info.chunk_idx; + hsize_t chunk_idx2 = entry2->index_info.chunk_idx; -#if MPI_VERSION >= 3 + ret_value = (chunk_idx1 > chunk_idx2) - (chunk_idx1 < chunk_idx2); + } + else + ret_value = H5F_addr_defined(addr1) ? 1 : -1; + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__cmp_filtered_collective_io_info_entry() */ /*------------------------------------------------------------------------- - * Function: H5D__cmp_filtered_collective_io_info_entry_owner + * Function: H5D__cmp_chunk_redistribute_info * - * Purpose: Routine to compare filtered collective chunk io info - * entries's original owner fields + * Purpose: Routine to compare two H5D_chunk_redistribute_info_t + * structures * - * Description: Callback for qsort() to compare filtered collective chunk - * io info entries's original owner fields + * Description: Callback for qsort() to compare two + * H5D_chunk_redistribute_info_t structures + * + * Return: -1, 0, 1 + * + *------------------------------------------------------------------------- + */ +static int +H5D__cmp_chunk_redistribute_info(const void *_entry1, const void *_entry2) +{ + const H5D_chunk_redistribute_info_t *entry1; + const H5D_chunk_redistribute_info_t *entry2; + hsize_t chunk_index1; + hsize_t chunk_index2; + int ret_value; + + FUNC_ENTER_STATIC_NOERR + + entry1 = (const H5D_chunk_redistribute_info_t *)_entry1; + entry2 = (const H5D_chunk_redistribute_info_t *)_entry2; + + chunk_index1 = entry1->chunk_idx; + chunk_index2 = entry2->chunk_idx; + + if (chunk_index1 == chunk_index2) { + int 
orig_owner1 = entry1->orig_owner; + int orig_owner2 = entry2->orig_owner; + + ret_value = (orig_owner1 > orig_owner2) - (orig_owner1 < orig_owner2); + } + else + ret_value = (chunk_index1 > chunk_index2) - (chunk_index1 < chunk_index2); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__cmp_chunk_redistribute_info() */ + +/*------------------------------------------------------------------------- + * Function: H5D__cmp_chunk_redistribute_info_orig_owner * - * Return: The difference between the two - * H5D_filtered_collective_io_info_t's original owner fields + * Purpose: Routine to compare the original owning MPI rank for two + * H5D_chunk_redistribute_info_t structures * - * Programmer: Jordan Henderson - * Monday, Apr. 10th, 2017 + * Description: Callback for qsort() to compare the original owning MPI + * rank for two H5D_chunk_redistribute_info_t + * structures + * + * Return: -1, 0, 1 * *------------------------------------------------------------------------- */ static int -H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective_io_info_entry1, - const void *filtered_collective_io_info_entry2) +H5D__cmp_chunk_redistribute_info_orig_owner(const void *_entry1, const void *_entry2) { - int owner1 = -1, owner2 = -1; + const H5D_chunk_redistribute_info_t *entry1; + const H5D_chunk_redistribute_info_t *entry2; + int owner1 = -1; + int owner2 = -1; + int ret_value; FUNC_ENTER_STATIC_NOERR - owner1 = ((const H5D_filtered_collective_io_info_t *)filtered_collective_io_info_entry1) - ->owners.original_owner; - owner2 = ((const H5D_filtered_collective_io_info_t *)filtered_collective_io_info_entry2) - ->owners.original_owner; + entry1 = (const H5D_chunk_redistribute_info_t *)_entry1; + entry2 = (const H5D_chunk_redistribute_info_t *)_entry2; - FUNC_LEAVE_NOAPI(owner1 - owner2) -} /* end H5D__cmp_filtered_collective_io_info_entry_owner() */ -#endif + owner1 = entry1->orig_owner; + owner2 = entry2->orig_owner; + + if (owner1 == owner2) { + haddr_t addr1 
= entry1->chunk_block.offset; + haddr_t addr2 = entry2->chunk_block.offset; + + /* + * If both chunk addresses are defined, H5F_addr_cmp is safe to use. + * Otherwise, if both addresses aren't defined, compare chunk + * entries based on their chunk index. Finally, if only one chunk + * address is defined, return the appropriate value based on which + * is defined. + */ + if (H5F_addr_defined(addr1) && H5F_addr_defined(addr2)) { + ret_value = H5F_addr_cmp(addr1, addr2); + } + else if (!H5F_addr_defined(addr1) && !H5F_addr_defined(addr2)) { + hsize_t chunk_idx1 = entry1->chunk_idx; + hsize_t chunk_idx2 = entry2->chunk_idx; + + ret_value = (chunk_idx1 > chunk_idx2) - (chunk_idx1 < chunk_idx2); + } + else + ret_value = H5F_addr_defined(addr1) ? 1 : -1; + } + else + ret_value = (owner1 > owner2) - (owner1 < owner2); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__cmp_chunk_redistribute_info_orig_owner() */ /*------------------------------------------------------------------------- * Function: H5D__sort_chunk @@ -2304,26 +2740,24 @@ H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective */ static herr_t H5D__sort_chunk(H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, - H5D_chunk_addr_info_t chunk_addr_info_array[], int sum_chunk) + H5D_chunk_addr_info_t chunk_addr_info_array[], int sum_chunk, int mpi_rank, int mpi_size) { - H5SL_node_t * chunk_node; /* Current node in chunk skip list */ - H5D_chunk_info_t *chunk_info; /* Current chunking info. of this node. 
*/ - haddr_t chunk_addr; /* Current chunking address of this node */ - haddr_t *total_chunk_addr_array = NULL; /* The array of chunk address for the total number of chunk */ - hbool_t do_sort = FALSE; /* Whether the addresses need to be sorted */ - int bsearch_coll_chunk_threshold; - int many_chunk_opt = H5D_OBTAIN_ONE_CHUNK_ADDR_IND; - int mpi_size; /* Number of MPI processes */ - int mpi_code; /* MPI return code */ - int i; /* Local index variable */ - herr_t ret_value = SUCCEED; /* Return value */ + H5SL_node_t * chunk_node; /* Current node in chunk skip list */ + H5D_chunk_info_t *chunk_info; /* Current chunking info. of this node. */ + haddr_t chunk_addr; /* Current chunking address of this node */ + haddr_t *total_chunk_addr_array = NULL; /* The array of chunk address for the total number of chunk */ + H5P_coll_md_read_flag_t md_reads_file_flag; + hbool_t md_reads_context_flag; + hbool_t restore_md_reads_state = FALSE; + hbool_t do_sort = FALSE; /* Whether the addresses need to be sorted */ + int bsearch_coll_chunk_threshold; + int many_chunk_opt = H5D_OBTAIN_ONE_CHUNK_ADDR_IND; + int mpi_code; /* MPI return code */ + int i; /* Local index variable */ + herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_STATIC - /* Retrieve # of MPI processes */ - if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") - /* Calculate the actual threshold to obtain all chunk addresses collectively * The bigger this number is, the more possible the use of obtaining chunk * address collectively. 
@@ -2337,31 +2771,56 @@ H5D__sort_chunk(H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, ((sum_chunk / mpi_size) >= H5D_ALL_CHUNK_ADDR_THRES_COL_NUM)) many_chunk_opt = H5D_OBTAIN_ALL_CHUNK_ADDR_COL; -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "many_chunk_opt= %d\n", many_chunk_opt); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG_VA(mpi_rank, "many_chunk_opt = %d", many_chunk_opt); #endif /* If we need to optimize the way to obtain the chunk address */ if (many_chunk_opt != H5D_OBTAIN_ONE_CHUNK_ADDR_IND) { - int mpi_rank; - -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "Coming inside H5D_OBTAIN_ALL_CHUNK_ADDR_COL\n"); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG(mpi_rank, "Coming inside H5D_OBTAIN_ALL_CHUNK_ADDR_COL"); #endif /* Allocate array for chunk addresses */ if (NULL == (total_chunk_addr_array = (haddr_t *)H5MM_malloc(sizeof(haddr_t) * (size_t)fm->layout->u.chunk.nchunks))) HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, FAIL, "unable to allocate memory chunk address array") - /* Retrieve all the chunk addresses with process 0 */ - if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") - if (mpi_rank == 0) { - if (H5D__chunk_addrmap(io_info, total_chunk_addr_array) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get chunk address") + herr_t result; + + /* + * If enabled, disable collective metadata reads here. + * Since the chunk address mapping is done on rank 0 + * only here, it will cause problems if collective + * metadata reads are enabled. 
+ */ + if (H5F_get_coll_metadata_reads(io_info->dset->oloc.file)) { + md_reads_file_flag = H5P_FORCE_FALSE; + md_reads_context_flag = FALSE; + H5F_set_coll_metadata_reads(io_info->dset->oloc.file, &md_reads_file_flag, + &md_reads_context_flag); + restore_md_reads_state = TRUE; + } + + result = H5D__chunk_addrmap(io_info, total_chunk_addr_array); + + /* Ensure that we restore the old collective metadata reads state */ + if (restore_md_reads_state) { + H5F_set_coll_metadata_reads(io_info->dset->oloc.file, &md_reads_file_flag, + &md_reads_context_flag); + restore_md_reads_state = FALSE; + } + + if (result < 0) { + size_t u; + + /* Clear total chunk address array */ + for (u = 0; u < (size_t)fm->layout->u.chunk.nchunks; u++) + total_chunk_addr_array[u] = HADDR_UNDEF; + + /* Push error, but still participate in following MPI_Bcast */ + HDONE_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get chunk address") + } } /* end if */ /* Broadcasting the MPI_IO option info. and chunk address info. */ @@ -2405,10 +2864,10 @@ H5D__sort_chunk(H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, chunk_node = H5SL_next(chunk_node); } /* end while */ -#ifdef H5D_DEBUG - if (H5DEBUG(D)) - HDfprintf(H5DEBUG(D), "before Qsort\n"); +#ifdef H5Dmpio_DEBUG + H5D_MPIO_DEBUG(mpi_rank, "before Qsort"); #endif + if (do_sort) { size_t num_chunks = H5SL_count(fm->sel_chunks); @@ -2416,6 +2875,10 @@ H5D__sort_chunk(H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, } /* end if */ done: + /* Re-enable collective metadata reads if we disabled them */ + if (restore_md_reads_state) + H5F_set_coll_metadata_reads(io_info->dset->oloc.file, &md_reads_file_flag, &md_reads_context_flag); + if (total_chunk_addr_array) H5MM_xfree(total_chunk_addr_array); @@ -2432,7 +2895,7 @@ done: * * 1) Each process provides two piece of information for all chunks having selection * a) chunk index - * b) wheather this chunk is regular(for MPI derived datatype not working case) + * b) whether this chunk is regular(for MPI derived 
datatype not working case) * * 2) Gather all the information to the root process * @@ -2461,22 +2924,24 @@ done: */ static herr_t H5D__obtain_mpio_mode(H5D_io_info_t *io_info, H5D_chunk_map_t *fm, uint8_t assign_io_mode[], - haddr_t chunk_addr[]) + haddr_t chunk_addr[], int mpi_rank, int mpi_size) { - size_t total_chunks; - unsigned percent_nproc_per_chunk, threshold_nproc_per_chunk; - uint8_t * io_mode_info = NULL; - uint8_t * recv_io_mode_info = NULL; - uint8_t * mergebuf = NULL; - uint8_t * tempbuf; - H5SL_node_t * chunk_node; - H5D_chunk_info_t *chunk_info; - int mpi_size, mpi_rank; - MPI_Comm comm; - int root; - size_t ic; - int mpi_code; - herr_t ret_value = SUCCEED; + size_t total_chunks; + unsigned percent_nproc_per_chunk, threshold_nproc_per_chunk; + uint8_t * io_mode_info = NULL; + uint8_t * recv_io_mode_info = NULL; + uint8_t * mergebuf = NULL; + uint8_t * tempbuf; + H5SL_node_t * chunk_node; + H5D_chunk_info_t * chunk_info; + H5P_coll_md_read_flag_t md_reads_file_flag; + hbool_t md_reads_context_flag; + hbool_t restore_md_reads_state = FALSE; + MPI_Comm comm; + int root; + size_t ic; + int mpi_code; + herr_t ret_value = SUCCEED; FUNC_ENTER_STATIC @@ -2484,12 +2949,6 @@ H5D__obtain_mpio_mode(H5D_io_info_t *io_info, H5D_chunk_map_t *fm, uint8_t assig root = 0; comm = io_info->comm; - /* Obtain the number of process and the current rank of the process */ - if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") - if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") - /* Setup parameters */ H5_CHECKED_ASSIGN(total_chunks, size_t, fm->layout->u.chunk.nchunks, hsize_t); if (H5CX_get_mpio_chunk_opt_ratio(&percent_nproc_per_chunk) < 0) @@ -2536,6 +2995,20 @@ H5D__obtain_mpio_mode(H5D_io_info_t *io_info, H5D_chunk_map_t *fm, uint8_t assig size_t nproc; unsigned *nproc_per_chunk; + /* + * If enabled, disable 
collective metadata reads here. + * Since the chunk address mapping is done on rank 0 + * only here, it will cause problems if collective + * metadata reads are enabled. + */ + if (H5F_get_coll_metadata_reads(io_info->dset->oloc.file)) { + md_reads_file_flag = H5P_FORCE_FALSE; + md_reads_context_flag = FALSE; + H5F_set_coll_metadata_reads(io_info->dset->oloc.file, &md_reads_file_flag, + &md_reads_context_flag); + restore_md_reads_state = TRUE; + } + /* pre-computing: calculate number of processes and regularity of the selection occupied in each chunk */ if (NULL == (nproc_per_chunk = (unsigned *)H5MM_calloc(total_chunks * sizeof(unsigned)))) @@ -2602,6 +3075,10 @@ H5D__obtain_mpio_mode(H5D_io_info_t *io_info, H5D_chunk_map_t *fm, uint8_t assig #endif done: + /* Re-enable collective metadata reads if we disabled them */ + if (restore_md_reads_state) + H5F_set_coll_metadata_reads(io_info->dset->oloc.file, &md_reads_file_flag, &md_reads_context_flag); + if (io_mode_info) H5MM_free(io_mode_info); if (mergebuf) @@ -2615,34 +3092,32 @@ done: } /* end H5D__obtain_mpio_mode() */ /*------------------------------------------------------------------------- - * Function: H5D__construct_filtered_io_info_list + * Function: H5D__mpio_collective_filtered_chunk_io_setup * * Purpose: Constructs a list of entries which contain the necessary * information for inter-process communication when performing * collective io on filtered chunks. This list is used by - * each process when performing I/O on locally selected chunks - * and also in operations that must be collectively done - * on every chunk, such as chunk re-allocation, insertion of - * chunks into the chunk index, etc. + * each MPI rank when performing I/O on locally selected + * chunks and also in operations that must be collectively + * done on every chunk, such as chunk re-allocation, insertion + * of chunks into the chunk index, etc. 
* * Return: Non-negative on success/Negative on failure * - * Programmer: Jordan Henderson - * Tuesday, January 10th, 2017 - * *------------------------------------------------------------------------- */ static herr_t -H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - const H5D_chunk_map_t * fm, - H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries) +H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + const H5D_chunk_map_t * fm, + H5D_filtered_collective_io_info_t **chunk_list, + size_t *num_entries, int mpi_rank) { - H5D_filtered_collective_io_info_t *local_info_array = - NULL; /* The list of initially selected chunks for this process */ - size_t num_chunks_selected; - size_t i; - int mpi_rank; - herr_t ret_value = SUCCEED; + H5D_filtered_collective_io_info_t *local_info_array = NULL; + H5D_chunk_ud_t udata; + hbool_t filter_partial_edge_chunks; + size_t num_chunks_selected; + size_t i; + herr_t ret_value = SUCCEED; FUNC_ENTER_STATIC @@ -2652,19 +3127,23 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ HDassert(chunk_list); HDassert(num_entries); - if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_TIME_START(mpi_rank, "Filtered Collective I/O Setup"); +#endif - /* Each process builds a local list of the chunks they have selected */ + /* Each rank builds a local list of the chunks they have selected */ if ((num_chunks_selected = H5SL_count(fm->sel_chunks))) { H5D_chunk_info_t *chunk_info; - H5D_chunk_ud_t udata; H5SL_node_t * chunk_node; hsize_t select_npoints; - hssize_t chunk_npoints; + hbool_t need_sort = FALSE; - if (NULL == (local_info_array = (H5D_filtered_collective_io_info_t *)H5MM_malloc( - num_chunks_selected * 
sizeof(H5D_filtered_collective_io_info_t)))) + /* Determine whether partial edge chunks should be filtered */ + filter_partial_edge_chunks = !(io_info->dset->shared->layout.u.chunk.flags & + H5O_LAYOUT_CHUNK_DONT_FILTER_PARTIAL_BOUND_CHUNKS); + + if (NULL == (local_info_array = H5MM_malloc(num_chunks_selected * sizeof(*local_info_array)))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer") chunk_node = H5SL_first(fm->sel_chunks); @@ -2675,275 +3154,787 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ if (H5D__chunk_lookup(io_info->dset, chunk_info->scaled, &udata) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address") - local_info_array[i].index = chunk_info->index; - local_info_array[i].chunk_states.chunk_current = local_info_array[i].chunk_states.new_chunk = - udata.chunk_block; - local_info_array[i].num_writers = 0; - local_info_array[i].owners.original_owner = local_info_array[i].owners.new_owner = mpi_rank; - local_info_array[i].buf = NULL; - - local_info_array[i].async_info.num_receive_requests = 0; - local_info_array[i].async_info.receive_buffer_array = NULL; - local_info_array[i].async_info.receive_requests_array = NULL; - - H5MM_memcpy(local_info_array[i].scaled, chunk_info->scaled, sizeof(chunk_info->scaled)); - - select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace); - local_info_array[i].io_size = (size_t)select_npoints * type_info->src_type_size; - - /* Currently the full overwrite status of a chunk is only obtained on a per-process - * basis. This means that if the total selection in the chunk, as determined by the combination - * of selections of all of the processes interested in the chunk, covers the entire chunk, - * the performance optimization of not reading the chunk from the file is still valid, but - * is not applied in the current implementation. 
Something like an appropriately placed - * MPI_Allreduce or a running total of the number of chunk points selected during chunk - * redistribution should suffice for implementing this case - JTH. + /* Initialize rank-local chunk info */ + local_info_array[i].chunk_info = chunk_info; + local_info_array[i].chunk_buf_size = 0; + local_info_array[i].num_writers = 0; + local_info_array[i].orig_owner = mpi_rank; + local_info_array[i].new_owner = mpi_rank; + local_info_array[i].buf = NULL; + + select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->fspace); + local_info_array[i].io_size = (size_t)select_npoints * type_info->dst_type_size; + + /* + * Determine whether this chunk will need to be read from the file. If this is + * a read operation, the chunk will be read. If this is a write operation, we + * generally need to read a filtered chunk from the file before modifying it, + * unless the chunk is being fully overwritten. + * + * TODO: Currently the full overwrite status of a chunk is only obtained on a + * per-rank basis. This means that if the total selection in the chunk, as + * determined by the combination of selections of all of the ranks interested in + * the chunk, covers the entire chunk, the performance optimization of not reading + * the chunk from the file is still valid, but is not applied in the current + * implementation. 
+ * + * To implement this case, a few approaches were considered: + * + * - Keep a running total (distributed to each rank) of the number of chunk + * elements selected during chunk redistribution and compare that to the total + * number of elements in the chunk once redistribution is finished + * + * - Process all incoming chunk messages before doing I/O (these are currently + * processed AFTER doing I/O), combine the owning rank's selection in a chunk + * with the selections received from other ranks and check to see whether that + * combined selection covers the entire chunk + * + * The first approach will be dangerous if the application performs an overlapping + * write to a chunk, as the number of selected elements can equal or exceed the + * number of elements in the chunk without the whole chunk selection being covered. + * While it might be considered erroneous for an application to do an overlapping + * write, we don't explicitly disallow it. + * + * The second approach contains a bit of complexity in that part of the chunk + * messages will be needed before doing I/O and part will be needed after doing I/O. + * Since modification data from chunk messages can't be applied until after any I/O + * is performed (otherwise, we'll overwrite any applied modification data), chunk + * messages are currently entirely processed after I/O. However, in order to determine + * if a chunk is being fully overwritten, we need the dataspace portion of the chunk + * messages before doing I/O. The naive way to do this is to process chunk messages + * twice, using just the relevant information from the message before and after I/O. + * The better way would be to avoid processing chunk messages twice by extracting (and + * keeping around) the dataspace portion of the message before I/O and processing the + * rest of the chunk message after I/O. 
Note that the dataspace portion of each chunk + * message is used to correctly apply chunk modification data from the message, so + * must be kept around both before and after I/O in this case. + */ + if (io_info->op_type == H5D_IO_OP_READ) + local_info_array[i].need_read = TRUE; + else { + local_info_array[i].need_read = + local_info_array[i].io_size < (size_t)io_info->dset->shared->layout.u.chunk.size; + } + + local_info_array[i].skip_filter_pline = FALSE; + if (!filter_partial_edge_chunks) { + /* + * If this is a partial edge chunk and the "don't filter partial edge + * chunks" flag is set, make sure not to apply filters to the chunk. + */ + if (H5D__chunk_is_partial_edge_chunk(io_info->dset->shared->ndims, + io_info->dset->shared->layout.u.chunk.dim, + chunk_info->scaled, io_info->dset->shared->curr_dims)) + local_info_array[i].skip_filter_pline = TRUE; + } + + /* Initialize the chunk's shared info */ + local_info_array[i].chunk_current = udata.chunk_block; + local_info_array[i].chunk_new = udata.chunk_block; + + /* + * Check if the list is not in ascending order of offset in the file + * or has unallocated chunks. In either case, the list should get + * sorted. + */ + if (i) { + haddr_t curr_chunk_offset = local_info_array[i].chunk_current.offset; + haddr_t prev_chunk_offset = local_info_array[i - 1].chunk_current.offset; + + if (!H5F_addr_defined(prev_chunk_offset) || !H5F_addr_defined(curr_chunk_offset) || + (curr_chunk_offset < prev_chunk_offset)) + need_sort = TRUE; + } + + /* + * Extensible arrays may calculate a chunk's index a little differently + * than normal when the dataset's unlimited dimension is not the + * slowest-changing dimension, so set the index here based on what the + * extensible array code calculated instead of what was calculated + * in the chunk file mapping. 
*/ - if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") - local_info_array[i].full_overwrite = - (local_info_array[i].io_size >= (hsize_t)chunk_npoints * type_info->dst_type_size) ? TRUE - : FALSE; + if (io_info->dset->shared->layout.u.chunk.idx_type == H5D_CHUNK_IDX_EARRAY) + local_info_array[i].index_info.chunk_idx = udata.chunk_idx; + else + local_info_array[i].index_info.chunk_idx = chunk_info->index; + + local_info_array[i].index_info.filter_mask = udata.filter_mask; + local_info_array[i].index_info.need_insert = FALSE; chunk_node = H5SL_next(chunk_node); - } /* end for */ - } /* end if */ - - /* Redistribute shared chunks to new owners as necessary */ - if (io_info->op_type == H5D_IO_OP_WRITE) -#if MPI_VERSION >= 3 - if (H5D__chunk_redistribute_shared_chunks(io_info, type_info, fm, local_info_array, - &num_chunks_selected) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks") -#else - HGOTO_ERROR( - H5E_DATASET, H5E_WRITEERROR, FAIL, - "unable to redistribute shared chunks - MPI version < 3 (MPI_Mprobe and MPI_Imrecv missing)") + } + + /* Ensure the chunk list is sorted in ascending order of offset in the file */ + if (need_sort) + HDqsort(local_info_array, num_chunks_selected, sizeof(H5D_filtered_collective_io_info_t), + H5D__cmp_filtered_collective_io_info_entry); + +#ifdef H5Dmpio_DEBUG + H5D__mpio_dump_collective_filtered_chunk_list(local_info_array, num_chunks_selected, mpi_rank); #endif + } + else if (H5F_get_coll_metadata_reads(io_info->dset->oloc.file)) { + hsize_t scaled[H5O_LAYOUT_NDIMS] = {0}; + + /* + * If this rank has no selection in the dataset and collective + * metadata reads are enabled, do a fake lookup of a chunk to + * ensure that this rank has the chunk index opened. Otherwise, + * only the ranks that had a selection will have opened the + * chunk index and they will have done so independently. 
Therefore, + * when ranks with no selection participate in later collective + * metadata reads, they will try to open the chunk index collectively + * and issues will occur since other ranks won't participate. + * + * In the future, we should consider having a chunk index "open" + * callback that can be used to ensure collectivity between ranks + * in a more natural way, but this hack should suffice for now. + */ + if (H5D__chunk_lookup(io_info->dset, scaled, &udata) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address") + } *chunk_list = local_info_array; *num_entries = num_chunks_selected; done: - FUNC_LEAVE_NOAPI(ret_value) -} /* end H5D__construct_filtered_io_info_list() */ +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_TRACE_EXIT(mpi_rank); +#endif -#if MPI_VERSION >= 3 + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_collective_filtered_chunk_io_setup() */ /*------------------------------------------------------------------------- - * Function: H5D__chunk_redistribute_shared_chunks - * - * Purpose: When performing a collective write on a Dataset with - * filters applied, this function is used to redistribute any - * chunks which are selected by more than one process, so as - * to preserve file integrity after the write by ensuring - * that any shared chunks are only modified by one process. 
- * - * The current implementation follows this 3-phase process: - * - * - Collect everyone's list of chunks into one large list, - * sort the list in increasing order of chunk offset in the - * file and hand the list off to rank 0 - * - * - Rank 0 scans the list looking for matching runs of chunk - * offset in the file (corresponding to a shared chunk which - * has been selected by more than one rank in the I/O - * operation) and for each shared chunk, it redistributes - * the chunk to the process writing to the chunk which - * currently has the least amount of chunks assigned to it - * by modifying the "new_owner" field in each of the list - * entries corresponding to that chunk - * - * - After the chunks have been redistributed, rank 0 re-sorts - * the list in order of previous owner so that each rank - * will get back exactly the array that they contributed to - * the redistribution operation, with the "new_owner" field - * of each chunk they are modifying having possibly been - * modified. Rank 0 then scatters each segment of the list - * back to its corresponding rank + * Function: H5D__mpio_redistribute_shared_chunks + * + * Purpose: When performing a parallel write on a chunked Dataset with + * filters applied, we must ensure that any particular chunk + * is only written to by a single MPI rank in order to avoid + * potential data races on the chunk. This function is used to + * redistribute (by assigning ownership to a single rank) any + * chunks which are selected by more than one MPI rank. + * + * An initial Allgather is performed to determine how many + * chunks each rank has selected in the write operation and + * then that number is compared against a threshold value to + * determine whether chunk redistribution should be done on + * MPI rank 0 only, or on all MPI ranks. 
* * Return: Non-negative on success/Negative on failure * - * Programmer: Jordan Henderson - * Monday, May 1, 2017 + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, const H5D_io_info_t *io_info, + const H5D_chunk_map_t *fm, int mpi_rank, int mpi_size, + size_t **rank_chunks_assigned_map) +{ + hbool_t redistribute_on_all_ranks; + size_t *num_chunks_map = NULL; + size_t coll_chunk_list_size = 0; + size_t i; + int mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(chunk_list || 0 == chunk_list_num_entries); + HDassert(io_info); + HDassert(fm); + HDassert(mpi_size > 1); /* No chunk sharing is possible for MPI Comm size of 1 */ + +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_TIME_START(mpi_rank, "Redistribute shared chunks"); +#endif + + /* + * Allocate an array for each rank to keep track of the number of + * chunks assigned to any other rank in order to cut down on future + * MPI communication. + */ + if (NULL == (num_chunks_map = H5MM_malloc((size_t)mpi_size * sizeof(*num_chunks_map)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate assigned chunks array") + + /* Perform initial Allgather to determine the collective chunk list size */ + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&chunk_list_num_entries, 1, H5_SIZE_T_AS_MPI_TYPE, + num_chunks_map, 1, H5_SIZE_T_AS_MPI_TYPE, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + + for (i = 0; i < (size_t)mpi_size; i++) + coll_chunk_list_size += num_chunks_map[i]; + + /* + * Determine whether we should perform chunk redistribution on all + * ranks or just rank 0. For a relatively small number of chunks, + * we redistribute on all ranks to cut down on MPI communication + * overhead. 
For a larger number of chunks, we redistribute on + * rank 0 only to cut down on memory usage. + */ + redistribute_on_all_ranks = coll_chunk_list_size < H5D_CHUNK_REDISTRIBUTE_THRES; + + if (H5D__mpio_redistribute_shared_chunks_int(chunk_list, num_chunks_map, redistribute_on_all_ranks, + io_info, fm, mpi_rank, mpi_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTREDISTRIBUTE, FAIL, "can't redistribute shared chunks") + + /* + * If the caller provided a pointer for the mapping from + * rank value -> number of chunks assigned, return that + * mapping here. + */ + if (rank_chunks_assigned_map) { + /* + * If we performed chunk redistribution on rank 0 only, distribute + * the rank value -> number of chunks assigned mapping back to all + * ranks. + */ + if (!redistribute_on_all_ranks) { + if (MPI_SUCCESS != + (mpi_code = MPI_Bcast(num_chunks_map, mpi_size, H5_SIZE_T_AS_MPI_TYPE, 0, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "couldn't broadcast chunk mapping to other ranks", mpi_code) + } + + *rank_chunks_assigned_map = num_chunks_map; + } + +done: + if (!rank_chunks_assigned_map || (ret_value < 0)) { + num_chunks_map = H5MM_xfree(num_chunks_map); + } + +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_TRACE_EXIT(mpi_rank); +#endif + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_redistribute_shared_chunks() */ + +/*------------------------------------------------------------------------- + * Function: H5D__mpio_redistribute_shared_chunks_int + * + * Purpose: Routine to perform redistribution of shared chunks during + * parallel writes to datasets with filters applied. + * + * If `all_ranks_involved` is TRUE, chunk redistribution + * occurs on all MPI ranks. This is usually done when there + * is a relatively small number of chunks involved in order to + * cut down on MPI communication overhead while increasing + * total memory usage a bit. + * + * If `all_ranks_involved` is FALSE, only rank 0 will perform + * chunk redistribution. 
This is usually done when there is + * a relatively large number of chunks involved in order to + * cut down on total memory usage at the cost of increased + * overhead from MPI communication. + * + * This implementation is as follows: + * + * - All MPI ranks send their list of selected chunks to the + * ranks involved in chunk redistribution. Then, the + * involved ranks sort this new list in order of chunk + * index. + * + * - The involved ranks scan the list looking for matching + * runs of chunk index values (corresponding to a shared + * chunk which has been selected by more than one rank in + * the I/O operation) and for each shared chunk, + * redistribute the chunk to the MPI rank writing to the + * chunk which currently has the least amount of chunks + * assigned to it. This is done by modifying the "new_owner" + * field in each of the list entries corresponding to that + * chunk. The involved ranks then re-sort the list in order + * of original chunk owner so that each rank's section of + * contributed chunks is contiguous in the collective chunk + * list. + * + * - If chunk redistribution occurred on all ranks, each rank + * scans through the collective chunk list to find their + * contributed section of chunks and uses that to update + * their local chunk list with the newly-updated "new_owner" + * and "num_writers" fields. If chunk redistribution + * occurred only on rank 0, an MPI_Scatterv operation will + * be used to scatter the segments of the collective chunk + * list from rank 0 back to the corresponding ranks. 
+ * + * Return: Non-negative on success/Negative on failure * *------------------------------------------------------------------------- */ static herr_t -H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - const H5D_chunk_map_t * fm, - H5D_filtered_collective_io_info_t *local_chunk_array, - size_t * local_chunk_array_num_entries) +H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chunk_list, + size_t *num_chunks_assigned_map, hbool_t all_ranks_involved, + const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, + int mpi_rank, int mpi_size) { - H5D_filtered_collective_io_info_t *shared_chunks_info_array = - NULL; /* The list of all chunks selected in the operation by all processes */ - H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */ - unsigned char **mod_data = - NULL; /* Array of chunk modification data buffers sent by a process to new chunk owners */ - MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */ - MPI_Status * send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */ - hbool_t mem_iter_init = FALSE; - size_t shared_chunks_info_array_num_entries = 0; - size_t num_send_requests = 0; - size_t * num_assigned_chunks_array = NULL; - size_t i, last_assigned_idx; - int * send_counts = NULL; - int * send_displacements = NULL; - int scatter_recvcount_int; - int mpi_rank, mpi_size, mpi_code; + MPI_Datatype struct_type; + MPI_Datatype packed_type; + hbool_t struct_type_derived = FALSE; + hbool_t packed_type_derived = FALSE; + size_t i; + size_t coll_chunk_list_num_entries = 0; + void * coll_chunk_list = NULL; + int * counts_disps_array = NULL; + int * counts_ptr = NULL; + int * displacements_ptr = NULL; + int num_chunks_int; + int mpi_code; herr_t ret_value = SUCCEED; FUNC_ENTER_STATIC + HDassert(num_chunks_assigned_map); + HDassert(chunk_list || 0 == num_chunks_assigned_map[mpi_rank]); 
HDassert(io_info); - HDassert(type_info); HDassert(fm); - HDassert(local_chunk_array_num_entries); + HDassert(mpi_size > 1); - if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") - if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_TIME_START(mpi_rank, "Redistribute shared chunks (internal)"); +#endif - /* Set to latest format for encoding dataspace */ - H5CX_set_libver_bounds(NULL); + /* + * Make sure it's safe to cast this rank's number + * of chunks to be sent into an int for MPI + */ + H5_CHECKED_ASSIGN(num_chunks_int, int, num_chunks_assigned_map[mpi_rank], size_t); - if (*local_chunk_array_num_entries) - if (NULL == (send_requests = - (MPI_Request *)H5MM_malloc(*local_chunk_array_num_entries * sizeof(MPI_Request)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer") + /* + * Phase 1 - Participate in collective gathering of every rank's + * list of chunks to the ranks which are performing the redistribution + * operation. + */ - if (NULL == (mem_iter = (H5S_sel_iter_t *)H5MM_malloc(sizeof(H5S_sel_iter_t)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") + if (all_ranks_involved || (mpi_rank == 0)) { + /* + * Allocate array to store the receive counts of each rank, as well as + * the displacements into the final array where each rank will place + * their data. The first half of the array contains the receive counts + * (in rank order), while the latter half contains the displacements + * (also in rank order). 
+ */ + if (NULL == (counts_disps_array = H5MM_malloc(2 * (size_t)mpi_size * sizeof(*counts_disps_array)))) { + /* Push an error, but still participate in collective gather operation */ + HDONE_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, + "couldn't allocate receive counts and displacements array") + } + else { + /* Set the receive counts from the assigned chunks map */ + counts_ptr = counts_disps_array; + + for (i = 0; i < (size_t)mpi_size; i++) + H5_CHECKED_ASSIGN(counts_ptr[i], int, num_chunks_assigned_map[i], size_t); + + /* Set the displacements into the receive buffer for the gather operation */ + displacements_ptr = &counts_disps_array[mpi_size]; + + *displacements_ptr = 0; + for (i = 1; i < (size_t)mpi_size; i++) + displacements_ptr[i] = displacements_ptr[i - 1] + counts_ptr[i - 1]; + } + } - /* Gather every rank's list of chunks to rank 0 to allow it to perform the redistribution operation. After - * this call, the gathered list will initially be sorted in increasing order of chunk offset in the file. 
+ /* + * Construct MPI derived types for extracting information + * necessary for MPI communication */ - if (H5D__mpio_array_gatherv(local_chunk_array, *local_chunk_array_num_entries, - sizeof(H5D_filtered_collective_io_info_t), (void **)&shared_chunks_info_array, - &shared_chunks_info_array_num_entries, false, 0, io_info->comm, - H5D__cmp_filtered_collective_io_info_entry) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array") + if (H5D__mpio_get_chunk_redistribute_info_types(&packed_type, &packed_type_derived, &struct_type, + &struct_type_derived) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, + "can't create derived datatypes for chunk redistribution info") + + /* Perform gather operation */ + if (H5_mpio_gatherv_alloc(chunk_list, num_chunks_int, struct_type, counts_ptr, displacements_ptr, + packed_type, all_ranks_involved, 0, io_info->comm, mpi_rank, mpi_size, + &coll_chunk_list, &coll_chunk_list_num_entries) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, + "can't gather chunk redistribution info to involved ranks") - /* Rank 0 redistributes any shared chunks to new owners as necessary */ - if (mpi_rank == 0) { - if (NULL == (send_counts = (int *)H5MM_calloc((size_t)mpi_size * sizeof(int)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send counts buffer") + /* + * If all ranks are redistributing shared chunks, we no + * longer need the receive counts and displacements array + */ + if (all_ranks_involved) { + counts_disps_array = H5MM_xfree(counts_disps_array); + } - if (NULL == (send_displacements = (int *)H5MM_malloc((size_t)mpi_size * sizeof(int)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send displacements buffer") + /* + * Phase 2 - Involved ranks now redistribute any shared chunks to new + * owners as necessary. 
+ */ - if (NULL == (num_assigned_chunks_array = (size_t *)H5MM_calloc((size_t)mpi_size * sizeof(size_t)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, - "unable to allocate number of assigned chunks array") + if (all_ranks_involved || (mpi_rank == 0)) { + H5D_chunk_redistribute_info_t *chunk_entry; + hsize_t curr_chunk_idx; + size_t set_begin_index; + int num_writers; + int new_chunk_owner; - for (i = 0; i < shared_chunks_info_array_num_entries;) { - H5D_filtered_collective_io_info_t *chunk_entry; - haddr_t last_seen_addr = shared_chunks_info_array[i].chunk_states.chunk_current.offset; - size_t set_begin_index = i; - size_t num_writers = 0; - int new_chunk_owner = shared_chunks_info_array[i].owners.original_owner; + /* Clear the mapping from rank value -> number of assigned chunks */ + HDmemset(num_chunks_assigned_map, 0, (size_t)mpi_size * sizeof(*num_chunks_assigned_map)); - /* Process each set of duplicate entries caused by another process writing to the same chunk */ - do { - chunk_entry = &shared_chunks_info_array[i]; + /* Sort collective chunk list according to chunk index */ + HDqsort(coll_chunk_list, coll_chunk_list_num_entries, sizeof(H5D_chunk_redistribute_info_t), + H5D__cmp_chunk_redistribute_info); - send_counts[chunk_entry->owners.original_owner] += (int)sizeof(*chunk_entry); + /* + * Process all chunks in the collective chunk list. + * Note that the loop counter is incremented by both + * the outer loop (while processing each entry in + * the collective chunk list) and the inner loop + * (while processing duplicate entries for shared + * chunks). 
+ */ + chunk_entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[0]; + for (i = 0; i < coll_chunk_list_num_entries;) { + /* Set chunk's initial new owner to its original owner */ + new_chunk_owner = chunk_entry->orig_owner; + + /* + * Set the current chunk index so we know when we've processed + * all duplicate entries for a particular shared chunk + */ + curr_chunk_idx = chunk_entry->chunk_idx; + + /* Reset the initial number of writers to this chunk */ + num_writers = 0; + + /* Set index for the beginning of this section of duplicate chunk entries */ + set_begin_index = i; - /* The new owner of the chunk is determined by the process + /* + * Process each chunk entry in the set for the current + * (possibly shared) chunk and increment the loop counter + * while doing so. + */ + do { + /* + * The new owner of the chunk is determined by the rank * writing to the chunk which currently has the least amount * of chunks assigned to it */ - if (num_assigned_chunks_array[chunk_entry->owners.original_owner] < - num_assigned_chunks_array[new_chunk_owner]) - new_chunk_owner = chunk_entry->owners.original_owner; + if (num_chunks_assigned_map[chunk_entry->orig_owner] < + num_chunks_assigned_map[new_chunk_owner]) + new_chunk_owner = chunk_entry->orig_owner; + /* Update the number of writers to this particular chunk */ num_writers++; - } while (++i < shared_chunks_info_array_num_entries && - shared_chunks_info_array[i].chunk_states.chunk_current.offset == last_seen_addr); - /* Set all of the chunk entries' "new_owner" fields */ + chunk_entry++; + } while (++i < coll_chunk_list_num_entries && chunk_entry->chunk_idx == curr_chunk_idx); + + /* We should never have more writers to a chunk than the number of MPI ranks */ + HDassert(num_writers <= mpi_size); + + /* Set all processed chunk entries' "new_owner" and "num_writers" fields */ for (; set_begin_index < i; set_begin_index++) { - shared_chunks_info_array[set_begin_index].owners.new_owner = new_chunk_owner; - 
shared_chunks_info_array[set_begin_index].num_writers = num_writers; - } /* end for */ + H5D_chunk_redistribute_info_t *entry; - num_assigned_chunks_array[new_chunk_owner]++; - } /* end for */ + entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[set_begin_index]; - /* Sort the new list in order of previous owner so that each original owner of a chunk - * entry gets that entry back, with the possibly newly-modified "new_owner" field + entry->new_owner = new_chunk_owner; + entry->num_writers = num_writers; + } + + /* Update the number of chunks assigned to the MPI rank that now owns this chunk */ + num_chunks_assigned_map[new_chunk_owner]++; + } + + /* + * Re-sort the collective chunk list in order of original chunk owner + * so that each rank's section of contributed chunks is contiguous in + * the collective chunk list. + * + * NOTE: this re-sort is frail in that it needs to sort the collective + * chunk list so that each rank's section of contributed chunks + * is in the exact order it was contributed in, or things will + * be scrambled when each rank's local chunk list is updated. + * Therefore, the sorting algorithm here is tied to the one + * used during the I/O setup operation. Specifically, chunks + * are first sorted by ascending order of offset in the file and + * then by chunk index. In the future, a better redistribution + * algorithm may be devised that doesn't rely on frail sorting, + * but the current implementation is a quick and naive approach. 
*/ - if (shared_chunks_info_array_num_entries > 1) - HDqsort(shared_chunks_info_array, shared_chunks_info_array_num_entries, - sizeof(H5D_filtered_collective_io_info_t), - H5D__cmp_filtered_collective_io_info_entry_owner); - - send_displacements[0] = 0; - for (i = 1; i < (size_t)mpi_size; i++) - send_displacements[i] = send_displacements[i - 1] + send_counts[i - 1]; - } /* end if */ + HDqsort(coll_chunk_list, coll_chunk_list_num_entries, sizeof(H5D_chunk_redistribute_info_t), + H5D__cmp_chunk_redistribute_info_orig_owner); + } - /* Scatter the segments of the list back to each process */ - H5_CHECKED_ASSIGN(scatter_recvcount_int, int, - *local_chunk_array_num_entries * sizeof(H5D_filtered_collective_io_info_t), size_t); - if (MPI_SUCCESS != - (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts, send_displacements, MPI_BYTE, - local_chunk_array, scatter_recvcount_int, MPI_BYTE, 0, io_info->comm))) - HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code) + if (all_ranks_involved) { + /* + * If redistribution occurred on all ranks, search for the section + * in the collective chunk list corresponding to this rank's locally + * selected chunks and update the local list after redistribution. + */ + for (i = 0; i < coll_chunk_list_num_entries; i++) + if (mpi_rank == ((H5D_chunk_redistribute_info_t *)coll_chunk_list)[i].orig_owner) + break; - if (shared_chunks_info_array) { - H5MM_free(shared_chunks_info_array); - shared_chunks_info_array = NULL; - } /* end if */ + for (size_t j = 0; j < (size_t)num_chunks_int; j++) { + H5D_chunk_redistribute_info_t *coll_entry; - /* Now that the chunks have been redistributed, each process must send its modification data - * to the new owners of any of the chunks it previously possessed. Accordingly, each process - * must also issue asynchronous receives for any messages it may receive for each of the - * chunks it is assigned, in order to avoid potential deadlocking issues. 
+ coll_entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[i++]; + + chunk_list[j].new_owner = coll_entry->new_owner; + chunk_list[j].num_writers = coll_entry->num_writers; + } + } + else { + /* + * If redistribution occurred only on rank 0, scatter the segments + * of the collective chunk list back to each rank so that their + * local chunk lists get updated + */ + if (MPI_SUCCESS != + (mpi_code = MPI_Scatterv(coll_chunk_list, counts_ptr, displacements_ptr, packed_type, chunk_list, + num_chunks_int, struct_type, 0, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code) + } + +#ifdef H5Dmpio_DEBUG + H5D__mpio_dump_collective_filtered_chunk_list(chunk_list, num_chunks_assigned_map[mpi_rank], mpi_rank); +#endif + +done: + H5MM_free(coll_chunk_list); + + if (struct_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&struct_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } + if (packed_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&packed_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } + + H5MM_free(counts_disps_array); + +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_TRACE_EXIT(mpi_rank); +#endif + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_redistribute_shared_chunks_int() */ + +/*------------------------------------------------------------------------- + * Function: H5D__mpio_share_chunk_modification_data + * + * Purpose: When performing a parallel write on a chunked dataset with + * filters applied, we must first ensure that any particular + * chunk is only written to by a single MPI rank in order to + * avoid potential data races on the chunk. Once dataset + * chunks have been redistributed in a suitable manner, each + * MPI rank must send its chunk data to other ranks for each + * chunk it no longer owns. 
+ * + * The current implementation here follows the Nonblocking + * Consensus algorithm described in: + * http://unixer.de/publications/img/hoefler-dsde-protocols.pdf + * + * First, each MPI rank scans through its list of selected + * chunks and does the following for each chunk: + * + * * If a chunk in the MPI rank's chunk list is still owned + * by that rank, the rank checks how many messages are + * incoming for that chunk and adds that to its running + * total. Then, the rank updates its local chunk list so + * that any previous chunk entries for chunks that are no + * longer owned by the rank get overwritten by chunk + * entries for chunks the rank still owns. Since the data + * for the chunks no longer owned will have already been + * sent, those chunks can effectively be discarded. + * * If a chunk in the MPI rank's chunk list is no longer + * owned by that rank, the rank sends the data it wishes to + * update the chunk with to the MPI rank that now has + * ownership of that chunk. To do this, it encodes the + * chunk's index, its selection in the chunk and its + * modification data into a buffer and then posts a + * non-blocking MPI_Issend to the owning rank. + * + * Once this step is complete, all MPI ranks allocate arrays + * to hold chunk message receive buffers and MPI request + * objects for each non-blocking receive they will post for + * incoming chunk modification messages. Then, all MPI ranks + * enter a loop that alternates between non-blocking + * MPI_Iprobe calls to probe for incoming messages and + * MPI_Testall calls to see if all send requests have + * completed. As chunk modification messages arrive, + * non-blocking MPI_Irecv calls will be posted for each + * message. + * + * Once all send requests have completed, an MPI_Ibarrier is + * posted and the loop then alternates between MPI_Iprobe + * calls and MPI_Test calls to check if all ranks have reached + * the non-blocking barrier. 
Once all ranks have reached the + * barrier, processing can move on to updating the selected + * chunks that are owned in the operation. + * + * Any chunk messages that were received from other ranks + * will be returned through the `chunk_msg_bufs` array and + * `chunk_msg_bufs_len` will be set appropriately. + * + * NOTE: The use of non-blocking sends and receives of chunk + * data here may contribute to large amounts of memory + * usage/MPI request overhead if the number of shared + * chunks is high. If this becomes a problem, it may be + * useful to split the message receiving loop away so + * that chunk modification messages can be received and + * processed immediately (MPI_Recv) using a single chunk + * message buffer. However, it's possible this may + * degrade performance since the chunk message sends + * are synchronous (MPI_Issend) in the Nonblocking + * Consensus algorithm. + * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list, + size_t *chunk_list_num_entries, H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, int mpi_rank, int mpi_size, + H5D_filtered_collective_io_info_t **chunk_hash_table, + unsigned char ***chunk_msg_bufs, int *chunk_msg_bufs_len) +{ +#if H5_CHECK_MPI_VERSION(3, 0) + H5D_filtered_collective_io_info_t *chunk_table = NULL; + H5S_sel_iter_t * mem_iter = NULL; + unsigned char ** msg_send_bufs = NULL; + unsigned char ** msg_recv_bufs = NULL; + MPI_Request * send_requests = NULL; + MPI_Request * recv_requests = NULL; + MPI_Request ibarrier = MPI_REQUEST_NULL; + hbool_t mem_iter_init = FALSE; + hbool_t ibarrier_posted = FALSE; + size_t send_bufs_nalloc = 0; + size_t num_send_requests = 0; + size_t num_recv_requests = 0; + size_t num_msgs_incoming = 0; + size_t last_assigned_idx; + size_t i; + int mpi_code; + herr_t ret_value = SUCCEED; + + 
FUNC_ENTER_STATIC + + HDassert(chunk_list_num_entries); + HDassert(chunk_list || 0 == *chunk_list_num_entries); + HDassert(io_info); + HDassert(type_info); + HDassert(mpi_size > 1); + HDassert(chunk_msg_bufs); + HDassert(chunk_msg_bufs_len); + +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_TIME_START(mpi_rank, "Share chunk modification data"); +#endif + + /* Set to latest format for encoding dataspace */ + H5CX_set_libver_bounds(NULL); + + if (*chunk_list_num_entries) { + /* Allocate a selection iterator for iterating over chunk dataspaces */ + if (NULL == (mem_iter = H5FL_MALLOC(H5S_sel_iter_t))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate dataspace selection iterator") + + /* + * Allocate send buffer and MPI_Request arrays for non-blocking + * sends of outgoing chunk messages + */ + send_bufs_nalloc = H5D_CHUNK_NUM_SEND_MSGS_INIT; + if (NULL == (msg_send_bufs = H5MM_malloc(send_bufs_nalloc * sizeof(*msg_send_bufs)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, + "couldn't allocate chunk modification message buffer array") + + if (NULL == (send_requests = H5MM_malloc(send_bufs_nalloc * sizeof(*send_requests)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests array") + } + + /* + * For each chunk this rank owns, add to the total number of + * incoming MPI messages, then update the local chunk list to + * overwrite any previous chunks no longer owned by this rank. + * Since the data for those chunks will have already been sent, + * this rank should no longer be interested in them and they + * can effectively be discarded. This bookkeeping also makes + * the code for the collective file space re-allocation and + * chunk re-insertion operations a bit simpler. + * + * For each chunk this rank doesn't own, use non-blocking + * synchronous sends to send the data this rank is writing to + * the rank that does own the chunk. 
*/ - if (*local_chunk_array_num_entries) - if (NULL == (mod_data = (unsigned char **)H5MM_malloc(*local_chunk_array_num_entries * - sizeof(unsigned char *)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data buffer array") - - /* Perform all the sends on the chunks that this rank doesn't own */ - /* (Sends and recvs must be two separate loops, to avoid deadlock) */ - for (i = 0, last_assigned_idx = 0; i < *local_chunk_array_num_entries; i++) { - H5D_filtered_collective_io_info_t *chunk_entry = &local_chunk_array[i]; - - if (mpi_rank != chunk_entry->owners.new_owner) { - H5D_chunk_info_t *chunk_info = NULL; + for (i = 0, last_assigned_idx = 0; i < *chunk_list_num_entries; i++) { + H5D_filtered_collective_io_info_t *chunk_entry = &chunk_list[i]; + + if (mpi_rank == chunk_entry->new_owner) { + num_msgs_incoming += (size_t)(chunk_entry->num_writers - 1); + + /* + * Overwrite chunk entries this rank doesn't own with entries that it + * does own, since it has sent the necessary data and is no longer + * interested in the chunks it doesn't own. + */ + chunk_list[last_assigned_idx] = chunk_list[i]; + + /* + * Since, at large scale, a chunk's index value may be larger than + * the maximum value that can be stored in an int, we cannot rely + * on using a chunk's index value as the tag for the MPI messages + * sent/received for a chunk. Therefore, add this chunk to a hash + * table with the chunk's index as a key so that we can quickly find + * the chunk when processing chunk messages that were received. The + * message itself will contain the chunk's index so we can update + * the correct chunk with the received data. 
+ */ + HASH_ADD(hh, chunk_table, index_info.chunk_idx, sizeof(hsize_t), &chunk_list[last_assigned_idx]); + + last_assigned_idx++; + } + else { + H5D_chunk_info_t *chunk_info = chunk_entry->chunk_info; unsigned char * mod_data_p = NULL; hsize_t iter_nelmts; - size_t mod_data_size; + size_t mod_data_size = 0; + size_t space_size = 0; - /* Look up the chunk and get its file and memory dataspaces */ - if (NULL == (chunk_info = (H5D_chunk_info_t *)H5SL_search(fm->sel_chunks, &chunk_entry->index))) - HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list") + /* Add the size of the chunk index to the encoded size */ + mod_data_size += sizeof(hsize_t); - /* Determine size of serialized chunk file dataspace, plus the size of - * the data being written - */ - if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size") + /* Determine size of serialized chunk file dataspace */ + if (H5S_encode(chunk_info->fspace, &mod_data_p, &space_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "unable to get encoded dataspace size") + mod_data_size += space_size; + /* Determine size of data being written */ iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace); - H5_CHECK_OVERFLOW(iter_nelmts, hsize_t, size_t); + mod_data_size += (size_t)iter_nelmts * type_info->src_type_size; - if (NULL == (mod_data[num_send_requests] = (unsigned char *)H5MM_malloc(mod_data_size))) + if (NULL == (msg_send_bufs[num_send_requests] = H5MM_malloc(mod_data_size))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, - "couldn't allocate chunk modification send buffer") + "couldn't allocate chunk modification message buffer") + + mod_data_p = msg_send_bufs[num_send_requests]; + + /* Store the chunk's index into the buffer */ + HDmemcpy(mod_data_p, &chunk_entry->index_info.chunk_idx, sizeof(hsize_t)); + mod_data_p += sizeof(hsize_t); /* Serialize the chunk's file dataspace into the 
buffer */ - mod_data_p = mod_data[num_send_requests]; if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace") /* Initialize iterator for memory selection */ - if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size, 0) < 0) + if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size, + H5S_SEL_ITER_SHARE_WITH_DATASPACE) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") mem_iter_init = TRUE; @@ -2952,466 +3943,2057 @@ H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, const H5D_ty if (0 == H5D__gather_mem(io_info->u.wbuf, mem_iter, (size_t)iter_nelmts, mod_data_p)) HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer") - /* Send modification data to new owner */ + /* + * Ensure that the size of the chunk data being sent can be + * safely cast to an int for MPI. Note that this should + * generally be OK for now (unless a rank is sending a + * whole 32-bit-sized chunk of data + its encoded selection), + * but if we allow larger than 32-bit-sized chunks in the + * future, this may become a problem and derived datatypes + * will need to be used. 
+ */ H5_CHECK_OVERFLOW(mod_data_size, size_t, int) - H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int) + + /* Send modification data to new owner */ if (MPI_SUCCESS != - (mpi_code = MPI_Isend(mod_data[num_send_requests], (int)mod_data_size, MPI_BYTE, - chunk_entry->owners.new_owner, (int)chunk_entry->index, io_info->comm, - &send_requests[num_send_requests]))) - HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code) + (mpi_code = MPI_Issend(msg_send_bufs[num_send_requests], (int)mod_data_size, MPI_BYTE, + chunk_entry->new_owner, H5D_CHUNK_MOD_DATA_TAG, io_info->comm, + &send_requests[num_send_requests]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Issend failed", mpi_code) + + num_send_requests++; + + /* Resize send buffer and send request arrays if necessary */ + if (num_send_requests == send_bufs_nalloc) { + void *tmp_alloc; + + send_bufs_nalloc = (size_t)((double)send_bufs_nalloc * 1.5); + + if (NULL == + (tmp_alloc = H5MM_realloc(msg_send_bufs, send_bufs_nalloc * sizeof(*msg_send_bufs)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, + "couldn't resize chunk modification message buffer array") + msg_send_bufs = tmp_alloc; + + if (NULL == + (tmp_alloc = H5MM_realloc(send_requests, send_bufs_nalloc * sizeof(*send_requests)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't resize send requests array") + send_requests = tmp_alloc; + } - if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator") mem_iter_init = FALSE; + } + } - num_send_requests++; - } /* end if */ - } /* end for */ + /* Check if the number of send or receive requests will overflow an int (MPI requirement) */ + if (num_send_requests > INT_MAX || num_msgs_incoming > INT_MAX) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "too many shared chunks in parallel filtered write operation") + + H5_CHECK_OVERFLOW(num_send_requests, size_t, int) + 
H5_CHECK_OVERFLOW(num_msgs_incoming, size_t, int) + + /* + * Allocate receive buffer and MPI_Request arrays for non-blocking + * receives of incoming chunk messages + */ + if (num_msgs_incoming) { + if (NULL == (msg_recv_bufs = H5MM_malloc(num_msgs_incoming * sizeof(*msg_recv_bufs)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, + "couldn't allocate chunk modification message buffer array") - /* Perform all the recvs on the chunks this rank owns */ - for (i = 0, last_assigned_idx = 0; i < *local_chunk_array_num_entries; i++) { - H5D_filtered_collective_io_info_t *chunk_entry = &local_chunk_array[i]; + if (NULL == (recv_requests = H5MM_malloc(num_msgs_incoming * sizeof(*recv_requests)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive requests array") + } - if (mpi_rank == chunk_entry->owners.new_owner) { - /* Allocate all necessary buffers for an asynchronous receive operation */ - if (chunk_entry->num_writers > 1) { - MPI_Message message; - MPI_Status status; - size_t j; + /* Process any incoming messages until everyone is done */ + do { + MPI_Status status; + int msg_flag; - chunk_entry->async_info.num_receive_requests = (int)chunk_entry->num_writers - 1; - if (NULL == (chunk_entry->async_info.receive_requests_array = (MPI_Request *)H5MM_malloc( - (size_t)chunk_entry->async_info.num_receive_requests * sizeof(MPI_Request)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async requests array") + /* Probe for an incoming message from any rank */ + if (MPI_SUCCESS != (mpi_code = MPI_Iprobe(MPI_ANY_SOURCE, H5D_CHUNK_MOD_DATA_TAG, io_info->comm, + &msg_flag, &status))) + HMPI_GOTO_ERROR(FAIL, "MPI_Iprobe failed", mpi_code) - if (NULL == - (chunk_entry->async_info.receive_buffer_array = (unsigned char **)H5MM_malloc( - (size_t)chunk_entry->async_info.num_receive_requests * sizeof(unsigned char *)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async receive buffers") + /* + * If a message was 
found, allocate a buffer for the message and + * post a non-blocking receive to receive it + */ + if (msg_flag) { +#if H5_CHECK_MPI_VERSION(3, 0) + MPI_Count msg_size = 0; - for (j = 0; j < chunk_entry->num_writers - 1; j++) { - int count = 0; + if (MPI_SUCCESS != (mpi_code = MPI_Get_elements_x(&status, MPI_BYTE, &msg_size))) + HMPI_GOTO_ERROR(FAIL, "MPI_Get_elements_x failed", mpi_code) - /* Probe for a particular message from any process, removing that message - * from the receive queue in the process and allocating that much memory - * for the asynchronous receive - */ - if (MPI_SUCCESS != (mpi_code = MPI_Mprobe(MPI_ANY_SOURCE, (int)chunk_entry->index, - io_info->comm, &message, &status))) - HMPI_GOTO_ERROR(FAIL, "MPI_Mprobe failed", mpi_code) - - if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count))) - HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code) - - HDassert(count >= 0); - if (NULL == (chunk_entry->async_info.receive_buffer_array[j] = - (unsigned char *)H5MM_malloc((size_t)count * sizeof(char *)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, - "unable to allocate modification data receive buffer") - - if (MPI_SUCCESS != (mpi_code = MPI_Imrecv( - chunk_entry->async_info.receive_buffer_array[j], count, MPI_BYTE, - &message, &chunk_entry->async_info.receive_requests_array[j]))) - HMPI_GOTO_ERROR(FAIL, "MPI_Imrecv failed", mpi_code) - } /* end for */ - } /* end if */ - - local_chunk_array[last_assigned_idx++] = local_chunk_array[i]; - } /* end else */ - } /* end for */ + H5_CHECK_OVERFLOW(msg_size, MPI_Count, int) +#else + int msg_size = 0; - *local_chunk_array_num_entries = last_assigned_idx; + if (MPI_SUCCESS != (mpi_code = MPI_Get_elements(&status, MPI_BYTE, &msg_size))) + HMPI_GOTO_ERROR(FAIL, "MPI_Get_elements failed", mpi_code) +#endif - /* Wait for all async send requests to complete before returning */ - if (num_send_requests) { - if (NULL == (send_statuses = (MPI_Status *)H5MM_malloc(num_send_requests * 
sizeof(MPI_Status)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer") + if (msg_size <= 0) + HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "invalid chunk modification message size") - H5_CHECK_OVERFLOW(num_send_requests, size_t, int); - if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int)num_send_requests, send_requests, send_statuses))) - HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code) - } /* end if */ + HDassert((num_recv_requests + 1) <= num_msgs_incoming); + if (NULL == + (msg_recv_bufs[num_recv_requests] = H5MM_malloc((size_t)msg_size * sizeof(unsigned char)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, + "couldn't allocate chunk modification message receive buffer") -done: - /* Now that all async send requests have completed, free up the send - * buffers used in the async operations + if (MPI_SUCCESS != (mpi_code = MPI_Irecv(msg_recv_bufs[num_recv_requests], (int)msg_size, + MPI_BYTE, status.MPI_SOURCE, H5D_CHUNK_MOD_DATA_TAG, + io_info->comm, &recv_requests[num_recv_requests]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code) + + num_recv_requests++; + } + + if (ibarrier_posted) { + int ibarrier_completed; + + if (MPI_SUCCESS != (mpi_code = MPI_Test(&ibarrier, &ibarrier_completed, MPI_STATUS_IGNORE))) + HMPI_GOTO_ERROR(FAIL, "MPI_Test failed", mpi_code) + + if (ibarrier_completed) + break; + } + else { + int all_sends_completed; + + /* Determine if all send requests have completed */ + if (MPI_SUCCESS != (mpi_code = MPI_Testall((int)num_send_requests, send_requests, + &all_sends_completed, MPI_STATUSES_IGNORE))) + HMPI_GOTO_ERROR(FAIL, "MPI_Testall failed", mpi_code) + + if (all_sends_completed) { + /* Post non-blocking barrier */ + if (MPI_SUCCESS != (mpi_code = MPI_Ibarrier(io_info->comm, &ibarrier))) + HMPI_GOTO_ERROR(FAIL, "MPI_Ibarrier failed", mpi_code) + ibarrier_posted = TRUE; + + /* + * Now that all send requests have completed, free up the + * send buffers used in the non-blocking operations + 
*/ + if (msg_send_bufs) { + for (i = 0; i < num_send_requests; i++) { + if (msg_send_bufs[i]) + H5MM_free(msg_send_bufs[i]); + } + + msg_send_bufs = H5MM_xfree(msg_send_bufs); + } + } + } + } while (1); + + /* + * Ensure all receive requests have completed before moving on. + * For linked-chunk I/O, more overlap with computation could + * theoretically be achieved by returning the receive requests + * array and postponing this wait until during chunk updating + * when the data is really needed. However, multi-chunk I/O + * only updates a chunk at a time and the messages may not come + * in the order that chunks are processed. So, the safest way to + * support both I/O modes is to simply make sure all messages + * are available. */ - for (i = 0; i < num_send_requests; i++) { - if (mod_data[i]) - H5MM_free(mod_data[i]); - } /* end for */ + if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int)num_recv_requests, recv_requests, MPI_STATUSES_IGNORE))) + HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code) + + /* Set the new number of locally-selected chunks */ + *chunk_list_num_entries = last_assigned_idx; + + /* Return chunk message buffers if any were received */ + *chunk_hash_table = chunk_table; + *chunk_msg_bufs = msg_recv_bufs; + *chunk_msg_bufs_len = (int)num_recv_requests; +done: + if (ret_value < 0) { + /* If this rank failed, make sure to participate in collective barrier */ + if (!ibarrier_posted) { + if (MPI_SUCCESS != (mpi_code = MPI_Ibarrier(io_info->comm, &ibarrier))) + HMPI_GOTO_ERROR(FAIL, "MPI_Ibarrier failed", mpi_code) + } + + if (num_send_requests) { + for (i = 0; i < num_send_requests; i++) { + MPI_Cancel(&send_requests[i]); + } + } + + if (recv_requests) { + for (i = 0; i < num_recv_requests; i++) { + MPI_Cancel(&recv_requests[i]); + } + } + + if (msg_recv_bufs) { + for (i = 0; i < num_recv_requests; i++) { + H5MM_free(msg_recv_bufs[i]); + } + + H5MM_free(msg_recv_bufs); + } + + HASH_CLEAR(hh, chunk_table); + } + + if (recv_requests) + 
H5MM_free(recv_requests); if (send_requests) H5MM_free(send_requests); - if (send_statuses) - H5MM_free(send_statuses); - if (send_counts) - H5MM_free(send_counts); - if (send_displacements) - H5MM_free(send_displacements); - if (mod_data) - H5MM_free(mod_data); - if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) - HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") - if (mem_iter) - H5MM_free(mem_iter); - if (num_assigned_chunks_array) - H5MM_free(num_assigned_chunks_array); - if (shared_chunks_info_array) - H5MM_free(shared_chunks_info_array); + + if (msg_send_bufs) { + for (i = 0; i < num_send_requests; i++) { + if (msg_send_bufs[i]) + H5MM_free(msg_send_bufs[i]); + } + + H5MM_free(msg_send_bufs); + } + + if (mem_iter) { + if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release dataspace selection iterator") + mem_iter = H5FL_FREE(H5S_sel_iter_t, mem_iter); + } + +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_TRACE_EXIT(mpi_rank); +#endif FUNC_LEAVE_NOAPI(ret_value) -} /* end H5D__chunk_redistribute_shared_chunks() */ +#else + FUNC_ENTER_STATIC + HERROR( + H5E_DATASET, H5E_WRITEERROR, + "unable to send chunk modification data between MPI ranks - MPI version < 3 (MPI_Ibarrier missing)") + FUNC_LEAVE_NOAPI(FAIL) #endif +} /* end H5D__mpio_share_chunk_modification_data() */ /*------------------------------------------------------------------------- - * Function: H5D__mpio_filtered_collective_write_type + * Function: H5D__mpio_collective_filtered_chunk_common_io * - * Purpose: Constructs a MPI derived datatype for both the memory and - * the file for a collective write of filtered chunks. The - * datatype contains the offsets in the file and the locations - * of the filtered chunk data buffers. + * Purpose: This routine performs the common part of collective I/O + * when reading or writing filtered chunks collectively. 
* * Return: Non-negative on success/Negative on failure * - * Programmer: Jordan Henderson - * Tuesday, November 22, 2016 - * *------------------------------------------------------------------------- */ static herr_t -H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries, - MPI_Datatype *new_mem_type, hbool_t *mem_type_derived, - MPI_Datatype *new_file_type, hbool_t *file_type_derived) +H5D__mpio_collective_filtered_chunk_common_io(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, const H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, int mpi_size) { - MPI_Aint *write_buf_array = NULL; /* Relative displacements of filtered chunk data buffers */ - MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */ - int * length_array = NULL; /* Filtered Chunk lengths */ - herr_t ret_value = SUCCEED; + H5D_io_info_t coll_io_info; + H5D_storage_t ctg_store; + MPI_Datatype file_type = MPI_DATATYPE_NULL; + MPI_Datatype mem_type = MPI_DATATYPE_NULL; + hbool_t mem_type_is_derived = FALSE; + hbool_t file_type_is_derived = FALSE; + hsize_t mpi_buf_count; + haddr_t base_read_offset = HADDR_UNDEF; + size_t num_chunks; + size_t i; + char fake_buf; /* Used as a fake buffer for ranks with no chunks, thus a NULL buf pointer */ + int mpi_code; + herr_t ret_value = SUCCEED; FUNC_ENTER_STATIC - HDassert(chunk_list); - HDassert(new_mem_type); - HDassert(mem_type_derived); - HDassert(new_file_type); - HDassert(file_type_derived); + HDassert(chunk_list || 0 == chunk_list_num_entries); + HDassert(io_info); + HDassert(type_info); - if (num_entries > 0) { - size_t i; - int mpi_code; - void * base_buf; - - H5_CHECK_OVERFLOW(num_entries, size_t, int); - - /* Allocate arrays */ - if (NULL == (length_array = (int *)H5MM_malloc((size_t)num_entries * sizeof(int)))) - HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, - "memory allocation failed for filtered collective write length array") - if (NULL == 
(write_buf_array = (MPI_Aint *)H5MM_malloc((size_t)num_entries * sizeof(MPI_Aint)))) - HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, - "memory allocation failed for filtered collective write buf length array") - if (NULL == (file_offset_array = (MPI_Aint *)H5MM_malloc((size_t)num_entries * sizeof(MPI_Aint)))) - HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, - "memory allocation failed for collective write offset array") - - /* Ensure the list is sorted in ascending order of offset in the file */ - HDqsort(chunk_list, num_entries, sizeof(H5D_filtered_collective_io_info_t), - H5D__cmp_filtered_collective_io_info_entry); + /* Initialize temporary I/O info */ + coll_io_info = *io_info; - base_buf = chunk_list[0].buf; - for (i = 0; i < num_entries; i++) { - /* Set up the offset in the file, the length of the chunk data, and the relative - * displacement of the chunk data write buffer - */ - file_offset_array[i] = (MPI_Aint)chunk_list[i].chunk_states.new_chunk.offset; - length_array[i] = (int)chunk_list[i].chunk_states.new_chunk.length; - write_buf_array[i] = (MPI_Aint)chunk_list[i].buf - (MPI_Aint)base_buf; - } /* end for */ + /* + * Construct MPI derived datatype for collective I/O on chunks + */ + if (H5D__mpio_collective_filtered_io_type(chunk_list, chunk_list_num_entries, io_info->op_type, &mem_type, + &mem_type_is_derived, &file_type, &file_type_is_derived) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_BADTYPE, FAIL, "couldn't create MPI I/O type for chunk I/O") - /* Create memory MPI type */ - if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int)num_entries, length_array, - write_buf_array, MPI_BYTE, new_mem_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) - *mem_type_derived = TRUE; - if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - - /* Create file MPI type */ - if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int)num_entries, length_array, - 
file_offset_array, MPI_BYTE, new_file_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) - *file_type_derived = TRUE; - if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - } /* end if */ + /* + * For reads, determine how many chunks are actually being read. + * Note that if this is a read during a write operation + * (read chunk -> unfilter -> modify -> write back), some + * chunks may not need to be read if they're being fully + * overwritten during a write operation. + */ + if (io_info->op_type == H5D_IO_OP_READ) { + for (i = 0, num_chunks = 0; i < chunk_list_num_entries; i++) { + HDassert(chunk_list[i].buf); + + if (chunk_list[i].need_read) { + if (!H5F_addr_defined(base_read_offset)) + base_read_offset = chunk_list[i].chunk_current.offset; + + num_chunks++; + } + } + } + else + num_chunks = chunk_list_num_entries; + + /* + * If this rank doesn't have a selection, it can + * skip I/O if independent I/O was requested at + * the low level, or if the MPI communicator size + * is 1. + * + * Otherwise, this rank has to participate in + * collective I/O, but probably has a NULL buf + * pointer, so override to a fake buffer since our + * write/read function expects one. + */ + if (num_chunks == 0) { + H5FD_mpio_collective_opt_t coll_opt_mode; + + /* Get the collective_opt property to check whether the application wants to do IO individually. */ + if (H5CX_get_mpio_coll_opt(&coll_opt_mode) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get MPI-I/O collective_opt property") + + if ((mpi_size == 1) || (H5FD_MPIO_INDIVIDUAL_IO == coll_opt_mode)) { + HGOTO_DONE(SUCCEED) + } + else { + if (io_info->op_type == H5D_IO_OP_WRITE) + coll_io_info.u.wbuf = &fake_buf; + else + coll_io_info.u.rbuf = &fake_buf; + } + } + + /* + * Setup for I/O operation + */ + + mpi_buf_count = (num_chunks) ? 
1 : 0; + + if (num_chunks) { + /* + * Setup the base storage address for this operation + * to be the first chunk's file address + */ + if (io_info->op_type == H5D_IO_OP_WRITE) + ctg_store.contig.dset_addr = chunk_list[0].chunk_new.offset; + else + ctg_store.contig.dset_addr = base_read_offset; + } + else + ctg_store.contig.dset_addr = 0; + + ctg_store.contig.dset_size = (hsize_t)io_info->dset->shared->layout.u.chunk.size; + coll_io_info.store = &ctg_store; + + /* Perform I/O */ + if (H5D__final_collective_io(&coll_io_info, type_info, mpi_buf_count, file_type, mem_type) < 0) + HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish MPI I/O") done: - if (write_buf_array) - H5MM_free(write_buf_array); - if (file_offset_array) - H5MM_free(file_offset_array); - if (length_array) - H5MM_free(length_array); + /* Free the MPI buf and file types, if they were derived */ + if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) FUNC_LEAVE_NOAPI(ret_value) -} /* end H5D__mpio_filtered_collective_write_type() */ +} /* end H5D__mpio_collective_filtered_chunk_common_io() */ /*------------------------------------------------------------------------- - * Function: H5D__filtered_collective_chunk_entry_io + * Function: H5D__mpio_collective_filtered_chunk_read * - * Purpose: Given an entry for a filtered chunk, performs the necessary - * steps for updating the chunk data during a collective - * write, or for reading the chunk from file during a - * collective read. + * Purpose: This routine coordinates a collective read across all ranks + * of the chunks they have selected. 
Each rank will then go + * and * * Return: Non-negative on success/Negative on failure * - * Programmer: Jordan Henderson - * Wednesday, January 18, 2017 - * *------------------------------------------------------------------------- */ static herr_t -H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry, - const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, - const H5D_chunk_map_t *fm) +H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, const H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, int mpi_rank, int mpi_size) { - H5D_chunk_info_t *chunk_info = NULL; - H5S_sel_iter_t * mem_iter = NULL; /* Memory iterator for H5D__scatter_mem/H5D__gather_mem */ - H5S_sel_iter_t * file_iter = NULL; - H5Z_EDC_t err_detect; /* Error detection info */ - H5Z_cb_t filter_cb; /* I/O filter callback function */ - unsigned filter_mask = 0; - hsize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ - hssize_t extent_npoints; - hsize_t true_chunk_size; - hbool_t mem_iter_init = FALSE; - hbool_t file_iter_init = FALSE; - size_t buf_size; - size_t i; - H5S_t * dataspace = NULL; /* Other process' dataspace for the chunk */ - void * tmp_gath_buf = NULL; /* Temporary gather buffer to gather into from application buffer - before scattering out to the chunk data buffer (when writing data), - or vice versa (when reading data) */ - int mpi_code; - herr_t ret_value = SUCCEED; + H5D_fill_buf_info_t fb_info; + H5D_chunk_info_t * chunk_info = NULL; + H5D_io_info_t coll_io_info; + H5Z_EDC_t err_detect; /* Error detection info */ + H5Z_cb_t filter_cb; /* I/O filter callback function */ + hsize_t file_chunk_size = 0; + hsize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ + hbool_t should_fill = FALSE; + hbool_t fb_info_init = FALSE; + hbool_t index_empty = FALSE; + size_t i; + H5S_t * fill_space = NULL; + void * 
base_read_buf = NULL; + herr_t ret_value = SUCCEED; FUNC_ENTER_STATIC - HDassert(chunk_entry); + HDassert(chunk_list || 0 == chunk_list_num_entries); HDassert(io_info); HDassert(type_info); - HDassert(fm); - /* Retrieve filter settings from API context */ - if (H5CX_get_err_detect(&err_detect) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info") - if (H5CX_get_filter_cb(&filter_cb) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function") +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_TIME_START(mpi_rank, "Filtered collective chunk read"); +#else + (void)mpi_rank; +#endif + + /* Initialize temporary I/O info */ + coll_io_info = *io_info; + coll_io_info.u.rbuf = NULL; - /* Look up the chunk and get its file and memory dataspaces */ - if (NULL == (chunk_info = (H5D_chunk_info_t *)H5SL_search(fm->sel_chunks, &chunk_entry->index))) - HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list") + if (chunk_list_num_entries) { + /* Retrieve filter settings from API context */ + if (H5CX_get_err_detect(&err_detect) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info") + if (H5CX_get_filter_cb(&filter_cb) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function") - if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") - true_chunk_size = (hsize_t)extent_npoints * type_info->src_type_size; + /* Set size of full chunks in dataset */ + file_chunk_size = io_info->dset->shared->layout.u.chunk.size; + + /* Determine if fill values should be "read" for unallocated chunks */ + should_fill = (io_info->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_ALLOC) || + ((io_info->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_IFSET) && + io_info->dset->shared->dcpl_cache.fill.fill_defined); + } - /* If the size of 
the filtered chunk is larger than the number of points in the - * chunk file space extent times the datatype size, allocate enough space to hold the - * whole filtered chunk. Otherwise, allocate a buffer equal to the size of the - * chunk so that the unfiltering operation doesn't have to grow the buffer. + /* + * Allocate memory buffers for all chunks being read. Chunk data buffers are of + * the largest size between the chunk's current filtered size and the chunk's true + * size, as calculated by the number of elements in the chunk's file space extent + * multiplied by the datatype size. This tries to ensure that: + * + * * If we're reading the chunk and the filter normally reduces the chunk size, + * the unfiltering operation won't need to grow the buffer. + * * If we're reading the chunk and the filter normally grows the chunk size, + * we make sure to read into a buffer of size equal to the filtered chunk's + * size; reading into a (smaller) buffer of size equal to the unfiltered + * chunk size would of course be bad. */ - buf_size = MAX(chunk_entry->chunk_states.chunk_current.length, true_chunk_size); + for (i = 0; i < chunk_list_num_entries; i++) { + HDassert(chunk_list[i].need_read); + + chunk_list[i].chunk_buf_size = MAX(chunk_list[i].chunk_current.length, file_chunk_size); + + if (NULL == (chunk_list[i].buf = H5MM_malloc(chunk_list[i].chunk_buf_size))) { + /* Push an error, but participate in collective read */ + HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer") + break; + } + + /* + * Check if chunk is currently allocated. If not, don't try to + * read it from the file. Instead, just fill the chunk buffer + * with the fill value if necessary. 
+ */ + if (H5F_addr_defined(chunk_list[i].chunk_current.offset)) { + /* Set first read buffer */ + if (!base_read_buf) + base_read_buf = chunk_list[i].buf; + + /* Set chunk's new length for eventual filter pipeline calls */ + if (chunk_list[i].skip_filter_pline) + chunk_list[i].chunk_new.length = file_chunk_size; + else + chunk_list[i].chunk_new.length = chunk_list[i].chunk_current.length; + } + else { + chunk_list[i].need_read = FALSE; + + /* Set chunk's new length for eventual filter pipeline calls */ + chunk_list[i].chunk_new.length = file_chunk_size; + + if (should_fill) { + /* Initialize fill value buffer if not already initialized */ + if (!fb_info_init) { + hsize_t chunk_dims[H5S_MAX_RANK]; + + HDassert(io_info->dset->shared->ndims == io_info->dset->shared->layout.u.chunk.ndims - 1); + for (size_t j = 0; j < io_info->dset->shared->layout.u.chunk.ndims - 1; j++) + chunk_dims[j] = (hsize_t)io_info->dset->shared->layout.u.chunk.dim[j]; + + /* Get a dataspace for filling chunk memory buffers */ + if (NULL == (fill_space = H5S_create_simple( + io_info->dset->shared->layout.u.chunk.ndims - 1, chunk_dims, NULL))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to create chunk fill dataspace") + + /* Initialize fill value buffer */ + if (H5D__fill_init(&fb_info, NULL, (H5MM_allocate_t)H5D__chunk_mem_alloc, + (void *)&io_info->dset->shared->dcpl_cache.pline, + (H5MM_free_t)H5D__chunk_mem_free, + (void *)&io_info->dset->shared->dcpl_cache.pline, + &io_info->dset->shared->dcpl_cache.fill, io_info->dset->shared->type, + io_info->dset->shared->type_id, 0, file_chunk_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize fill value buffer") + + fb_info_init = TRUE; + } - if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer") + /* Write fill value to memory buffer */ + HDassert(fb_info.fill_buf); + if (H5D__fill(fb_info.fill_buf, io_info->dset->shared->type, 
chunk_list[i].buf, + type_info->mem_type, fill_space) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't fill chunk buffer with fill value") + } + } + } - /* If this is not a full chunk overwrite or this is a read operation, the chunk must be - * read from the file and unfiltered. + /* + * If dataset is incrementally allocated and hasn't been written to + * yet, the chunk index should be empty. In this case, a collective + * read of chunks is essentially a no-op, so avoid it here. */ - if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) { - H5FD_mpio_xfer_t xfer_mode; /* Parallel transfer for this request */ + index_empty = FALSE; + if (io_info->dset->shared->dcpl_cache.fill.alloc_time == H5D_ALLOC_TIME_INCR) + if (H5D__chunk_index_empty(io_info->dset, &index_empty) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty") - chunk_entry->chunk_states.new_chunk.length = chunk_entry->chunk_states.chunk_current.length; + if (!index_empty) { + /* + * Override the read buffer to point to the address of + * the first chunk data buffer being read into + */ + if (base_read_buf) + coll_io_info.u.rbuf = base_read_buf; - /* Currently, these chunk reads are done independently and will likely - * cause issues with collective metadata reads enabled. In the future, - * this should be refactored to use collective chunk reads - JTH */ + /* Perform collective chunk read */ + if (H5D__mpio_collective_filtered_chunk_common_io(chunk_list, chunk_list_num_entries, &coll_io_info, + type_info, mpi_size) < 0) + HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish collective filtered chunk read") + } - /* Get the original state of parallel I/O transfer mode */ - if (H5CX_get_io_xfer_mode(&xfer_mode) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get MPI-I/O transfer mode") + /* + * Iterate through all the read chunks, unfiltering them and scattering their + * data out to the application's read buffer. 
+ */ + for (i = 0; i < chunk_list_num_entries; i++) { + chunk_info = chunk_list[i].chunk_info; + + /* Unfilter the chunk, unless we didn't read it from the file */ + if (chunk_list[i].need_read && !chunk_list[i].skip_filter_pline) { + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, + &(chunk_list[i].index_info.filter_mask), err_detect, filter_cb, + (size_t *)&chunk_list[i].chunk_new.length, &chunk_list[i].chunk_buf_size, + &chunk_list[i].buf) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying") + } + + /* Scatter the chunk data to the read buffer */ + iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace); + + if (H5D_select_io_mem(io_info->u.rbuf, chunk_info->mspace, chunk_list[i].buf, chunk_info->fspace, + type_info->src_type_size, (size_t)iter_nelmts) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't copy chunk data to read buffer") + } - /* Change the xfer_mode to independent for handling the I/O */ - if (H5CX_set_io_xfer_mode(H5FD_MPIO_INDEPENDENT) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O transfer mode") +done: + /* Free all resources used by entries in the chunk list */ + for (i = 0; i < chunk_list_num_entries; i++) { + if (chunk_list[i].buf) { + H5MM_free(chunk_list[i].buf); + chunk_list[i].buf = NULL; + } + } - if (H5F_shared_block_read(io_info->f_sh, H5FD_MEM_DRAW, - chunk_entry->chunk_states.chunk_current.offset, - chunk_entry->chunk_states.new_chunk.length, chunk_entry->buf) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "unable to read raw data chunk") + /* Release the fill buffer info, if it's been initialized */ + if (fb_info_init && H5D__fill_term(&fb_info) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "Can't release fill buffer info") + if (fill_space && (H5S_close(fill_space) < 0)) + HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space") - /* Return to the original I/O transfer mode setting */ - if 
(H5CX_set_io_xfer_mode(xfer_mode) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O transfer mode") +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_TRACE_EXIT(mpi_rank); +#endif - if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask, err_detect, - filter_cb, (size_t *)&chunk_entry->chunk_states.new_chunk.length, &buf_size, - &chunk_entry->buf) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying") - } /* end if */ - else { - chunk_entry->chunk_states.new_chunk.length = true_chunk_size; - } /* end else */ + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_collective_filtered_chunk_read() */ - /* Initialize iterator for memory selection */ - if (NULL == (mem_iter = (H5S_sel_iter_t *)H5MM_malloc(sizeof(H5S_sel_iter_t)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") +/*------------------------------------------------------------------------- + * Function: H5D__mpio_collective_filtered_chunk_update + * + * Purpose: When performing a parallel write on a chunked dataset with + * filters applied, all ranks must update their owned chunks + * with their own modification data and data from other ranks. + * This routine is responsible for coordinating that process. 
+ * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, + H5D_filtered_collective_io_info_t *chunk_hash_table, + unsigned char **chunk_msg_bufs, int chunk_msg_bufs_len, + const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + int mpi_rank, int mpi_size) +{ + H5D_fill_buf_info_t fb_info; + H5D_chunk_info_t * chunk_info = NULL; + H5S_sel_iter_t * sel_iter = NULL; /* Dataspace selection iterator for H5D__scatter_mem */ + H5D_io_info_t coll_io_info; + H5Z_EDC_t err_detect; /* Error detection info */ + H5Z_cb_t filter_cb; /* I/O filter callback function */ + hsize_t file_chunk_size = 0; + hsize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ + hbool_t should_fill = FALSE; + hbool_t fb_info_init = FALSE; + hbool_t sel_iter_init = FALSE; + hbool_t index_empty = FALSE; + size_t i; + H5S_t * dataspace = NULL; + H5S_t * fill_space = NULL; + void * base_read_buf = NULL; + herr_t ret_value = SUCCEED; - if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size, 0) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") - mem_iter_init = TRUE; + FUNC_ENTER_STATIC - /* If this is a read operation, scatter the read chunk data to the user's buffer. - * - * If this is a write operation, update the chunk data buffer with the modifications - * from the current process, then apply any modifications from other processes. Finally, - * filter the newly-updated chunk. 
- */ - switch (io_info->op_type) { - case H5D_IO_OP_READ: - if (NULL == (file_iter = (H5S_sel_iter_t *)H5MM_malloc(sizeof(H5S_sel_iter_t)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file iterator") + HDassert(chunk_list || 0 == chunk_list_num_entries); + HDassert((chunk_msg_bufs && chunk_hash_table) || 0 == chunk_msg_bufs_len); + HDassert(io_info); + HDassert(type_info); - if (H5S_select_iter_init(file_iter, chunk_info->fspace, type_info->src_type_size, 0) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, - "unable to initialize memory selection information") - file_iter_init = TRUE; +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_TIME_START(mpi_rank, "Filtered collective chunk update"); +#endif + + if (chunk_list_num_entries) { + /* Retrieve filter settings from API context */ + if (H5CX_get_err_detect(&err_detect) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info") + if (H5CX_get_filter_cb(&filter_cb) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function") - iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace); + /* Set size of full chunks in dataset */ + file_chunk_size = io_info->dset->shared->layout.u.chunk.size; - if (NULL == (tmp_gath_buf = H5MM_malloc(iter_nelmts * type_info->src_type_size))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate temporary gather buffer") + /* Determine if fill values should be written to chunks */ + should_fill = (io_info->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_ALLOC) || + ((io_info->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_IFSET) && + io_info->dset->shared->dcpl_cache.fill.fill_defined); + } - if (!H5D__gather_mem(chunk_entry->buf, file_iter, (size_t)iter_nelmts, tmp_gath_buf)) - HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't gather from chunk buffer") + /* + * Allocate memory buffers for all owned chunks. 
Chunk data buffers are of the + * largest size between the chunk's current filtered size and the chunk's true + * size, as calculated by the number of elements in the chunk's file space extent + * multiplied by the datatype size. This tries to ensure that: + * + * * If we're fully overwriting the chunk and the filter normally reduces the + * chunk size, we simply have the exact buffer size required to hold the + * unfiltered chunk data. + * * If we're fully overwriting the chunk and the filter normally grows the + * chunk size (e.g., fletcher32 filter), the final filtering operation + * (hopefully) won't need to grow the buffer. + * * If we're reading the chunk and the filter normally reduces the chunk size, + * the unfiltering operation won't need to grow the buffer. + * * If we're reading the chunk and the filter normally grows the chunk size, + * we make sure to read into a buffer of size equal to the filtered chunk's + * size; reading into a (smaller) buffer of size equal to the unfiltered + * chunk size would of course be bad. + */ + for (i = 0; i < chunk_list_num_entries; i++) { + HDassert(mpi_rank == chunk_list[i].new_owner); - iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace); + chunk_list[i].chunk_buf_size = MAX(chunk_list[i].chunk_current.length, file_chunk_size); - if (H5D__scatter_mem(tmp_gath_buf, mem_iter, (size_t)iter_nelmts, io_info->u.rbuf) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to read buffer") + /* + * If this chunk hasn't been allocated yet and we aren't writing + * out fill values to it, make sure to 0-fill its memory buffer + * so we don't use uninitialized memory. 
+ */ + if (!H5F_addr_defined(chunk_list[i].chunk_current.offset) && !should_fill) + chunk_list[i].buf = H5MM_calloc(chunk_list[i].chunk_buf_size); + else + chunk_list[i].buf = H5MM_malloc(chunk_list[i].chunk_buf_size); + if (NULL == chunk_list[i].buf) { + /* Push an error, but participate in collective read */ + HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer") break; + } + + /* Set chunk's new length for eventual filter pipeline calls */ + if (chunk_list[i].need_read) { + /* + * Check if chunk is currently allocated. If not, don't try to + * read it from the file. Instead, just fill the chunk buffer + * with the fill value if fill values are to be written. + */ + if (H5F_addr_defined(chunk_list[i].chunk_current.offset)) { + /* Set first read buffer */ + if (!base_read_buf) + base_read_buf = chunk_list[i].buf; + + /* Set chunk's new length for eventual filter pipeline calls */ + if (chunk_list[i].skip_filter_pline) + chunk_list[i].chunk_new.length = file_chunk_size; + else + chunk_list[i].chunk_new.length = chunk_list[i].chunk_current.length; + } + else { + chunk_list[i].need_read = FALSE; + + /* Set chunk's new length for eventual filter pipeline calls */ + chunk_list[i].chunk_new.length = file_chunk_size; + + if (should_fill) { + /* Initialize fill value buffer if not already initialized */ + if (!fb_info_init) { + hsize_t chunk_dims[H5S_MAX_RANK]; + + HDassert(io_info->dset->shared->ndims == + io_info->dset->shared->layout.u.chunk.ndims - 1); + for (size_t j = 0; j < io_info->dset->shared->layout.u.chunk.ndims - 1; j++) + chunk_dims[j] = (hsize_t)io_info->dset->shared->layout.u.chunk.dim[j]; + + /* Get a dataspace for filling chunk memory buffers */ + if (NULL == (fill_space = H5S_create_simple( + io_info->dset->shared->layout.u.chunk.ndims - 1, chunk_dims, NULL))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, + "unable to create chunk fill dataspace") + + /* Initialize fill value buffer */ + if (H5D__fill_init(&fb_info, NULL, 
(H5MM_allocate_t)H5D__chunk_mem_alloc, + (void *)&io_info->dset->shared->dcpl_cache.pline, + (H5MM_free_t)H5D__chunk_mem_free, + (void *)&io_info->dset->shared->dcpl_cache.pline, + &io_info->dset->shared->dcpl_cache.fill, + io_info->dset->shared->type, io_info->dset->shared->type_id, 0, + file_chunk_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize fill value buffer") + + fb_info_init = TRUE; + } + + /* Write fill value to memory buffer */ + HDassert(fb_info.fill_buf); + if (H5D__fill(fb_info.fill_buf, io_info->dset->shared->type, chunk_list[i].buf, + type_info->mem_type, fill_space) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, + "couldn't fill chunk buffer with fill value") + } + } + } + else + chunk_list[i].chunk_new.length = file_chunk_size; + } - case H5D_IO_OP_WRITE: - iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace); + /* + * If dataset is incrementally allocated and hasn't been written to + * yet, the chunk index should be empty. In this case, a collective + * read of chunks is essentially a no-op, so avoid it here. 
+ */ + index_empty = FALSE; + if (io_info->dset->shared->dcpl_cache.fill.alloc_time == H5D_ALLOC_TIME_INCR) + if (H5D__chunk_index_empty(io_info->dset, &index_empty) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty") - if (NULL == (tmp_gath_buf = H5MM_malloc(iter_nelmts * type_info->src_type_size))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate temporary gather buffer") + if (!index_empty) { + /* + * Setup for I/O operation + */ - /* Gather modification data from the application write buffer into a temporary buffer */ - if (0 == H5D__gather_mem(io_info->u.wbuf, mem_iter, (size_t)iter_nelmts, tmp_gath_buf)) - HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer") + /* Initialize temporary I/O info */ + coll_io_info = *io_info; + coll_io_info.op_type = H5D_IO_OP_READ; - if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") - mem_iter_init = FALSE; + /* Override the read buffer to point to the address of the first + * chunk data buffer being read into + */ + if (base_read_buf) + coll_io_info.u.rbuf = base_read_buf; - /* Initialize iterator for file selection */ - if (H5S_select_iter_init(mem_iter, chunk_info->fspace, type_info->dst_type_size, 0) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, - "unable to initialize file selection information") - mem_iter_init = TRUE; + /* Read all chunks that need to be read from the file */ + if (H5D__mpio_collective_filtered_chunk_common_io(chunk_list, chunk_list_num_entries, &coll_io_info, + type_info, mpi_size) < 0) + HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish collective filtered chunk read") + } - iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace); + /* + * Now that all owned chunks have been read, update the chunks + * with modification data from the owning rank and other ranks. 
+ */ - /* Scatter the owner's modification data into the chunk data buffer according to - * the file space. - */ - if (H5D__scatter_mem(tmp_gath_buf, mem_iter, (size_t)iter_nelmts, chunk_entry->buf) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to chunk data buffer") + /* Process all chunks with data from the owning rank first */ + for (i = 0; i < chunk_list_num_entries; i++) { + HDassert(mpi_rank == chunk_list[i].new_owner); - if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") - mem_iter_init = FALSE; + chunk_info = chunk_list[i].chunk_info; - if (MPI_SUCCESS != - (mpi_code = MPI_Waitall(chunk_entry->async_info.num_receive_requests, - chunk_entry->async_info.receive_requests_array, MPI_STATUSES_IGNORE))) - HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code) + /* + * If this chunk wasn't being fully overwritten, we read it from + * the file, so we need to unfilter it + */ + if (chunk_list[i].need_read && !chunk_list[i].skip_filter_pline) { + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, + &(chunk_list[i].index_info.filter_mask), err_detect, filter_cb, + (size_t *)&chunk_list[i].chunk_new.length, &chunk_list[i].chunk_buf_size, + &chunk_list[i].buf) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying") + } + + iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace); + + if (H5D_select_io_mem(chunk_list[i].buf, chunk_info->fspace, io_info->u.wbuf, chunk_info->mspace, + type_info->dst_type_size, (size_t)iter_nelmts) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't copy chunk data to write buffer") + } - /* For each asynchronous receive call previously posted, receive the chunk modification - * buffer from another rank and update the chunk data - */ - for (i = 0; i < (size_t)chunk_entry->async_info.num_receive_requests; i++) { - const unsigned char *mod_data_p; + /* Allocate 
iterator for memory selection */ + if (NULL == (sel_iter = H5FL_MALLOC(H5S_sel_iter_t))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") - /* Decode the process' chunk file dataspace */ - mod_data_p = chunk_entry->async_info.receive_buffer_array[i]; - if (NULL == (dataspace = H5S_decode(&mod_data_p))) + /* Now process all received chunk message buffers */ + for (i = 0; i < (size_t)chunk_msg_bufs_len; i++) { + H5D_filtered_collective_io_info_t *chunk_entry = NULL; + const unsigned char * msg_ptr = chunk_msg_bufs[i]; + hsize_t chunk_idx; + + if (msg_ptr) { + /* Retrieve the chunk's index value */ + HDmemcpy(&chunk_idx, msg_ptr, sizeof(hsize_t)); + msg_ptr += sizeof(hsize_t); + + /* Find the chunk entry according to its chunk index */ + HASH_FIND(hh, chunk_hash_table, &chunk_idx, sizeof(hsize_t), chunk_entry); + HDassert(chunk_entry); + HDassert(mpi_rank == chunk_entry->new_owner); + + /* + * Only process the chunk if its data buffer is allocated. + * In the case of multi-chunk I/O, we're only working on + * a chunk at a time, so we need to skip over messages + * that aren't for the chunk we're currently working on. 
+ */ + if (!chunk_entry->buf) + continue; + else { + /* Decode the chunk file dataspace from the message */ + if (NULL == (dataspace = H5S_decode(&msg_ptr))) HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace") - if (H5S_select_iter_init(mem_iter, dataspace, type_info->dst_type_size, 0) < 0) + if (H5S_select_iter_init(sel_iter, dataspace, type_info->dst_type_size, + H5S_SEL_ITER_SHARE_WITH_DATASPACE) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") - mem_iter_init = TRUE; + sel_iter_init = TRUE; iter_nelmts = H5S_GET_SELECT_NPOINTS(dataspace); /* Update the chunk data with the received modification data */ - if (H5D__scatter_mem(mod_data_p, mem_iter, (size_t)iter_nelmts, chunk_entry->buf) < 0) + if (H5D__scatter_mem(msg_ptr, sel_iter, (size_t)iter_nelmts, chunk_entry->buf) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't scatter to write buffer") - if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + if (H5S_SELECT_ITER_RELEASE(sel_iter) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") - mem_iter_init = FALSE; + sel_iter_init = FALSE; + if (dataspace) { if (H5S_close(dataspace) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace") dataspace = NULL; } - H5MM_free(chunk_entry->async_info.receive_buffer_array[i]); - } /* end for */ - /* Filter the chunk */ - if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask, err_detect, filter_cb, - (size_t *)&chunk_entry->chunk_states.new_chunk.length, &buf_size, - &chunk_entry->buf) < 0) + H5MM_free(chunk_msg_bufs[i]); + chunk_msg_bufs[i] = NULL; + } + } + } + + /* Finally, filter all the chunks */ + for (i = 0; i < chunk_list_num_entries; i++) { + if (!chunk_list[i].skip_filter_pline) { + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, + &(chunk_list[i].index_info.filter_mask), err_detect, filter_cb, + (size_t *)&chunk_list[i].chunk_new.length, 
&chunk_list[i].chunk_buf_size, + &chunk_list[i].buf) < 0) HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "output pipeline failed") + } #if H5_SIZEOF_SIZE_T > 4 - /* Check for the chunk expanding too much to encode in a 32-bit value */ - if (chunk_entry->chunk_states.new_chunk.length > ((size_t)0xffffffff)) - HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length") + /* Check for the chunk expanding too much to encode in a 32-bit value */ + if (chunk_list[i].chunk_new.length > ((size_t)0xffffffff)) + HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length") #endif - break; + } - default: - HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "invalid I/O operation") - } /* end switch */ +done: + if (sel_iter) { + if (sel_iter_init && H5S_SELECT_ITER_RELEASE(sel_iter) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + sel_iter = H5FL_FREE(H5S_sel_iter_t, sel_iter); + } + if (dataspace && (H5S_close(dataspace) < 0)) + HDONE_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace") + if (fill_space && (H5S_close(fill_space) < 0)) + HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space") + + /* Release the fill buffer info, if it's been initialized */ + if (fb_info_init && H5D__fill_term(&fb_info) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "Can't release fill buffer info") + + /* On failure, try to free all resources used by entries in the chunk list */ + if (ret_value < 0) { + for (i = 0; i < chunk_list_num_entries; i++) { + if (chunk_list[i].buf) { + H5MM_free(chunk_list[i].buf); + chunk_list[i].buf = NULL; + } + } + } + +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_TRACE_EXIT(mpi_rank); +#endif + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_collective_filtered_chunk_update() */ + +/*------------------------------------------------------------------------- + * Function: H5D__mpio_collective_filtered_chunk_reallocate + * + * 
Purpose: When performing a parallel write on a chunked dataset with + * filters applied, all ranks must eventually get together and + * perform a collective reallocation of space in the file for + * all chunks that were modified on all ranks. This routine is + * responsible for coordinating that process. + * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, size_t *num_chunks_assigned_map, + H5D_io_info_t *io_info, H5D_chk_idx_info_t *idx_info, + int mpi_rank, int mpi_size) +{ + H5D_chunk_alloc_info_t *collective_list = NULL; + MPI_Datatype send_type; + MPI_Datatype recv_type; + hbool_t send_type_derived = FALSE; + hbool_t recv_type_derived = FALSE; + hbool_t need_sort = FALSE; + size_t collective_num_entries = 0; + size_t num_local_chunks_processed = 0; + size_t i; + void * gathered_array = NULL; + int * counts_disps_array = NULL; + int * counts_ptr = NULL; + int * displacements_ptr = NULL; + int mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(chunk_list || 0 == chunk_list_num_entries); + HDassert(io_info); + HDassert(idx_info); + HDassert(idx_info->storage->idx_type != H5D_CHUNK_IDX_NONE); + +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_TIME_START(mpi_rank, "Reallocation of chunk file space"); +#endif + + /* + * Make sure it's safe to cast this rank's number + * of chunks to be sent into an int for MPI + */ + H5_CHECK_OVERFLOW(chunk_list_num_entries, size_t, int); + + /* Create derived datatypes for the chunk file space info needed */ + if (H5D__mpio_get_chunk_alloc_info_types(&recv_type, &recv_type_derived, &send_type, &send_type_derived) < + 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, + "can't create derived datatypes for chunk file space info") + + /* + * Gather the new chunk 
sizes to all ranks for a collective reallocation + * of the chunks in the file. + */ + if (num_chunks_assigned_map) { + /* + * If a mapping between rank value -> number of assigned chunks has + * been provided (usually during linked-chunk I/O), we can use this + * to optimize MPI overhead a bit since MPI ranks won't need to + * first inform each other about how many chunks they're contributing. + */ + if (NULL == (counts_disps_array = H5MM_malloc(2 * (size_t)mpi_size * sizeof(*counts_disps_array)))) { + /* Push an error, but still participate in collective gather operation */ + HDONE_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, + "couldn't allocate receive counts and displacements array") + } + else { + /* Set the receive counts from the assigned chunks map */ + counts_ptr = counts_disps_array; + + for (i = 0; i < (size_t)mpi_size; i++) + H5_CHECKED_ASSIGN(counts_ptr[i], int, num_chunks_assigned_map[i], size_t); + + /* Set the displacements into the receive buffer for the gather operation */ + displacements_ptr = &counts_disps_array[mpi_size]; + + *displacements_ptr = 0; + for (i = 1; i < (size_t)mpi_size; i++) + displacements_ptr[i] = displacements_ptr[i - 1] + counts_ptr[i - 1]; + } + + /* Perform gather operation */ + if (H5_mpio_gatherv_alloc(chunk_list, (int)chunk_list_num_entries, send_type, counts_ptr, + displacements_ptr, recv_type, TRUE, 0, io_info->comm, mpi_rank, mpi_size, + &gathered_array, &collective_num_entries) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "can't gather chunk file space info to/from ranks") + } + else { + /* + * If no mapping between rank value -> number of assigned chunks has + * been provided (usually during multi-chunk I/O), all MPI ranks will + * need to first inform other ranks about how many chunks they're + * contributing before performing the actual gather operation. Use + * the 'simple' MPI_Allgatherv wrapper for this. 
+ */ + if (H5_mpio_gatherv_alloc_simple(chunk_list, (int)chunk_list_num_entries, send_type, recv_type, TRUE, + 0, io_info->comm, mpi_rank, mpi_size, &gathered_array, + &collective_num_entries) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "can't gather chunk file space info to/from ranks") + } + + /* Collectively re-allocate the modified chunks (from each rank) in the file */ + collective_list = (H5D_chunk_alloc_info_t *)gathered_array; + for (i = 0, num_local_chunks_processed = 0; i < collective_num_entries; i++) { + H5D_chunk_alloc_info_t *coll_entry = &collective_list[i]; + hbool_t need_insert; + hbool_t update_local_chunk; + + if (H5D__chunk_file_alloc(idx_info, &coll_entry->chunk_current, &coll_entry->chunk_new, &need_insert, + NULL) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") + + /* + * If we just re-allocated a chunk that is local to this + * rank, make sure to update the chunk entry in the local + * chunk list + */ + update_local_chunk = + (num_local_chunks_processed < chunk_list_num_entries) && + (coll_entry->chunk_idx == chunk_list[num_local_chunks_processed].index_info.chunk_idx); + + if (update_local_chunk) { + H5D_filtered_collective_io_info_t *local_chunk; + + local_chunk = &chunk_list[num_local_chunks_processed]; + + /* Sanity check that this chunk is actually local */ + HDassert(mpi_rank == local_chunk->orig_owner); + HDassert(mpi_rank == local_chunk->new_owner); + + local_chunk->chunk_new = coll_entry->chunk_new; + local_chunk->index_info.need_insert = need_insert; + + /* + * Since chunk reallocation can move chunks around, check if + * the local chunk list is still in ascending offset of order + * in the file + */ + if (num_local_chunks_processed) { + haddr_t curr_chunk_offset = local_chunk->chunk_new.offset; + haddr_t prev_chunk_offset = chunk_list[num_local_chunks_processed - 1].chunk_new.offset; + + HDassert(H5F_addr_defined(prev_chunk_offset) && H5F_addr_defined(curr_chunk_offset)); + if 
(curr_chunk_offset < prev_chunk_offset) + need_sort = TRUE; + } + + num_local_chunks_processed++; + } + } + + HDassert(chunk_list_num_entries == num_local_chunks_processed); + + /* + * Ensure this rank's local chunk list is sorted in + * ascending order of offset in the file + */ + if (need_sort) + HDqsort(chunk_list, chunk_list_num_entries, sizeof(H5D_filtered_collective_io_info_t), + H5D__cmp_filtered_collective_io_info_entry); + +done: + H5MM_free(gathered_array); + H5MM_free(counts_disps_array); + + if (send_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&send_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } + if (recv_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&recv_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } + +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_TRACE_EXIT(mpi_rank); +#endif + + FUNC_LEAVE_NOAPI(ret_value) +} /* H5D__mpio_collective_filtered_chunk_reallocate() */ + +/*------------------------------------------------------------------------- + * Function: H5D__mpio_collective_filtered_chunk_reinsert + * + * Purpose: When performing a parallel write on a chunked dataset with + * filters applied, all ranks must eventually get together and + * perform a collective reinsertion into the dataset's chunk + * index of chunks that were modified. This routine is + * responsible for coordinating that process. 
+ * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, size_t *num_chunks_assigned_map, + H5D_io_info_t *io_info, H5D_chk_idx_info_t *idx_info, + int mpi_rank, int mpi_size) +{ + H5D_chunk_ud_t chunk_ud; + MPI_Datatype send_type; + MPI_Datatype recv_type; + hbool_t send_type_derived = FALSE; + hbool_t recv_type_derived = FALSE; + hsize_t scaled_coords[H5O_LAYOUT_NDIMS]; + size_t collective_num_entries = 0; + size_t i; + void * gathered_array = NULL; + int * counts_disps_array = NULL; + int * counts_ptr = NULL; + int * displacements_ptr = NULL; + int mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(chunk_list || 0 == chunk_list_num_entries); + HDassert(io_info); + HDassert(idx_info); + +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TRACE_ENTER(mpi_rank); + H5D_MPIO_TIME_START(mpi_rank, "Reinsertion of modified chunks into chunk index"); +#endif + + /* Only re-insert chunks if index has an insert method */ + if (!idx_info->storage->ops->insert) + HGOTO_DONE(SUCCEED); + + /* + * Make sure it's safe to cast this rank's number + * of chunks to be sent into an int for MPI + */ + H5_CHECK_OVERFLOW(chunk_list_num_entries, size_t, int); + + /* Create derived datatypes for the chunk re-insertion info needed */ + if (H5D__mpio_get_chunk_insert_info_types(&recv_type, &recv_type_derived, &send_type, + &send_type_derived) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, + "can't create derived datatypes for chunk re-insertion info") + + /* + * Gather information to all ranks for a collective re-insertion + * of the modified chunks into the chunk index + */ + if (num_chunks_assigned_map) { + /* + * If a mapping between rank value -> number of assigned chunks has + * been provided (usually during linked-chunk I/O), we can use this + * to 
optimize MPI overhead a bit since MPI ranks won't need to + * first inform each other about how many chunks they're contributing. + */ + if (NULL == (counts_disps_array = H5MM_malloc(2 * (size_t)mpi_size * sizeof(*counts_disps_array)))) { + /* Push an error, but still participate in collective gather operation */ + HDONE_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, + "couldn't allocate receive counts and displacements array") + } + else { + /* Set the receive counts from the assigned chunks map */ + counts_ptr = counts_disps_array; + + for (i = 0; i < (size_t)mpi_size; i++) + H5_CHECKED_ASSIGN(counts_ptr[i], int, num_chunks_assigned_map[i], size_t); + + /* Set the displacements into the receive buffer for the gather operation */ + displacements_ptr = &counts_disps_array[mpi_size]; + + *displacements_ptr = 0; + for (i = 1; i < (size_t)mpi_size; i++) + displacements_ptr[i] = displacements_ptr[i - 1] + counts_ptr[i - 1]; + } + + /* Perform gather operation */ + if (H5_mpio_gatherv_alloc(chunk_list, (int)chunk_list_num_entries, send_type, counts_ptr, + displacements_ptr, recv_type, TRUE, 0, io_info->comm, mpi_rank, mpi_size, + &gathered_array, &collective_num_entries) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, + "can't gather chunk index re-insertion info to/from ranks") + } + else { + /* + * If no mapping between rank value -> number of assigned chunks has + * been provided (usually during multi-chunk I/O), all MPI ranks will + * need to first inform other ranks about how many chunks they're + * contributing before performing the actual gather operation. Use + * the 'simple' MPI_Allgatherv wrapper for this. 
+ */ + if (H5_mpio_gatherv_alloc_simple(chunk_list, (int)chunk_list_num_entries, send_type, recv_type, TRUE, + 0, io_info->comm, mpi_rank, mpi_size, &gathered_array, + &collective_num_entries) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, + "can't gather chunk index re-insertion info to/from ranks") + } + + /* Initialize static chunk udata fields from chunk index info */ + H5D_MPIO_INIT_CHUNK_UD_INFO(chunk_ud, idx_info); + + for (i = 0; i < collective_num_entries; i++) { + H5D_chunk_insert_info_t *coll_entry = &((H5D_chunk_insert_info_t *)gathered_array)[i]; + + /* + * We only need to reinsert this chunk if we had to actually + * allocate or reallocate space in the file for it + */ + if (!coll_entry->index_info.need_insert) + continue; + + chunk_ud.chunk_block = coll_entry->chunk_block; + chunk_ud.chunk_idx = coll_entry->index_info.chunk_idx; + chunk_ud.filter_mask = coll_entry->index_info.filter_mask; + chunk_ud.common.scaled = scaled_coords; + + /* Calculate scaled coordinates for the chunk */ + if (idx_info->layout->idx_type == H5D_CHUNK_IDX_EARRAY && idx_info->layout->u.earray.unlim_dim > 0) { + /* + * Extensible arrays where the unlimited dimension is not + * the slowest-changing dimension "swizzle" the coordinates + * to move the unlimited dimension value to offset 0. Therefore, + * we use the "swizzled" down chunks to calculate the "swizzled" + * scaled coordinates and then we undo the "swizzle" operation. + * + * TODO: In the future, this is something that should be handled + * by the particular chunk index rather than manually + * here. Likely, the chunk index ops should get a new + * callback that accepts a chunk index and provides the + * caller with the scaled coordinates for that chunk. 
+ */ + H5VM_array_calc_pre(chunk_ud.chunk_idx, io_info->dset->shared->ndims, + idx_info->layout->u.earray.swizzled_down_chunks, scaled_coords); + + H5VM_unswizzle_coords(hsize_t, scaled_coords, idx_info->layout->u.earray.unlim_dim); + } + else { + H5VM_array_calc_pre(chunk_ud.chunk_idx, io_info->dset->shared->ndims, + io_info->dset->shared->layout.u.chunk.down_chunks, scaled_coords); + } + + scaled_coords[io_info->dset->shared->ndims] = 0; + +#ifndef NDEBUG + /* + * If a matching local chunk entry is found, the + * `chunk_info` structure (which contains the chunk's + * pre-computed scaled coordinates) will be valid + * for this rank. Compare those coordinates against + * the calculated coordinates above to make sure + * they match. + */ + for (size_t dbg_idx = 0; dbg_idx < chunk_list_num_entries; dbg_idx++) { + if (coll_entry->index_info.chunk_idx == chunk_list[dbg_idx].index_info.chunk_idx) { + hbool_t coords_match = !HDmemcmp(scaled_coords, chunk_list[dbg_idx].chunk_info->scaled, + io_info->dset->shared->ndims * sizeof(hsize_t)); + + HDassert(coords_match && "Calculated scaled coordinates for chunk didn't match " + "chunk's actual scaled coordinates!"); + break; + } + } +#endif + + if ((idx_info->storage->ops->insert)(idx_info, &chunk_ud, io_info->dset) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index") + } + +done: + H5MM_free(gathered_array); + H5MM_free(counts_disps_array); + + if (send_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&send_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } + if (recv_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&recv_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } + +#ifdef H5Dmpio_DEBUG + H5D_MPIO_TIME_STOP(mpi_rank); + H5D_MPIO_TRACE_EXIT(mpi_rank); +#endif + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_collective_filtered_chunk_reinsert() */ + 
+/*------------------------------------------------------------------------- + * Function: H5D__mpio_get_chunk_redistribute_info_types + * + * Purpose: Constructs MPI derived datatypes for communicating the + * info from a H5D_filtered_collective_io_info_t structure + * that is necessary for redistributing shared chunks during a + * collective write of filtered chunks. + * + * The datatype returned through `contig_type` has an extent + * equal to the size of an H5D_chunk_redistribute_info_t + * structure and is suitable for communicating that structure + * type. + * + * The datatype returned through `resized_type` has an extent + * equal to the size of an H5D_filtered_collective_io_info_t + * structure. This makes it suitable for sending an array of + * those structures, while extracting out just the info + * necessary for the chunk redistribution operation during + * communication. + * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, hbool_t *contig_type_derived, + MPI_Datatype *resized_type, hbool_t *resized_type_derived) +{ + MPI_Datatype struct_type = MPI_DATATYPE_NULL; + hbool_t struct_type_derived = FALSE; + MPI_Datatype chunk_block_type = MPI_DATATYPE_NULL; + hbool_t chunk_block_type_derived = FALSE; + MPI_Datatype types[5]; + MPI_Aint displacements[5]; + int block_lengths[5]; + int field_count; + int mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(contig_type); + HDassert(contig_type_derived); + HDassert(resized_type); + HDassert(resized_type_derived); + + *contig_type_derived = FALSE; + *resized_type_derived = FALSE; + + /* Create struct type for the inner H5F_block_t structure */ + if (H5F_mpi_get_file_block_type(FALSE, &chunk_block_type, &chunk_block_type_derived) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't create derived type for chunk 
file description") + + field_count = 5; + HDassert(field_count == (sizeof(types) / sizeof(MPI_Datatype))); + + /* + * Create structure type to pack chunk H5F_block_t structure + * next to chunk_idx, orig_owner, new_owner and num_writers + * fields + */ + block_lengths[0] = 1; + block_lengths[1] = 1; + block_lengths[2] = 1; + block_lengths[3] = 1; + block_lengths[4] = 1; + displacements[0] = offsetof(H5D_chunk_redistribute_info_t, chunk_block); + displacements[1] = offsetof(H5D_chunk_redistribute_info_t, chunk_idx); + displacements[2] = offsetof(H5D_chunk_redistribute_info_t, orig_owner); + displacements[3] = offsetof(H5D_chunk_redistribute_info_t, new_owner); + displacements[4] = offsetof(H5D_chunk_redistribute_info_t, num_writers); + types[0] = chunk_block_type; + types[1] = HSIZE_AS_MPI_TYPE; + types[2] = MPI_INT; + types[3] = MPI_INT; + types[4] = MPI_INT; + if (MPI_SUCCESS != + (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, contig_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) + *contig_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(contig_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + + /* Create struct type to extract the chunk_current, chunk_idx, orig_owner, + * new_owner and num_writers fields from a H5D_filtered_collective_io_info_t + * structure + */ + block_lengths[0] = 1; + block_lengths[1] = 1; + block_lengths[2] = 1; + block_lengths[3] = 1; + block_lengths[4] = 1; + displacements[0] = offsetof(H5D_filtered_collective_io_info_t, chunk_current); + displacements[1] = offsetof(H5D_filtered_collective_io_info_t, index_info.chunk_idx); + displacements[2] = offsetof(H5D_filtered_collective_io_info_t, orig_owner); + displacements[3] = offsetof(H5D_filtered_collective_io_info_t, new_owner); + displacements[4] = offsetof(H5D_filtered_collective_io_info_t, num_writers); + types[0] = chunk_block_type; + types[1] = HSIZE_AS_MPI_TYPE; + types[2] = 
MPI_INT; + types[3] = MPI_INT; + types[4] = MPI_INT; + if (MPI_SUCCESS != + (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) + struct_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_create_resized( + struct_type, 0, sizeof(H5D_filtered_collective_io_info_t), resized_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_resized failed", mpi_code) + *resized_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(resized_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + +done: + if (struct_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&struct_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } + if (chunk_block_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&chunk_block_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } + + if (ret_value < 0) { + if (*resized_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(resized_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + *resized_type_derived = FALSE; + } + if (*contig_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(contig_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + *contig_type_derived = FALSE; + } + } + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_get_chunk_redistribute_info_types() */ + +/*------------------------------------------------------------------------- + * Function: H5D__mpio_get_chunk_alloc_info_types + * + * Purpose: Constructs MPI derived datatypes for communicating the info + * from a H5D_filtered_collective_io_info_t structure that is + * necessary for re-allocating file space during a collective + * write of filtered chunks. 
+ * + * The datatype returned through `contig_type` has an extent + * equal to the size of an H5D_chunk_alloc_info_t structure + * and is suitable for communicating that structure type. + * + * The datatype returned through `resized_type` has an extent + * equal to the size of an H5D_filtered_collective_io_info_t + * structure. This makes it suitable for sending an array of + * those structures, while extracting out just the info + * necessary for the chunk file space reallocation operation + * during communication. + * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, hbool_t *contig_type_derived, + MPI_Datatype *resized_type, hbool_t *resized_type_derived) +{ + MPI_Datatype struct_type = MPI_DATATYPE_NULL; + hbool_t struct_type_derived = FALSE; + MPI_Datatype chunk_block_type = MPI_DATATYPE_NULL; + hbool_t chunk_block_type_derived = FALSE; + MPI_Datatype types[3]; + MPI_Aint displacements[3]; + int block_lengths[3]; + int field_count; + int mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(contig_type); + HDassert(contig_type_derived); + HDassert(resized_type); + HDassert(resized_type_derived); + + *contig_type_derived = FALSE; + *resized_type_derived = FALSE; + + /* Create struct type for the inner H5F_block_t structure */ + if (H5F_mpi_get_file_block_type(FALSE, &chunk_block_type, &chunk_block_type_derived) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't create derived type for chunk file description") + + field_count = 3; + HDassert(field_count == (sizeof(types) / sizeof(MPI_Datatype))); + + /* + * Create structure type to pack both chunk H5F_block_t structures + * next to chunk_idx field + */ + block_lengths[0] = 1; + block_lengths[1] = 1; + block_lengths[2] = 1; + displacements[0] = offsetof(H5D_chunk_alloc_info_t, chunk_current); + displacements[1] = 
offsetof(H5D_chunk_alloc_info_t, chunk_new); + displacements[2] = offsetof(H5D_chunk_alloc_info_t, chunk_idx); + types[0] = chunk_block_type; + types[1] = chunk_block_type; + types[2] = HSIZE_AS_MPI_TYPE; + if (MPI_SUCCESS != + (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, contig_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) + *contig_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(contig_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + + /* + * Create struct type to extract the chunk_current, chunk_new and chunk_idx + * fields from a H5D_filtered_collective_io_info_t structure + */ + block_lengths[0] = 1; + block_lengths[1] = 1; + block_lengths[2] = 1; + displacements[0] = offsetof(H5D_filtered_collective_io_info_t, chunk_current); + displacements[1] = offsetof(H5D_filtered_collective_io_info_t, chunk_new); + displacements[2] = offsetof(H5D_filtered_collective_io_info_t, index_info.chunk_idx); + types[0] = chunk_block_type; + types[1] = chunk_block_type; + types[2] = HSIZE_AS_MPI_TYPE; + if (MPI_SUCCESS != + (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) + struct_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_create_resized( + struct_type, 0, sizeof(H5D_filtered_collective_io_info_t), resized_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_resized failed", mpi_code) + *resized_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(resized_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + +done: + if (struct_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&struct_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } + if (chunk_block_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&chunk_block_type))) + HMPI_DONE_ERROR(FAIL, 
"MPI_Type_free failed", mpi_code) + } + + if (ret_value < 0) { + if (*resized_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(resized_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + *resized_type_derived = FALSE; + } + if (*contig_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(contig_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + *contig_type_derived = FALSE; + } + } + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_get_chunk_alloc_info_types() */ + +/*------------------------------------------------------------------------- + * Function: H5D__mpio_get_chunk_insert_info_types + * + * Purpose: Constructs MPI derived datatypes for communicating the + * information necessary when reinserting chunks into a + * dataset's chunk index. This includes the chunk's new offset + * and size (H5F_block_t) and the inner `index_info` structure + * of a H5D_filtered_collective_io_info_t structure. + * + * The datatype returned through `contig_type` has an extent + * equal to the size of an H5D_chunk_insert_info_t structure + * and is suitable for communicating that structure type. + * + * The datatype returned through `resized_type` has an extent + * equal to the size of the encompassing + * H5D_filtered_collective_io_info_t structure. This makes it + * suitable for sending an array of + * H5D_filtered_collective_io_info_t structures, while + * extracting out just the information needed during + * communication. 
+ * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, hbool_t *contig_type_derived, + MPI_Datatype *resized_type, hbool_t *resized_type_derived) +{ + MPI_Datatype struct_type = MPI_DATATYPE_NULL; + hbool_t struct_type_derived = FALSE; + MPI_Datatype chunk_block_type = MPI_DATATYPE_NULL; + hbool_t chunk_block_type_derived = FALSE; + MPI_Aint contig_type_extent; + MPI_Datatype types[4]; + MPI_Aint displacements[4]; + int block_lengths[4]; + int field_count; + int mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(contig_type); + HDassert(contig_type_derived); + HDassert(resized_type); + HDassert(resized_type_derived); + + *contig_type_derived = FALSE; + *resized_type_derived = FALSE; + + /* Create struct type for an H5F_block_t structure */ + if (H5F_mpi_get_file_block_type(FALSE, &chunk_block_type, &chunk_block_type_derived) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't create derived type for chunk file description") + + field_count = 4; + HDassert(field_count == (sizeof(types) / sizeof(MPI_Datatype))); + + /* + * Create struct type to pack information into memory as follows: + * + * Chunk's new Offset/Size (H5F_block_t) -> + * Chunk Index Info (H5D_chunk_index_info_t) + */ + block_lengths[0] = 1; + block_lengths[1] = 1; + block_lengths[2] = 1; + block_lengths[3] = 1; + displacements[0] = offsetof(H5D_chunk_insert_info_t, chunk_block); + displacements[1] = offsetof(H5D_chunk_insert_info_t, index_info.chunk_idx); + displacements[2] = offsetof(H5D_chunk_insert_info_t, index_info.filter_mask); + displacements[3] = offsetof(H5D_chunk_insert_info_t, index_info.need_insert); + types[0] = chunk_block_type; + types[1] = HSIZE_AS_MPI_TYPE; + types[2] = MPI_UNSIGNED; + types[3] = MPI_C_BOOL; + if (MPI_SUCCESS != + (mpi_code = MPI_Type_create_struct(field_count, 
block_lengths, displacements, types, &struct_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) + struct_type_derived = TRUE; + + contig_type_extent = (MPI_Aint)(sizeof(H5F_block_t) + sizeof(H5D_chunk_index_info_t)); + + if (MPI_SUCCESS != (mpi_code = MPI_Type_create_resized(struct_type, 0, contig_type_extent, contig_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_resized failed", mpi_code) + *contig_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(contig_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + + struct_type_derived = FALSE; + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&struct_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + + /* + * Create struct type to correctly extract all needed + * information from a H5D_filtered_collective_io_info_t + * structure. + */ + displacements[0] = offsetof(H5D_filtered_collective_io_info_t, chunk_new); + displacements[1] = offsetof(H5D_filtered_collective_io_info_t, index_info.chunk_idx); + displacements[2] = offsetof(H5D_filtered_collective_io_info_t, index_info.filter_mask); + displacements[3] = offsetof(H5D_filtered_collective_io_info_t, index_info.need_insert); + if (MPI_SUCCESS != + (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) + struct_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_create_resized( + struct_type, 0, sizeof(H5D_filtered_collective_io_info_t), resized_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_resized failed", mpi_code) + *resized_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(resized_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) done: - if (chunk_entry->async_info.receive_buffer_array) - H5MM_free(chunk_entry->async_info.receive_buffer_array); - if (chunk_entry->async_info.receive_requests_array) - 
H5MM_free(chunk_entry->async_info.receive_requests_array); - if (tmp_gath_buf) - H5MM_free(tmp_gath_buf); - if (file_iter_init && H5S_SELECT_ITER_RELEASE(file_iter) < 0) - HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") - if (file_iter) - H5MM_free(file_iter); - if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) - HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") - if (mem_iter) - H5MM_free(mem_iter); - if (dataspace) - if (H5S_close(dataspace) < 0) - HDONE_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace") + if (struct_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&struct_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } + if (chunk_block_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&chunk_block_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } + + if (ret_value < 0) { + if (*resized_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(resized_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + *resized_type_derived = FALSE; + } + if (*contig_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(contig_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + *contig_type_derived = FALSE; + } + } FUNC_LEAVE_NOAPI(ret_value) -} /* end H5D__filtered_collective_chunk_entry_io() */ +} /* end H5D__mpio_get_chunk_insert_info_types() */ + +/*------------------------------------------------------------------------- + * Function: H5D__mpio_collective_filtered_io_type + * + * Purpose: Constructs a MPI derived datatype for both the memory and + * the file for a collective I/O operation on filtered chunks. + * The datatype contains the chunk offsets and lengths in the + * file and the locations of the chunk data buffers to read + * into/write from. 
+ * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_collective_filtered_io_type(H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries, + H5D_io_op_type_t op_type, MPI_Datatype *new_mem_type, + hbool_t *mem_type_derived, MPI_Datatype *new_file_type, + hbool_t *file_type_derived) +{ + MPI_Aint *io_buf_array = NULL; /* Relative displacements of filtered chunk data buffers */ + MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */ + int * length_array = NULL; /* Filtered Chunk lengths */ + int mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(chunk_list || 0 == num_entries); + HDassert(new_mem_type); + HDassert(mem_type_derived); + HDassert(new_file_type); + HDassert(file_type_derived); + + *mem_type_derived = FALSE; + *file_type_derived = FALSE; + *new_mem_type = MPI_BYTE; + *new_file_type = MPI_BYTE; + + if (num_entries > 0) { + H5F_block_t *chunk_block; + size_t last_valid_idx = 0; + size_t i; + int chunk_count; + + /* + * Determine number of chunks for I/O operation and + * setup for derived datatype creation if I/O operation + * includes multiple chunks + */ + if (num_entries == 1) { + /* Set last valid index to 0 for contiguous datatype creation */ + last_valid_idx = 0; + + if (op_type == H5D_IO_OP_WRITE) + chunk_count = 1; + else + chunk_count = chunk_list[0].need_read ? 
1 : 0; + } + else { + MPI_Aint chunk_buf; + MPI_Aint base_buf; + haddr_t base_offset = HADDR_UNDEF; + + H5_CHECK_OVERFLOW(num_entries, size_t, int); + + /* Allocate arrays */ + if (NULL == (length_array = H5MM_malloc((size_t)num_entries * sizeof(int)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, + "memory allocation failed for filtered collective I/O length array") + if (NULL == (io_buf_array = H5MM_malloc((size_t)num_entries * sizeof(MPI_Aint)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, + "memory allocation failed for filtered collective I/O buf length array") + if (NULL == (file_offset_array = H5MM_malloc((size_t)num_entries * sizeof(MPI_Aint)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, + "memory allocation failed for filtered collective I/O offset array") + + /* + * If doing a write, we can set the base chunk offset + * and base chunk data buffer right away. + * + * If doing a read, some chunks may be skipped over + * for reading if they aren't yet allocated in the + * file. Therefore, we have to find the first chunk + * actually being read in order to set the base chunk + * offset and base chunk data buffer. 
+ */ + if (op_type == H5D_IO_OP_WRITE) { +#if H5_CHECK_MPI_VERSION(3, 0) + if (MPI_SUCCESS != (mpi_code = MPI_Get_address(chunk_list[0].buf, &base_buf))) + HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code) +#else + base_buf = (MPI_Aint)chunk_list[0].buf; +#endif + + base_offset = chunk_list[0].chunk_new.offset; + } + + for (i = 0, chunk_count = 0; i < num_entries; i++) { + if (op_type == H5D_IO_OP_READ) { + /* + * If this chunk isn't being read, don't add it + * to the MPI type we're building up for I/O + */ + if (!chunk_list[i].need_read) + continue; + + /* + * If this chunk is being read, go ahead and + * set the base chunk offset and base chunk + * data buffer if we haven't already + */ + if (!H5F_addr_defined(base_offset)) { +#if H5_CHECK_MPI_VERSION(3, 0) + if (MPI_SUCCESS != (mpi_code = MPI_Get_address(chunk_list[i].buf, &base_buf))) + HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code) +#else + base_buf = (MPI_Aint)chunk_list[i].buf; +#endif + + base_offset = chunk_list[i].chunk_current.offset; + } + } + + /* Set convenience pointer for current chunk block */ + chunk_block = + (op_type == H5D_IO_OP_READ) ? 
&chunk_list[i].chunk_current : &chunk_list[i].chunk_new; + + /* + * Set the current chunk entry's offset in the file, relative to + * the first chunk entry + */ + HDassert(H5F_addr_defined(chunk_block->offset)); + file_offset_array[chunk_count] = (MPI_Aint)(chunk_block->offset - base_offset); + + /* + * Ensure the chunk list is sorted in ascending ordering of + * offset in the file + */ + if (chunk_count) + HDassert(file_offset_array[chunk_count] > file_offset_array[chunk_count - 1]); + + /* Set the current chunk entry's size for the I/O operation */ + H5_CHECK_OVERFLOW(chunk_block->length, hsize_t, int); + length_array[chunk_count] = (int)chunk_block->length; + + /* + * Set the displacement of the chunk entry's chunk data buffer, + * relative to the first entry's data buffer + */ +#if H5_CHECK_MPI_VERSION(3, 1) + if (MPI_SUCCESS != (mpi_code = MPI_Get_address(chunk_list[i].buf, &chunk_buf))) + HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code) + + io_buf_array[chunk_count] = MPI_Aint_diff(chunk_buf, base_buf); +#else + chunk_buf = (MPI_Aint)chunk_list[i].buf; + io_buf_array[chunk_count] = chunk_buf - base_buf; +#endif + + /* + * Set last valid index in case only a single chunk will + * be involved in the I/O operation + */ + last_valid_idx = i; + + chunk_count++; + } /* end for */ + } + + /* + * Create derived datatypes for the chunk list if this + * rank has any chunks to work on + */ + if (chunk_count > 0) { + if (chunk_count == 1) { + int chunk_len; + + /* Single chunk - use a contiguous type for both memory and file */ + + /* Ensure that we can cast chunk size to an int for MPI */ + chunk_block = (op_type == H5D_IO_OP_READ) ? 
&chunk_list[last_valid_idx].chunk_current + : &chunk_list[last_valid_idx].chunk_new; + H5_CHECKED_ASSIGN(chunk_len, int, chunk_block->length, hsize_t); + + if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(chunk_len, MPI_BYTE, new_file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) + *new_mem_type = *new_file_type; + + /* + * Since we use the same datatype for both memory and file, only + * mark the file type as derived so the caller doesn't try to + * free the same type twice + */ + *mem_type_derived = FALSE; + *file_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + } + else { + HDassert(file_offset_array); + HDassert(length_array); + HDassert(io_buf_array); + + /* Multiple chunks - use an hindexed type for both memory and file */ + + /* Create memory MPI type */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed( + chunk_count, length_array, io_buf_array, MPI_BYTE, new_mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + *mem_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + + /* Create file MPI type */ + if (MPI_SUCCESS != + (mpi_code = MPI_Type_create_hindexed(chunk_count, length_array, file_offset_array, + MPI_BYTE, new_file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + *file_type_derived = TRUE; + + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + } + } + } /* end if */ + +done: + if (file_offset_array) + H5MM_free(file_offset_array); + if (io_buf_array) + H5MM_free(io_buf_array); + if (length_array) + H5MM_free(length_array); + + if (ret_value < 0) { + if (*file_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(new_file_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", 
mpi_code) + *file_type_derived = FALSE; + } + if (*mem_type_derived) { + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(new_mem_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + *mem_type_derived = FALSE; + } + } + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_collective_filtered_io_type() */ + +#ifdef H5Dmpio_DEBUG + +static herr_t +H5D__mpio_dump_collective_filtered_chunk_list(H5D_filtered_collective_io_info_t *chunk_list, + size_t chunk_list_num_entries, int mpi_rank) +{ + H5D_filtered_collective_io_info_t *chunk_entry; + size_t i; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC_NOERR + + H5D_MPIO_DEBUG(mpi_rank, "CHUNK LIST: ["); + for (i = 0; i < chunk_list_num_entries; i++) { + unsigned chunk_rank; + + chunk_entry = &chunk_list[i]; + + HDassert(chunk_entry->chunk_info); + chunk_rank = (unsigned)H5S_GET_EXTENT_NDIMS(chunk_entry->chunk_info->fspace); + + H5D_MPIO_DEBUG(mpi_rank, " {"); + H5D_MPIO_DEBUG_VA(mpi_rank, " - Entry %zu -", i); + + H5D_MPIO_DEBUG(mpi_rank, " - Chunk Fspace Info -"); + H5D_MPIO_DEBUG_VA(mpi_rank, + " Chunk Current Info: { Offset: %" PRIuHADDR ", Length: %" PRIuHADDR " }", + chunk_entry->chunk_current.offset, chunk_entry->chunk_current.length); + H5D_MPIO_DEBUG_VA(mpi_rank, " Chunk New Info: { Offset: %" PRIuHADDR ", Length: %" PRIuHADDR " }", + chunk_entry->chunk_new.offset, chunk_entry->chunk_new.length); + + H5D_MPIO_DEBUG(mpi_rank, " - Chunk Insert Info -"); + H5D_MPIO_DEBUG_VA(mpi_rank, + " Chunk Scaled Coords (4-d): { %" PRIuHSIZE ", %" PRIuHSIZE ", %" PRIuHSIZE + ", %" PRIuHSIZE " }", + chunk_rank < 1 ? 0 : chunk_entry->chunk_info->scaled[0], + chunk_rank < 2 ? 0 : chunk_entry->chunk_info->scaled[1], + chunk_rank < 3 ? 0 : chunk_entry->chunk_info->scaled[2], + chunk_rank < 4 ? 
0 : chunk_entry->chunk_info->scaled[3]); + H5D_MPIO_DEBUG_VA(mpi_rank, " Chunk Index: %" PRIuHSIZE, chunk_entry->index_info.chunk_idx); + H5D_MPIO_DEBUG_VA(mpi_rank, " Filter Mask: %u", chunk_entry->index_info.filter_mask); + H5D_MPIO_DEBUG_VA(mpi_rank, " Need Insert: %s", + chunk_entry->index_info.need_insert ? "YES" : "NO"); + + H5D_MPIO_DEBUG(mpi_rank, " - Other Info -"); + H5D_MPIO_DEBUG_VA(mpi_rank, " Chunk Info Ptr: %p", (void *)chunk_entry->chunk_info); + H5D_MPIO_DEBUG_VA(mpi_rank, " Need Read: %s", chunk_entry->need_read ? "YES" : "NO"); + H5D_MPIO_DEBUG_VA(mpi_rank, " Chunk I/O Size: %zu", chunk_entry->io_size); + H5D_MPIO_DEBUG_VA(mpi_rank, " Chunk Buffer Size: %zu", chunk_entry->chunk_buf_size); + H5D_MPIO_DEBUG_VA(mpi_rank, " Original Owner: %d", chunk_entry->orig_owner); + H5D_MPIO_DEBUG_VA(mpi_rank, " New Owner: %d", chunk_entry->new_owner); + H5D_MPIO_DEBUG_VA(mpi_rank, " # of Writers: %d", chunk_entry->num_writers); + H5D_MPIO_DEBUG_VA(mpi_rank, " Chunk Data Buffer Ptr: %p", (void *)chunk_entry->buf); + + H5D_MPIO_DEBUG(mpi_rank, " }"); + } + H5D_MPIO_DEBUG(mpi_rank, "]"); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_dump_collective_filtered_chunk_list() */ + +#endif + #endif /* H5_HAVE_PARALLEL */ |