From 3fc5e34d71c9300ec10e87f2534bccec8df85756 Mon Sep 17 00:00:00 2001 From: jhendersonHDF Date: Wed, 2 Aug 2023 15:13:34 -0500 Subject: Switch parallel compression to use vector I/O (#3245) (#3327) Updates parallel compression feature to use vector I/O instead of creating and passing down MPI derived types to VFD --- src/H5Dmpio.c | 1340 ++++++++++++++++++++++----------------------------------- src/H5Fio.c | 20 +- 2 files changed, 544 insertions(+), 816 deletions(-) diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index 812e748..be76c42 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -213,7 +213,7 @@ typedef struct H5D_chunk_index_info_t { * hh - A handle for hash tables provided by the uthash.h header * */ -typedef struct H5D_filtered_collective_io_info_t { +typedef struct H5D_filtered_collective_chunk_info_t { H5D_chunk_index_info_t index_info; H5D_piece_info_t *chunk_info; @@ -229,6 +229,46 @@ typedef struct H5D_filtered_collective_io_info_t { void *buf; UT_hash_handle hh; +} H5D_filtered_collective_chunk_info_t; + +/* + * Top-level structure that contains an array of H5D_filtered_collective_chunk_info_t + * chunk info structures for collective filtered I/O, as well as other useful information. + * The struct's fields are as follows: + * + * chunk_infos - An array of H5D_filtered_collective_chunk_info_t structures that each + * contain information about a single chunk when performing collective filtered + * I/O. + * + * chunk_hash_table - A hash table storing H5D_filtered_collective_chunk_info_t structures + * that is populated when chunk modification data has to be shared between + * MPI processes during collective filtered I/O. This hash table facilitates + * quicker and easier lookup of a particular chunk by its "chunk index" + * value when applying chunk data modification messages from another MPI + * process. Each modification message received from another MPI process + * will contain the chunk's "chunk index" value that can be used for chunk + * lookup operations. + * + * num_chunks_infos - The number of entries in the `chunk_infos` array. + * + * num_chunks_to_read - The number of entries (or chunks) in the `chunk_infos` array that + * will need to be read in the case of a read operation (which can + * occur during dataset reads or during dataset writes when a chunk + * has to go through a read - modify - write cycle). The value for + * this field is based on a chunk's `need_read` field in its particular + * H5D_filtered_collective_chunk_info_t structure, but may be adjusted + * later depending on file space allocation timing and other factors. + * + * This field can be helpful to avoid needing to scan through the list + * of chunk info structures to determine how big of I/O vectors to + * allocate during read operations, as an example. + * + */ +typedef struct H5D_filtered_collective_io_info_t { + H5D_filtered_collective_chunk_info_t *chunk_infos; + H5D_filtered_collective_chunk_info_t *chunk_hash_table; + size_t num_chunk_infos; + size_t num_chunks_to_read; } H5D_filtered_collective_io_info_t; /* @@ -273,12 +313,10 @@ static herr_t H5D__obtain_mpio_mode(H5D_io_info_t *io_info, H5D_dset_io_info_t * static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, int *sum_chunkf); static herr_t H5D__mpio_get_sum_chunk_dset(const H5D_io_info_t *io_info, const H5D_dset_io_info_t *dset_info, int *sum_chunkf); -static herr_t H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, - const H5D_dset_io_info_t *di, - H5D_filtered_collective_io_info_t **chunk_list, - size_t *num_entries, int mpi_rank); +static herr_t H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, + const H5D_dset_io_info_t *di, int mpi_rank, + H5D_filtered_collective_io_info_t *chunk_list); static herr_t H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, const H5D_io_info_t *io_info, int mpi_rank, int mpi_size, size_t **rank_chunks_assigned_map); static herr_t H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chunk_list, @@ -287,35 +325,24 @@ static herr_t H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_i const H5D_io_info_t *io_info, int mpi_rank, int mpi_size); static herr_t H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list, - size_t *chunk_list_num_entries, H5D_io_info_t *io_info, - H5D_dset_io_info_t *dset_info, int mpi_rank, - int mpi_size, - H5D_filtered_collective_io_info_t **chunk_hash_table, - unsigned char ***chunk_msg_bufs, - int *chunk_msg_bufs_len); -static herr_t H5D__mpio_collective_filtered_chunk_common_io(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, - const H5D_io_info_t *io_info, int mpi_size); + H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, + int mpi_rank, int H5_ATTR_NDEBUG_UNUSED mpi_size, + unsigned char ***chunk_msg_bufs, + int *chunk_msg_bufs_len); static herr_t H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, - const H5D_io_info_t *io_info, - const H5D_dset_io_info_t *di, int mpi_rank, - int mpi_size); + const H5D_io_info_t *io_info, + const H5D_dset_io_info_t *di, int mpi_rank); static herr_t H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, - H5D_filtered_collective_io_info_t *chunk_hash_table, unsigned char **chunk_msg_bufs, int chunk_msg_bufs_len, const H5D_io_info_t *io_info, - const H5D_dset_io_info_t *di, int mpi_rank, - int mpi_size); + const H5D_dset_io_info_t *di, + int H5_ATTR_NDEBUG_UNUSED mpi_rank); static herr_t H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, size_t *num_chunks_assigned_map, H5D_io_info_t *io_info, H5D_chk_idx_info_t *idx_info, int mpi_rank, int mpi_size); static herr_t H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, size_t *num_chunks_assigned_map, H5D_io_info_t *io_info, H5D_dset_io_info_t *di, H5D_chk_idx_info_t *idx_info, int mpi_rank, @@ -329,10 +356,8 @@ static herr_t H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, hb static herr_t H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, hbool_t *contig_type_derived, MPI_Datatype *resized_type, hbool_t *resized_type_derived); -static herr_t H5D__mpio_collective_filtered_io_type(H5D_filtered_collective_io_info_t *chunk_list, - size_t num_entries, H5D_io_op_type_t op_type, - MPI_Datatype *new_mem_type, hbool_t *mem_type_derived, - MPI_Datatype *new_file_type, hbool_t *file_type_derived); +static herr_t H5D__mpio_collective_filtered_vec_io(const H5D_filtered_collective_io_info_t *chunk_list, + H5F_shared_t *f_sh, H5D_io_op_type_t op_type); static int H5D__cmp_piece_addr(const void *chunk_addr_info1, const void *chunk_addr_info2); static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2); @@ -342,7 +367,7 @@ static int H5D__cmp_chunk_redistribute_info_orig_owner(const void *entry1, co #ifdef H5Dmpio_DEBUG static herr_t H5D__mpio_debug_init(void); static herr_t H5D__mpio_dump_collective_filtered_chunk_list(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, int mpi_rank); + int mpi_rank); #endif /*********************/ @@ -1730,7 +1755,7 @@ done: * * TODO: Note that steps D. and F. here are both collective * operations that partially share data from the - * H5D_filtered_collective_io_info_t structure. To + * H5D_filtered_collective_chunk_info_t structure. To * try to conserve on memory a bit, the distributed * arrays these operations create are discarded after * each operation is performed. If memory consumption @@ -1748,20 +1773,11 @@ static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, int mpi_rank, int mpi_size) { - H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ - H5D_filtered_collective_io_info_t *chunk_hash_table = NULL; - unsigned char **chunk_msg_bufs = NULL; - MPI_Datatype mem_type = MPI_BYTE; - MPI_Datatype file_type = MPI_BYTE; - hbool_t mem_type_is_derived = FALSE; - hbool_t file_type_is_derived = FALSE; - size_t *rank_chunks_assigned_map = NULL; - size_t chunk_list_num_entries; - size_t i; - int chunk_msg_bufs_len = 0; - char fake_buf; /* Used as a fake buffer for ranks with no chunks, thus a NULL buf pointer */ - int mpi_code; - herr_t ret_value = SUCCEED; + H5D_filtered_collective_io_info_t chunk_list = {0}; + unsigned char **chunk_msg_bufs = NULL; + size_t *rank_chunks_assigned_map = NULL; + int chunk_msg_bufs_len = 0; + herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE_TAG(dset_info->dset->oloc.addr) @@ -1783,36 +1799,32 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_ H5CX_set_mpio_actual_io_mode(H5D_MPIO_CHUNK_COLLECTIVE); /* Build a list of selected chunks in the collective io operation */ - if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, &chunk_list, &chunk_list_num_entries, - mpi_rank) < 0) + if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, mpi_rank, &chunk_list) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list") if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */ - if (H5D__mpio_collective_filtered_chunk_read(chunk_list, chunk_list_num_entries, io_info, dset_info, - mpi_rank, mpi_size) < 0) + if (H5D__mpio_collective_filtered_chunk_read(&chunk_list, io_info, dset_info, mpi_rank) < 0) HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read filtered chunks") } else { /* Filtered collective write */ H5D_chk_idx_info_t index_info; - hsize_t mpi_buf_count; H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, dset_info->dset); if (mpi_size > 1) { /* Redistribute shared chunks being written to */ - if (H5D__mpio_redistribute_shared_chunks(chunk_list, chunk_list_num_entries, io_info, mpi_rank, - mpi_size, &rank_chunks_assigned_map) < 0) + if (H5D__mpio_redistribute_shared_chunks(&chunk_list, io_info, mpi_rank, mpi_size, + &rank_chunks_assigned_map) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks") /* Send any chunk modification messages for chunks this rank no longer owns */ - if (H5D__mpio_share_chunk_modification_data(chunk_list, &chunk_list_num_entries, io_info, - dset_info, mpi_rank, mpi_size, &chunk_hash_table, + if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, dset_info, mpi_rank, mpi_size, &chunk_msg_bufs, &chunk_msg_bufs_len) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to send chunk modification data between MPI ranks") /* Make sure the local chunk list was updated correctly */ - assert(chunk_list_num_entries == rank_chunks_assigned_map[mpi_rank]); + assert(chunk_list.num_chunk_infos == rank_chunks_assigned_map[mpi_rank]); } /* Proceed to update all the chunks this rank owns with its own @@ -1820,97 +1832,57 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_ * the chunks. As chunk reads are done collectively here, all ranks * must participate. */ - if (H5D__mpio_collective_filtered_chunk_update(chunk_list, chunk_list_num_entries, chunk_hash_table, - chunk_msg_bufs, chunk_msg_bufs_len, io_info, dset_info, - mpi_rank, mpi_size) < 0) + if (H5D__mpio_collective_filtered_chunk_update(&chunk_list, chunk_msg_bufs, chunk_msg_bufs_len, + io_info, dset_info, mpi_rank) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't update modified chunks") /* Free up resources used by chunk hash table now that we're done updating chunks */ - HASH_CLEAR(hh, chunk_hash_table); + HASH_CLEAR(hh, chunk_list.chunk_hash_table); /* All ranks now collectively re-allocate file space for all chunks */ - if (H5D__mpio_collective_filtered_chunk_reallocate(chunk_list, chunk_list_num_entries, - rank_chunks_assigned_map, io_info, &index_info, - mpi_rank, mpi_size) < 0) + if (H5D__mpio_collective_filtered_chunk_reallocate(&chunk_list, rank_chunks_assigned_map, io_info, + &index_info, mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't collectively re-allocate file space for chunks") - /* If this rank has any chunks selected, create a MPI type for collectively - * writing out the chunks to file. Otherwise, the rank contributes to the - * collective write with a none type. - */ - if (H5D__mpio_collective_filtered_io_type(chunk_list, chunk_list_num_entries, io_info->op_type, - &mem_type, &mem_type_is_derived, &file_type, - &file_type_is_derived) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, - "couldn't create MPI type for writing filtered chunks") - - mpi_buf_count = (file_type_is_derived || mem_type_is_derived) ? 1 : 0; - - /* Setup contig storage info for I/O operation */ - if (chunk_list_num_entries) { - /* - * Override the write buffer to point to the first - * chunk's data buffer - */ - io_info->base_maddr.cvp = chunk_list[0].buf; - - /* - * Setup the base storage address for this operation - * to be the first chunk's file address - */ - io_info->store_faddr = chunk_list[0].chunk_new.offset; - } - else { - io_info->base_maddr.cvp = &fake_buf; - io_info->store_faddr = 0; - } - - /* Perform I/O */ - if (H5D__final_collective_io(io_info, mpi_buf_count, file_type, mem_type) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") + /* Perform vector I/O on chunks */ + if (H5D__mpio_collective_filtered_vec_io(&chunk_list, io_info->f_sh, io_info->op_type) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't perform vector I/O on filtered chunks") /* Free up resources in anticipation of following collective operation */ - for (i = 0; i < chunk_list_num_entries; i++) { - if (chunk_list[i].buf) { - H5MM_free(chunk_list[i].buf); - chunk_list[i].buf = NULL; + for (size_t i = 0; i < chunk_list.num_chunk_infos; i++) { + if (chunk_list.chunk_infos[i].buf) { + H5MM_free(chunk_list.chunk_infos[i].buf); + chunk_list.chunk_infos[i].buf = NULL; } } /* Participate in the collective re-insertion of all chunks modified * into the chunk index */ - if (H5D__mpio_collective_filtered_chunk_reinsert(chunk_list, chunk_list_num_entries, - rank_chunks_assigned_map, io_info, dset_info, - &index_info, mpi_rank, mpi_size) < 0) + if (H5D__mpio_collective_filtered_chunk_reinsert(&chunk_list, rank_chunks_assigned_map, io_info, + dset_info, &index_info, mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't collectively re-insert modified chunks into chunk index") } done: - /* Free the MPI buf and file types, if they were derived */ - if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - if (chunk_msg_bufs) { - for (i = 0; i < (size_t)chunk_msg_bufs_len; i++) + for (size_t i = 0; i < (size_t)chunk_msg_bufs_len; i++) H5MM_free(chunk_msg_bufs[i]); H5MM_free(chunk_msg_bufs); } - HASH_CLEAR(hh, chunk_hash_table); + HASH_CLEAR(hh, chunk_list.chunk_hash_table); /* Free resources used by a rank which had some selection */ - if (chunk_list) { - for (i = 0; i < chunk_list_num_entries; i++) - if (chunk_list[i].buf) - H5MM_free(chunk_list[i].buf); + if (chunk_list.chunk_infos) { + for (size_t i = 0; i < chunk_list.num_chunk_infos; i++) + if (chunk_list.chunk_infos[i].buf) + H5MM_free(chunk_list.chunk_infos[i].buf); - H5MM_free(chunk_list); + H5MM_free(chunk_list.chunk_infos); } /* end if */ if (rank_chunks_assigned_map) @@ -2202,7 +2174,7 @@ done: * * TODO: Note that steps E. and G. here are both collective * operations that partially share data from the - * H5D_filtered_collective_io_info_t structure. To + * H5D_filtered_collective_chunk_info_t structure. To * try to conserve on memory a bit, the distributed * arrays these operations create are discarded after * each operation is performed. If memory consumption @@ -2220,21 +2192,13 @@ static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, int mpi_rank, int mpi_size) { - H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ - H5D_filtered_collective_io_info_t *chunk_hash_table = NULL; - unsigned char **chunk_msg_bufs = NULL; - H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */ - MPI_Datatype mem_type = MPI_BYTE; - MPI_Datatype file_type = MPI_BYTE; - hbool_t mem_type_is_derived = FALSE; - hbool_t file_type_is_derived = FALSE; - hbool_t have_chunk_to_process; - size_t chunk_list_num_entries; - size_t i; - size_t max_num_chunks; - int chunk_msg_bufs_len = 0; - int mpi_code; - herr_t ret_value = SUCCEED; + H5D_filtered_collective_io_info_t chunk_list = {0}; + unsigned char **chunk_msg_bufs = NULL; + hbool_t have_chunk_to_process; + size_t max_num_chunks; + int chunk_msg_bufs_len = 0; + int mpi_code; + herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE_TAG(dset_info->dset->oloc.addr) @@ -2256,12 +2220,11 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info H5CX_set_mpio_actual_io_mode(H5D_MPIO_CHUNK_COLLECTIVE); /* Build a list of selected chunks in the collective IO operation */ - if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, &chunk_list, &chunk_list_num_entries, - mpi_rank) < 0) + if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, mpi_rank, &chunk_list) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list") /* Retrieve the maximum number of chunks selected for any rank */ - if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list_num_entries, &max_num_chunks, 1, + if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list.num_chunk_infos, &max_num_chunks, 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm))) HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) @@ -2269,41 +2232,51 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info if (0 == max_num_chunks) HGOTO_DONE(SUCCEED); - /* Set up contiguous I/O info object */ - H5MM_memcpy(&ctg_io_info, io_info, sizeof(ctg_io_info)); - if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */ - for (i = 0; i < max_num_chunks; i++) { + for (size_t i = 0; i < max_num_chunks; i++) { + H5D_filtered_collective_io_info_t single_chunk_list = {0}; + /* Check if this rank has a chunk to work on for this iteration */ - have_chunk_to_process = (i < chunk_list_num_entries); + have_chunk_to_process = (i < chunk_list.num_chunk_infos); - if (H5D__mpio_collective_filtered_chunk_read(have_chunk_to_process ? &chunk_list[i] : NULL, - have_chunk_to_process ? 1 : 0, io_info, dset_info, - mpi_rank, mpi_size) < 0) + /* + * Setup a chunk list structure for either 1 or 0 chunks, depending + * on whether this rank has a chunk to work on for this iteration + */ + if (have_chunk_to_process) { + single_chunk_list.chunk_infos = &chunk_list.chunk_infos[i]; + single_chunk_list.num_chunk_infos = 1; + single_chunk_list.num_chunks_to_read = chunk_list.chunk_infos[i].need_read ? 1 : 0; + } + else { + single_chunk_list.chunk_infos = NULL; + single_chunk_list.num_chunk_infos = 0; + single_chunk_list.num_chunks_to_read = 0; + } + + if (H5D__mpio_collective_filtered_chunk_read(&single_chunk_list, io_info, dset_info, mpi_rank) < + 0) HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read filtered chunks") - if (have_chunk_to_process && chunk_list[i].buf) { - H5MM_free(chunk_list[i].buf); - chunk_list[i].buf = NULL; + if (have_chunk_to_process && chunk_list.chunk_infos[i].buf) { + H5MM_free(chunk_list.chunk_infos[i].buf); + chunk_list.chunk_infos[i].buf = NULL; } } } else { /* Filtered collective write */ H5D_chk_idx_info_t index_info; - hsize_t mpi_buf_count; /* Construct chunked index info */ H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, dset_info->dset); if (mpi_size > 1) { /* Redistribute shared chunks being written to */ - if (H5D__mpio_redistribute_shared_chunks(chunk_list, chunk_list_num_entries, io_info, mpi_rank, - mpi_size, NULL) < 0) + if (H5D__mpio_redistribute_shared_chunks(&chunk_list, io_info, mpi_rank, mpi_size, NULL) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks") /* Send any chunk modification messages for chunks this rank no longer owns */ - if (H5D__mpio_share_chunk_modification_data(chunk_list, &chunk_list_num_entries, io_info, - dset_info, mpi_rank, mpi_size, &chunk_hash_table, + if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, dset_info, mpi_rank, mpi_size, &chunk_msg_bufs, &chunk_msg_bufs_len) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to send chunk modification data between MPI ranks") @@ -2313,113 +2286,83 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info * have no chunks left to work on, but it still needs to participate in the * collective re-allocation and re-insertion of chunks modified by other ranks. */ - for (i = 0; i < max_num_chunks; i++) { + for (size_t i = 0; i < max_num_chunks; i++) { + H5D_filtered_collective_io_info_t single_chunk_list = {0}; + /* Check if this rank has a chunk to work on for this iteration */ - have_chunk_to_process = (i < chunk_list_num_entries) && (mpi_rank == chunk_list[i].new_owner); + have_chunk_to_process = + (i < chunk_list.num_chunk_infos) && (mpi_rank == chunk_list.chunk_infos[i].new_owner); + + /* + * Setup a chunk list structure for either 1 or 0 chunks, depending + * on whether this rank has a chunk to work on for this iteration + */ + if (have_chunk_to_process) { + single_chunk_list.chunk_infos = &chunk_list.chunk_infos[i]; + single_chunk_list.chunk_hash_table = chunk_list.chunk_hash_table; + single_chunk_list.num_chunk_infos = 1; + single_chunk_list.num_chunks_to_read = chunk_list.chunk_infos[i].need_read ? 1 : 0; + } + else { + single_chunk_list.chunk_infos = NULL; + single_chunk_list.chunk_hash_table = chunk_list.chunk_hash_table; + single_chunk_list.num_chunk_infos = 0; + single_chunk_list.num_chunks_to_read = 0; + } /* Proceed to update the chunk this rank owns (if any left) with its * own modification data and data from other ranks, before re-filtering * the chunks. As chunk reads are done collectively here, all ranks * must participate. */ - if (H5D__mpio_collective_filtered_chunk_update(have_chunk_to_process ? &chunk_list[i] : NULL, - have_chunk_to_process ? 1 : 0, chunk_hash_table, - chunk_msg_bufs, chunk_msg_bufs_len, io_info, - dset_info, mpi_rank, mpi_size) < 0) + if (H5D__mpio_collective_filtered_chunk_update( + &single_chunk_list, chunk_msg_bufs, chunk_msg_bufs_len, io_info, dset_info, mpi_rank) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't update modified chunks") /* All ranks now collectively re-allocate file space for all chunks */ - if (H5D__mpio_collective_filtered_chunk_reallocate(have_chunk_to_process ? &chunk_list[i] : NULL, - have_chunk_to_process ? 1 : 0, NULL, io_info, - &index_info, mpi_rank, mpi_size) < 0) + if (H5D__mpio_collective_filtered_chunk_reallocate(&single_chunk_list, NULL, io_info, &index_info, + mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't collectively re-allocate file space for chunks") - /* - * If this rank has a chunk to work on, create a MPI type - * for writing out the chunk. Otherwise, the rank will - * use MPI_BYTE for the file and memory type and specify - * a count of 0. - */ - if (H5D__mpio_collective_filtered_io_type( - have_chunk_to_process ? &chunk_list[i] : NULL, have_chunk_to_process ? 1 : 0, - io_info->op_type, &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, - "couldn't create MPI type for writing filtered chunks") - - mpi_buf_count = (file_type_is_derived || mem_type_is_derived) ? 1 : 0; - - /* Override the write buffer to point to the chunk data buffer */ - if (have_chunk_to_process) { - /* - * Override the write buffer to point to the - * chunk's data buffer - */ - ctg_io_info.base_maddr.cvp = chunk_list[i].buf; - - /* - * Setup the base storage address for this - * operation to be the chunk's file address - */ - ctg_io_info.store_faddr = chunk_list[i].chunk_new.offset; - } - else { - ctg_io_info.store_faddr = 0; - ctg_io_info.base_maddr = dset_info->buf; - } - - /* Perform the I/O */ - if (H5D__final_collective_io(&ctg_io_info, mpi_buf_count, file_type, mem_type) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") + /* Perform vector I/O on chunks */ + if (H5D__mpio_collective_filtered_vec_io(&single_chunk_list, io_info->f_sh, io_info->op_type) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "couldn't perform vector I/O on filtered chunks") /* Free up resources in anticipation of following collective operation */ - if (have_chunk_to_process && chunk_list[i].buf) { - H5MM_free(chunk_list[i].buf); - chunk_list[i].buf = NULL; + if (have_chunk_to_process && chunk_list.chunk_infos[i].buf) { + H5MM_free(chunk_list.chunk_infos[i].buf); + chunk_list.chunk_infos[i].buf = NULL; } /* Participate in the collective re-insertion of all chunks modified * in this iteration into the chunk index */ - if (H5D__mpio_collective_filtered_chunk_reinsert(have_chunk_to_process ? &chunk_list[i] : NULL, - have_chunk_to_process ? 1 : 0, NULL, io_info, - dset_info, &index_info, mpi_rank, mpi_size) < 0) + if (H5D__mpio_collective_filtered_chunk_reinsert(&single_chunk_list, NULL, io_info, dset_info, + &index_info, mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't collectively re-insert modified chunks into chunk index") - - /* Free the MPI types, if they were derived */ - if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - mem_type_is_derived = FALSE; - if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - file_type_is_derived = FALSE; } /* end for */ } done: - /* Free the MPI buf and file types, if they were derived */ - if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - if (chunk_msg_bufs) { - for (i = 0; i < (size_t)chunk_msg_bufs_len; i++) + for (size_t i = 0; i < (size_t)chunk_msg_bufs_len; i++) H5MM_free(chunk_msg_bufs[i]); H5MM_free(chunk_msg_bufs); } - HASH_CLEAR(hh, chunk_hash_table); + HASH_CLEAR(hh, chunk_list.chunk_hash_table); /* Free resources used by a rank which had some selection */ - if (chunk_list) { - for (i = 0; i < chunk_list_num_entries; i++) - if (chunk_list[i].buf) - H5MM_free(chunk_list[i].buf); + if (chunk_list.chunk_infos) { + for (size_t i = 0; i < chunk_list.num_chunk_infos; i++) + if (chunk_list.chunk_infos[i].buf) + H5MM_free(chunk_list.chunk_infos[i].buf); - H5MM_free(chunk_list); + H5MM_free(chunk_list.chunk_infos); } /* end if */ #ifdef H5Dmpio_DEBUG @@ -2643,16 +2586,16 @@ static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2) { - const H5D_filtered_collective_io_info_t *entry1; - const H5D_filtered_collective_io_info_t *entry2; - haddr_t addr1 = HADDR_UNDEF; - haddr_t addr2 = HADDR_UNDEF; - int ret_value; + const H5D_filtered_collective_chunk_info_t *entry1; + const H5D_filtered_collective_chunk_info_t *entry2; + haddr_t addr1 = HADDR_UNDEF; + haddr_t addr2 = HADDR_UNDEF; + int ret_value; FUNC_ENTER_PACKAGE_NOERR - entry1 = (const H5D_filtered_collective_io_info_t *)filtered_collective_io_info_entry1; - entry2 = (const H5D_filtered_collective_io_info_t *)filtered_collective_io_info_entry2; + entry1 = (const H5D_filtered_collective_chunk_info_t *)filtered_collective_io_info_entry1; + entry2 = (const H5D_filtered_collective_chunk_info_t *)filtered_collective_io_info_entry2; addr1 = entry1->chunk_new.offset; addr2 = entry2->chunk_new.offset; @@ -3002,29 +2945,27 @@ done: */ static herr_t H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di, - H5D_filtered_collective_io_info_t **chunk_list, - size_t *num_entries, int mpi_rank) + int mpi_rank, H5D_filtered_collective_io_info_t *chunk_list) { - H5D_filtered_collective_io_info_t *local_info_array = NULL; - H5D_chunk_ud_t udata; - hbool_t filter_partial_edge_chunks; - size_t num_chunks_selected; - size_t i; - herr_t ret_value = SUCCEED; + H5D_filtered_collective_chunk_info_t *local_info_array = NULL; + H5D_chunk_ud_t udata; + hbool_t filter_partial_edge_chunks; + size_t num_chunks_selected; + size_t num_chunks_to_read = 0; + herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE assert(io_info); assert(di); + assert(di->layout->type == H5D_CHUNKED); assert(chunk_list); - assert(num_entries); + #ifdef H5Dmpio_DEBUG H5D_MPIO_TRACE_ENTER(mpi_rank); H5D_MPIO_TIME_START(mpi_rank, "Filtered Collective I/O Setup"); #endif - assert(di->layout->type == H5D_CHUNKED); - /* Each rank builds a local list of the chunks they have selected */ if ((num_chunks_selected = H5SL_count(di->layout_io_info.chunk_map->dset_sel_pieces))) { H5D_piece_info_t *chunk_info; @@ -3040,7 +2981,7 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer") chunk_node = H5SL_first(di->layout_io_info.chunk_map->dset_sel_pieces); - for (i = 0; chunk_node; i++) { + for (size_t i = 0; chunk_node; i++) { chunk_info = (H5D_piece_info_t *)H5SL_item(chunk_node); /* Obtain this chunk's address */ @@ -3109,6 +3050,9 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const local_info_array[i].io_size < (size_t)di->dset->shared->layout.u.chunk.size; } + if (local_info_array[i].need_read) + num_chunks_to_read++; + local_info_array[i].skip_filter_pline = FALSE; if (!filter_partial_edge_chunks) { /* @@ -3159,12 +3103,8 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const /* Ensure the chunk list is sorted in ascending order of offset in the file */ if (need_sort) - qsort(local_info_array, num_chunks_selected, sizeof(H5D_filtered_collective_io_info_t), + qsort(local_info_array, num_chunks_selected, sizeof(H5D_filtered_collective_chunk_info_t), H5D__cmp_filtered_collective_io_info_entry); - -#ifdef H5Dmpio_DEBUG - H5D__mpio_dump_collective_filtered_chunk_list(local_info_array, num_chunks_selected, mpi_rank); -#endif } else if (H5F_get_coll_metadata_reads(di->dset->oloc.file)) { hsize_t scaled[H5O_LAYOUT_NDIMS] = {0}; @@ -3187,10 +3127,19 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address") } - *chunk_list = local_info_array; - *num_entries = num_chunks_selected; + chunk_list->chunk_infos = local_info_array; + chunk_list->num_chunk_infos = num_chunks_selected; + chunk_list->num_chunks_to_read = num_chunks_to_read; + +#ifdef H5Dmpio_DEBUG + H5D__mpio_dump_collective_filtered_chunk_list(chunk_list, mpi_rank); +#endif done: + if (ret_value < 0) { + H5MM_free(local_info_array); + } + #ifdef H5Dmpio_DEBUG H5D_MPIO_TIME_STOP(mpi_rank); H5D_MPIO_TRACE_EXIT(mpi_rank); @@ -3221,8 +3170,8 @@ done: */ static herr_t H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, const H5D_io_info_t *io_info, - int mpi_rank, int mpi_size, size_t **rank_chunks_assigned_map) + const H5D_io_info_t *io_info, int mpi_rank, int mpi_size, + size_t **rank_chunks_assigned_map) { hbool_t redistribute_on_all_ranks; size_t *num_chunks_map = NULL; @@ -3233,7 +3182,7 @@ H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_li FUNC_ENTER_PACKAGE - assert(chunk_list || 0 == chunk_list_num_entries); + assert(chunk_list); assert(io_info); assert(mpi_size > 1); /* No chunk sharing is possible for MPI Comm size of 1 */ @@ -3251,7 +3200,7 @@ H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_li HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate assigned chunks array") /* Perform initial Allgather to determine the collective chunk list size */ - if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&chunk_list_num_entries, 1, H5_SIZE_T_AS_MPI_TYPE, + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&chunk_list->num_chunk_infos, 1, H5_SIZE_T_AS_MPI_TYPE, num_chunks_map, 1, H5_SIZE_T_AS_MPI_TYPE, io_info->comm))) HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) @@ -3376,8 +3325,8 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun FUNC_ENTER_PACKAGE + assert(chunk_list); assert(num_chunks_assigned_map); - assert(chunk_list || 0 == num_chunks_assigned_map[mpi_rank]); assert(io_info); assert(mpi_size > 1); @@ -3437,9 +3386,9 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun "can't create derived datatypes for chunk redistribution info") /* Perform gather operation */ - if (H5_mpio_gatherv_alloc(chunk_list, num_chunks_int, struct_type, counts_ptr, displacements_ptr, - packed_type, all_ranks_involved, 0, io_info->comm, mpi_rank, mpi_size, - &coll_chunk_list, &coll_chunk_list_num_entries) < 0) + if (H5_mpio_gatherv_alloc(chunk_list->chunk_infos, num_chunks_int, struct_type, counts_ptr, + displacements_ptr, packed_type, all_ranks_involved, 0, io_info->comm, mpi_rank, + mpi_size, &coll_chunk_list, &coll_chunk_list_num_entries) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "can't gather chunk redistribution info to involved ranks") @@ -3568,8 +3517,19 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun coll_entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[i++]; - chunk_list[j].new_owner = coll_entry->new_owner; - chunk_list[j].num_writers = coll_entry->num_writers; + chunk_list->chunk_infos[j].new_owner = coll_entry->new_owner; + chunk_list->chunk_infos[j].num_writers = coll_entry->num_writers; + + /* + * Check if the chunk list struct's `num_chunks_to_read` field + * needs to be updated + */ + if (chunk_list->chunk_infos[j].need_read && (chunk_list->chunk_infos[j].new_owner != mpi_rank)) { + chunk_list->chunk_infos[j].need_read = FALSE; + + assert(chunk_list->num_chunks_to_read > 0); + chunk_list->num_chunks_to_read--; + } } } else { @@ -3579,13 +3539,27 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun * local chunk lists get updated */ if (MPI_SUCCESS != - (mpi_code = MPI_Scatterv(coll_chunk_list, counts_ptr, displacements_ptr, packed_type, chunk_list, - num_chunks_int, struct_type, 0, io_info->comm))) + (mpi_code = MPI_Scatterv(coll_chunk_list, counts_ptr, displacements_ptr, packed_type, + chunk_list->chunk_infos, num_chunks_int, struct_type, 0, io_info->comm))) HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code) + + /* + * Now that chunks have been redistributed, each rank must update + * their chunk list struct's `num_chunks_to_read` field since it + * may now be out of date. + */ + for (i = 0; i < chunk_list->num_chunk_infos; i++) { + if ((chunk_list->chunk_infos[i].new_owner != mpi_rank) && chunk_list->chunk_infos[i].need_read) { + chunk_list->chunk_infos[i].need_read = FALSE; + + assert(chunk_list->num_chunks_to_read > 0); + chunk_list->num_chunks_to_read--; + } + } } #ifdef H5Dmpio_DEBUG - H5D__mpio_dump_collective_filtered_chunk_list(chunk_list, num_chunks_assigned_map[mpi_rank], mpi_rank); + H5D__mpio_dump_collective_filtered_chunk_list(chunk_list, mpi_rank); #endif done: @@ -3684,36 +3658,33 @@ done: *------------------------------------------------------------------------- */ static herr_t -H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list, - size_t *chunk_list_num_entries, H5D_io_info_t *io_info, +H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list, H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, int mpi_rank, - int H5_ATTR_NDEBUG_UNUSED mpi_size, - H5D_filtered_collective_io_info_t **chunk_hash_table, - unsigned char ***chunk_msg_bufs, int *chunk_msg_bufs_len) + int H5_ATTR_NDEBUG_UNUSED mpi_size, unsigned char ***chunk_msg_bufs, + int *chunk_msg_bufs_len) { #if H5_CHECK_MPI_VERSION(3, 0) - H5D_filtered_collective_io_info_t *chunk_table = NULL; - H5S_sel_iter_t *mem_iter = NULL; - unsigned char **msg_send_bufs = NULL; - unsigned char **msg_recv_bufs = NULL; - MPI_Request *send_requests = NULL; - MPI_Request *recv_requests = NULL; - MPI_Request ibarrier = MPI_REQUEST_NULL; - hbool_t mem_iter_init = FALSE; - hbool_t ibarrier_posted = FALSE; - size_t send_bufs_nalloc = 0; - size_t num_send_requests = 0; - size_t num_recv_requests = 0; - size_t num_msgs_incoming = 0; - size_t last_assigned_idx; - size_t i; - int mpi_code; - herr_t ret_value = SUCCEED; + H5D_filtered_collective_chunk_info_t *chunk_table = NULL; + H5S_sel_iter_t *mem_iter = NULL; + unsigned char **msg_send_bufs = NULL; + unsigned char **msg_recv_bufs = NULL; + MPI_Request *send_requests = NULL; + MPI_Request *recv_requests = NULL; + MPI_Request ibarrier = MPI_REQUEST_NULL; + hbool_t mem_iter_init = FALSE; + hbool_t ibarrier_posted = FALSE; + size_t send_bufs_nalloc = 0; + size_t num_send_requests = 0; + size_t num_recv_requests = 0; + size_t num_msgs_incoming = 0; + size_t last_assigned_idx; + size_t i; + int mpi_code; + herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE - assert(chunk_list_num_entries); - assert(chunk_list || 0 == *chunk_list_num_entries); + assert(chunk_list); assert(io_info); assert(dset_info); assert(mpi_size > 1); @@ -3728,7 +3699,7 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk /* Set to latest format for encoding dataspace */ H5CX_set_libver_bounds(NULL); - if (*chunk_list_num_entries) { + if (chunk_list->num_chunk_infos > 0) { /* Allocate a selection iterator for iterating over chunk dataspaces */ if (NULL == (mem_iter = H5FL_MALLOC(H5S_sel_iter_t))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate dataspace selection iterator") @@ -3760,8 +3731,8 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk * synchronous sends to send the data this rank is writing to * the rank that does own the chunk. */ - for (i = 0, last_assigned_idx = 0; i < *chunk_list_num_entries; i++) { - H5D_filtered_collective_io_info_t *chunk_entry = &chunk_list[i]; + for (i = 0, last_assigned_idx = 0; i < chunk_list->num_chunk_infos; i++) { + H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i]; if (mpi_rank == chunk_entry->new_owner) { num_msgs_incoming += (size_t)(chunk_entry->num_writers - 1); @@ -3771,7 +3742,7 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk * does own, since it has sent the necessary data and is no longer * interested in the chunks it doesn't own. */ - chunk_list[last_assigned_idx] = chunk_list[i]; + chunk_list->chunk_infos[last_assigned_idx] = chunk_list->chunk_infos[i]; /* * Since, at large scale, a chunk's index value may be larger than @@ -3783,7 +3754,8 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk * message itself will contain the chunk's index so we can update * the correct chunk with the received data. */ - HASH_ADD(hh, chunk_table, index_info.chunk_idx, sizeof(hsize_t), &chunk_list[last_assigned_idx]); + HASH_ADD(hh, chunk_table, index_info.chunk_idx, sizeof(hsize_t), + &chunk_list->chunk_infos[last_assigned_idx]); last_assigned_idx++; } @@ -4013,10 +3985,12 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk H5_GCC_DIAG_ON("stringop-overflow") /* Set the new number of locally-selected chunks */ - *chunk_list_num_entries = last_assigned_idx; + chunk_list->num_chunk_infos = last_assigned_idx; + + /* Set chunk hash table pointer for future use */ + chunk_list->chunk_hash_table = chunk_table; /* Return chunk message buffers if any were received */ - *chunk_hash_table = chunk_table; *chunk_msg_bufs = msg_recv_bufs; *chunk_msg_bufs_len = (int)num_recv_requests; @@ -4087,129 +4061,12 @@ done: } /* end H5D__mpio_share_chunk_modification_data() */ /*------------------------------------------------------------------------- - * Function: H5D__mpio_collective_filtered_chunk_common_io - * - * Purpose: This routine performs the common part of collective I/O - * when reading or writing filtered chunks collectively. - * - * Return: Non-negative on success/Negative on failure - * - *------------------------------------------------------------------------- - */ -static herr_t -H5D__mpio_collective_filtered_chunk_common_io(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, const H5D_io_info_t *io_info, - int mpi_size) -{ - H5D_io_info_t coll_io_info; - MPI_Datatype file_type = MPI_DATATYPE_NULL; - MPI_Datatype mem_type = MPI_DATATYPE_NULL; - hbool_t mem_type_is_derived = FALSE; - hbool_t file_type_is_derived = FALSE; - hsize_t mpi_buf_count; - haddr_t base_read_offset = HADDR_UNDEF; - size_t num_chunks; - size_t i; - char fake_buf; /* Used as a fake buffer for ranks with no chunks, thus a NULL buf pointer */ - int mpi_code; - herr_t ret_value = SUCCEED; - - FUNC_ENTER_PACKAGE - - assert(chunk_list || 0 == chunk_list_num_entries); - assert(io_info); - - /* Initialize temporary I/O info */ - coll_io_info = *io_info; - - /* - * Construct MPI derived datatype for collective I/O on chunks - */ - if (H5D__mpio_collective_filtered_io_type(chunk_list, chunk_list_num_entries, io_info->op_type, &mem_type, - &mem_type_is_derived, &file_type, &file_type_is_derived) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_BADTYPE, FAIL, "couldn't create MPI I/O type for chunk I/O") - - /* - * For reads, determine how many chunks are actually being read. - * Note that if this is a read during a write operation - * (read chunk -> unfilter -> modify -> write back), some - * chunks may not need to be read if they're being fully - * overwritten during a write operation. - */ - if (io_info->op_type == H5D_IO_OP_READ) { - for (i = 0, num_chunks = 0; i < chunk_list_num_entries; i++) { - assert(chunk_list[i].buf); - - if (chunk_list[i].need_read) { - if (!H5_addr_defined(base_read_offset)) - base_read_offset = chunk_list[i].chunk_current.offset; - - num_chunks++; - } - } - } - else - num_chunks = chunk_list_num_entries; - - /* - * If this rank doesn't have a selection, it can - * skip I/O if the MPI communicator size is 1. - * - * Otherwise, this rank has to participate in - * collective I/O, but probably has a NULL buf - * pointer, so override to a fake buffer since our - * write/read function expects one. - */ - if (num_chunks == 0) { - if (mpi_size == 1) - HGOTO_DONE(SUCCEED); - else { - if (io_info->op_type == H5D_IO_OP_WRITE) - coll_io_info.base_maddr.cvp = &fake_buf; - else - coll_io_info.base_maddr.vp = &fake_buf; - } - } - - /* - * Setup for I/O operation - */ - - mpi_buf_count = (num_chunks) ? 1 : 0; - - if (num_chunks) { - /* - * Setup the base storage address for this operation - * to be the first chunk's file address - */ - if (io_info->op_type == H5D_IO_OP_WRITE) - coll_io_info.store_faddr = chunk_list[0].chunk_new.offset; - else - coll_io_info.store_faddr = base_read_offset; - } - else - coll_io_info.store_faddr = 0; - - /* Perform I/O */ - if (H5D__final_collective_io(&coll_io_info, mpi_buf_count, file_type, mem_type) < 0) - HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish MPI I/O") - -done: - /* Free the MPI buf and file types, if they were derived */ - if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - - FUNC_LEAVE_NOAPI(ret_value) -} /* end H5D__mpio_collective_filtered_chunk_common_io() */ - -/*------------------------------------------------------------------------- * Function: H5D__mpio_collective_filtered_chunk_read * * Purpose: This routine coordinates a collective read across all ranks * of the chunks they have selected. Each rank will then go - * and + * and unfilter their read chunks as necessary and scatter + * the data into the provided read buffer. * * Return: Non-negative on success/Negative on failure * @@ -4217,27 +4074,24 @@ done: */ static herr_t H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, const H5D_io_info_t *io_info, - const H5D_dset_io_info_t *di, int mpi_rank, int mpi_size) + const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di, + int mpi_rank) { H5D_fill_buf_info_t fb_info; - H5D_piece_info_t *chunk_info = NULL; - H5D_io_info_t coll_io_info; H5Z_EDC_t err_detect; /* Error detection info */ H5Z_cb_t filter_cb; /* I/O filter callback function */ hsize_t file_chunk_size = 0; hsize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ - hbool_t should_fill = FALSE; - hbool_t fb_info_init = FALSE; - hbool_t index_empty = FALSE; - size_t i; + hbool_t should_fill = FALSE; + hbool_t fb_info_init = FALSE; + hbool_t index_empty = FALSE; H5S_t *fill_space = NULL; void *base_read_buf = NULL; herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE - assert(chunk_list || 0 == chunk_list_num_entries); + assert(chunk_list); assert(io_info); assert(di); @@ -4248,11 +4102,7 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun (void)mpi_rank; #endif - /* Initialize temporary I/O info */ - coll_io_info = *io_info; - coll_io_info.base_maddr.vp = NULL; - - if (chunk_list_num_entries) { + if (chunk_list->num_chunk_infos) { /* Retrieve filter settings from API context */ if (H5CX_get_err_detect(&err_detect) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info") @@ -4281,12 +4131,14 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun * size; reading into a (smaller) buffer of size equal to the unfiltered * chunk size would of course be bad. */ - for (i = 0; i < chunk_list_num_entries; i++) { - assert(chunk_list[i].need_read); + for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) { + H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i]; + + assert(chunk_entry->need_read); - chunk_list[i].chunk_buf_size = MAX(chunk_list[i].chunk_current.length, file_chunk_size); + chunk_entry->chunk_buf_size = MAX(chunk_entry->chunk_current.length, file_chunk_size); - if (NULL == (chunk_list[i].buf = H5MM_malloc(chunk_list[i].chunk_buf_size))) { + if (NULL == (chunk_entry->buf = H5MM_malloc(chunk_entry->chunk_buf_size))) { /* Push an error, but participate in collective read */ HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer"); break; @@ -4297,22 +4149,26 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun * read it from the file. Instead, just fill the chunk buffer * with the fill value if necessary. */ - if (H5_addr_defined(chunk_list[i].chunk_current.offset)) { + if (H5_addr_defined(chunk_entry->chunk_current.offset)) { /* Set first read buffer */ if (!base_read_buf) - base_read_buf = chunk_list[i].buf; + base_read_buf = chunk_entry->buf; /* Set chunk's new length for eventual filter pipeline calls */ - if (chunk_list[i].skip_filter_pline) - chunk_list[i].chunk_new.length = file_chunk_size; + if (chunk_entry->skip_filter_pline) + chunk_entry->chunk_new.length = file_chunk_size; else - chunk_list[i].chunk_new.length = chunk_list[i].chunk_current.length; + chunk_entry->chunk_new.length = chunk_entry->chunk_current.length; } else { - chunk_list[i].need_read = FALSE; + chunk_entry->need_read = FALSE; + + /* Update field keeping track of number of chunks to read */ + assert(chunk_list->num_chunks_to_read > 0); + chunk_list->num_chunks_to_read--; /* Set chunk's new length for eventual filter pipeline calls */ - chunk_list[i].chunk_new.length = file_chunk_size; + chunk_entry->chunk_new.length = file_chunk_size; if (should_fill) { /* Initialize fill value buffer if not already initialized */ @@ -4341,7 +4197,7 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun /* Write fill value to memory buffer */ assert(fb_info.fill_buf); - if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_list[i].buf, + if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_entry->buf, di->type_info.mem_type, fill_space) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't fill chunk buffer with fill value") } @@ -4359,49 +4215,42 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty") if (!index_empty) { - /* - * Override the read buffer to point to the address of - * the first chunk data buffer being read into - */ - if (base_read_buf) - coll_io_info.base_maddr.vp = base_read_buf; - - /* Perform collective chunk read */ - if (H5D__mpio_collective_filtered_chunk_common_io(chunk_list, chunk_list_num_entries, &coll_io_info, - mpi_size) < 0) - HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish collective filtered chunk read") + /* Perform collective vector read */ + if (H5D__mpio_collective_filtered_vec_io(chunk_list, io_info->f_sh, H5D_IO_OP_READ) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't perform vector I/O on filtered chunks") } /* * Iterate through all the read chunks, unfiltering them and scattering their * data out to the application's read buffer. */ - for (i = 0; i < chunk_list_num_entries; i++) { - chunk_info = chunk_list[i].chunk_info; + for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) { + H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i]; + H5D_piece_info_t *chunk_info = chunk_entry->chunk_info; /* Unfilter the chunk, unless we didn't read it from the file */ - if (chunk_list[i].need_read && !chunk_list[i].skip_filter_pline) { + if (chunk_entry->need_read && !chunk_entry->skip_filter_pline) { if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, - &(chunk_list[i].index_info.filter_mask), err_detect, filter_cb, - (size_t *)&chunk_list[i].chunk_new.length, &chunk_list[i].chunk_buf_size, - &chunk_list[i].buf) < 0) + &(chunk_entry->index_info.filter_mask), err_detect, filter_cb, + (size_t *)&chunk_entry->chunk_new.length, &chunk_entry->chunk_buf_size, + &chunk_entry->buf) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying") } /* Scatter the chunk data to the read buffer */ iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace); - if (H5D_select_io_mem(di->buf.vp, chunk_info->mspace, chunk_list[i].buf, chunk_info->fspace, + if (H5D_select_io_mem(di->buf.vp, chunk_info->mspace, chunk_entry->buf, chunk_info->fspace, di->type_info.src_type_size, (size_t)iter_nelmts) < 0) HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't copy chunk data to read buffer") } done: /* Free all resources used by entries in the chunk list */ - for (i = 0; i < chunk_list_num_entries; i++) { - if (chunk_list[i].buf) { - H5MM_free(chunk_list[i].buf); - chunk_list[i].buf = NULL; + for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) { + if (chunk_list->chunk_infos[i].buf) { + H5MM_free(chunk_list->chunk_infos[i].buf); + chunk_list->chunk_infos[i].buf = NULL; } } @@ -4433,19 +4282,15 @@ done: */ static herr_t H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, - H5D_filtered_collective_io_info_t *chunk_hash_table, unsigned char **chunk_msg_bufs, int chunk_msg_bufs_len, const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di, - int H5_ATTR_NDEBUG_UNUSED mpi_rank, int mpi_size) + int H5_ATTR_NDEBUG_UNUSED mpi_rank) { const H5D_type_info_t *type_info = NULL; H5D_fill_buf_info_t fb_info; - H5D_piece_info_t *chunk_info = NULL; - H5S_sel_iter_t *sel_iter = NULL; /* Dataspace selection iterator for H5D__scatter_mem */ - H5D_io_info_t coll_io_info; - H5Z_EDC_t err_detect; /* Error detection info */ - H5Z_cb_t filter_cb; /* I/O filter callback function */ + H5S_sel_iter_t *sel_iter = NULL; /* Dataspace selection iterator for H5D__scatter_mem */ + H5Z_EDC_t err_detect; /* Error detection info */ + H5Z_cb_t filter_cb; /* I/O filter callback function */ hsize_t file_chunk_size = 0; hsize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ hbool_t should_fill = FALSE; @@ -4460,8 +4305,8 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch FUNC_ENTER_PACKAGE - assert(chunk_list || 0 == chunk_list_num_entries); - assert((chunk_msg_bufs && chunk_hash_table) || 0 == chunk_msg_bufs_len); + assert(chunk_list); + assert((chunk_msg_bufs && chunk_list->chunk_hash_table) || 0 == chunk_msg_bufs_len); assert(io_info); assert(di); @@ -4474,7 +4319,7 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch type_info = &(di->type_info); assert(type_info); - if (chunk_list_num_entries) { + if (chunk_list->num_chunk_infos > 0) { /* Retrieve filter settings from API context */ if (H5CX_get_err_detect(&err_detect) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info") @@ -4509,50 +4354,56 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch * size; reading into a (smaller) buffer of size equal to the unfiltered * chunk size would of course be bad. */ - for (i = 0; i < chunk_list_num_entries; i++) { - assert(mpi_rank == chunk_list[i].new_owner); + for (i = 0; i < chunk_list->num_chunk_infos; i++) { + H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i]; - chunk_list[i].chunk_buf_size = MAX(chunk_list[i].chunk_current.length, file_chunk_size); + assert(mpi_rank == chunk_entry->new_owner); + + chunk_entry->chunk_buf_size = MAX(chunk_entry->chunk_current.length, file_chunk_size); /* * If this chunk hasn't been allocated yet and we aren't writing * out fill values to it, make sure to 0-fill its memory buffer * so we don't use uninitialized memory. */ - if (!H5_addr_defined(chunk_list[i].chunk_current.offset) && !should_fill) - chunk_list[i].buf = H5MM_calloc(chunk_list[i].chunk_buf_size); + if (!H5_addr_defined(chunk_entry->chunk_current.offset) && !should_fill) + chunk_entry->buf = H5MM_calloc(chunk_entry->chunk_buf_size); else - chunk_list[i].buf = H5MM_malloc(chunk_list[i].chunk_buf_size); + chunk_entry->buf = H5MM_malloc(chunk_entry->chunk_buf_size); - if (NULL == chunk_list[i].buf) { + if (NULL == chunk_entry->buf) { /* Push an error, but participate in collective read */ HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer"); break; } /* Set chunk's new length for eventual filter pipeline calls */ - if (chunk_list[i].need_read) { + if (chunk_entry->need_read) { /* * Check if chunk is currently allocated. If not, don't try to * read it from the file. Instead, just fill the chunk buffer * with the fill value if fill values are to be written. */ - if (H5_addr_defined(chunk_list[i].chunk_current.offset)) { + if (H5_addr_defined(chunk_entry->chunk_current.offset)) { /* Set first read buffer */ if (!base_read_buf) - base_read_buf = chunk_list[i].buf; + base_read_buf = chunk_entry->buf; /* Set chunk's new length for eventual filter pipeline calls */ - if (chunk_list[i].skip_filter_pline) - chunk_list[i].chunk_new.length = file_chunk_size; + if (chunk_entry->skip_filter_pline) + chunk_entry->chunk_new.length = file_chunk_size; else - chunk_list[i].chunk_new.length = chunk_list[i].chunk_current.length; + chunk_entry->chunk_new.length = chunk_entry->chunk_current.length; } else { - chunk_list[i].need_read = FALSE; + chunk_entry->need_read = FALSE; + + /* Update field keeping track of number of chunks to read */ + assert(chunk_list->num_chunks_to_read > 0); + chunk_list->num_chunks_to_read--; /* Set chunk's new length for eventual filter pipeline calls */ - chunk_list[i].chunk_new.length = file_chunk_size; + chunk_entry->chunk_new.length = file_chunk_size; if (should_fill) { /* Initialize fill value buffer if not already initialized */ @@ -4583,7 +4434,7 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch /* Write fill value to memory buffer */ assert(fb_info.fill_buf); - if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_list[i].buf, + if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_entry->buf, type_info->mem_type, fill_space) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't fill chunk buffer with fill value") @@ -4591,7 +4442,7 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch } } else - chunk_list[i].chunk_new.length = file_chunk_size; + chunk_entry->chunk_new.length = file_chunk_size; } /* @@ -4605,25 +4456,9 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty") if (!index_empty) { - /* - * Setup for I/O operation - */ - - /* Initialize temporary I/O info */ - coll_io_info = *io_info; - coll_io_info.op_type = H5D_IO_OP_READ; - - /* Override the read buffer to point to the address of the first - * chunk data buffer being read into - */ - if (base_read_buf) { - coll_io_info.base_maddr.vp = base_read_buf; - } - - /* Read all chunks that need to be read from the file */ - if (H5D__mpio_collective_filtered_chunk_common_io(chunk_list, chunk_list_num_entries, &coll_io_info, - mpi_size) < 0) - HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish collective filtered chunk read") + /* Perform collective vector read */ + if (H5D__mpio_collective_filtered_vec_io(chunk_list, io_info->f_sh, H5D_IO_OP_READ) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't perform vector I/O on filtered chunks") } /* @@ -4632,26 +4467,27 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch */ /* Process all chunks with data from the owning rank first */ - for (i = 0; i < chunk_list_num_entries; i++) { - assert(mpi_rank == chunk_list[i].new_owner); + for (i = 0; i < chunk_list->num_chunk_infos; i++) { + H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i]; + H5D_piece_info_t *chunk_info = chunk_entry->chunk_info; - chunk_info = chunk_list[i].chunk_info; + assert(mpi_rank == chunk_entry->new_owner); /* * If this chunk wasn't being fully overwritten, we read it from * the file, so we need to unfilter it */ - if (chunk_list[i].need_read && !chunk_list[i].skip_filter_pline) { + if (chunk_entry->need_read && !chunk_entry->skip_filter_pline) { if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, - &(chunk_list[i].index_info.filter_mask), err_detect, filter_cb, - (size_t *)&chunk_list[i].chunk_new.length, &chunk_list[i].chunk_buf_size, - &chunk_list[i].buf) < 0) + &(chunk_entry->index_info.filter_mask), err_detect, filter_cb, + (size_t *)&chunk_entry->chunk_new.length, &chunk_entry->chunk_buf_size, + &chunk_entry->buf) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying") } iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace); - if (H5D_select_io_mem(chunk_list[i].buf, chunk_info->fspace, di->buf.cvp, chunk_info->mspace, + if (H5D_select_io_mem(chunk_entry->buf, chunk_info->fspace, di->buf.cvp, chunk_info->mspace, type_info->dst_type_size, (size_t)iter_nelmts) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't copy chunk data to write buffer") } @@ -4662,9 +4498,9 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch /* Now process all received chunk message buffers */ for (i = 0; i < (size_t)chunk_msg_bufs_len; i++) { - H5D_filtered_collective_io_info_t *chunk_entry = NULL; - const unsigned char *msg_ptr = chunk_msg_bufs[i]; - hsize_t chunk_idx; + H5D_filtered_collective_chunk_info_t *chunk_entry = NULL; + const unsigned char *msg_ptr = chunk_msg_bufs[i]; + hsize_t chunk_idx; if (msg_ptr) { /* Retrieve the chunk's index value */ @@ -4672,7 +4508,7 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch msg_ptr += sizeof(hsize_t); /* Find the chunk entry according to its chunk index */ - HASH_FIND(hh, chunk_hash_table, &chunk_idx, sizeof(hsize_t), chunk_entry); + HASH_FIND(hh, chunk_list->chunk_hash_table, &chunk_idx, sizeof(hsize_t), chunk_entry); if (chunk_entry == NULL) HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find chunk entry") if (mpi_rank != chunk_entry->new_owner) @@ -4720,17 +4556,18 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch } /* Finally, filter all the chunks */ - for (i = 0; i < chunk_list_num_entries; i++) { - if (!chunk_list[i].skip_filter_pline) { - if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, 0, &(chunk_list[i].index_info.filter_mask), - err_detect, filter_cb, (size_t *)&chunk_list[i].chunk_new.length, - &chunk_list[i].chunk_buf_size, &chunk_list[i].buf) < 0) + for (i = 0; i < chunk_list->num_chunk_infos; i++) { + if (!chunk_list->chunk_infos[i].skip_filter_pline) { + if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, 0, + &(chunk_list->chunk_infos[i].index_info.filter_mask), err_detect, filter_cb, + (size_t *)&chunk_list->chunk_infos[i].chunk_new.length, + &chunk_list->chunk_infos[i].chunk_buf_size, &chunk_list->chunk_infos[i].buf) < 0) HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "output pipeline failed") } #if H5_SIZEOF_SIZE_T > 4 /* Check for the chunk expanding too much to encode in a 32-bit value */ - if (chunk_list[i].chunk_new.length > ((size_t)0xffffffff)) + if (chunk_list->chunk_infos[i].chunk_new.length > ((size_t)0xffffffff)) HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length") #endif } @@ -4752,10 +4589,10 @@ done: /* On failure, try to free all resources used by entries in the chunk list */ if (ret_value < 0) { - for (i = 0; i < chunk_list_num_entries; i++) { - if (chunk_list[i].buf) { - H5MM_free(chunk_list[i].buf); - chunk_list[i].buf = NULL; + for (i = 0; i < chunk_list->num_chunk_infos; i++) { + if (chunk_list->chunk_infos[i].buf) { + H5MM_free(chunk_list->chunk_infos[i].buf); + chunk_list->chunk_infos[i].buf = NULL; } } } @@ -4783,9 +4620,8 @@ done: */ static herr_t H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, size_t *num_chunks_assigned_map, - H5D_io_info_t *io_info, H5D_chk_idx_info_t *idx_info, - int mpi_rank, int mpi_size) + size_t *num_chunks_assigned_map, H5D_io_info_t *io_info, + H5D_chk_idx_info_t *idx_info, int mpi_rank, int mpi_size) { H5D_chunk_alloc_info_t *collective_list = NULL; MPI_Datatype send_type; @@ -4805,7 +4641,7 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t FUNC_ENTER_PACKAGE - assert(chunk_list || 0 == chunk_list_num_entries); + assert(chunk_list); assert(io_info); assert(idx_info); assert(idx_info->storage->idx_type != H5D_CHUNK_IDX_NONE); @@ -4819,7 +4655,7 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t * Make sure it's safe to cast this rank's number * of chunks to be sent into an int for MPI */ - H5_CHECK_OVERFLOW(chunk_list_num_entries, size_t, int); + H5_CHECK_OVERFLOW(chunk_list->num_chunk_infos, size_t, int); /* Create derived datatypes for the chunk file space info needed */ if (H5D__mpio_get_chunk_alloc_info_types(&recv_type, &recv_type_derived, &send_type, &send_type_derived) < @@ -4859,9 +4695,9 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t } /* Perform gather operation */ - if (H5_mpio_gatherv_alloc(chunk_list, (int)chunk_list_num_entries, send_type, counts_ptr, - displacements_ptr, recv_type, TRUE, 0, io_info->comm, mpi_rank, mpi_size, - &gathered_array, &collective_num_entries) < 0) + if (H5_mpio_gatherv_alloc(chunk_list->chunk_infos, (int)chunk_list->num_chunk_infos, send_type, + counts_ptr, displacements_ptr, recv_type, TRUE, 0, io_info->comm, mpi_rank, + mpi_size, &gathered_array, &collective_num_entries) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "can't gather chunk file space info to/from ranks") } else { @@ -4872,9 +4708,9 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t * contributing before performing the actual gather operation. Use * the 'simple' MPI_Allgatherv wrapper for this. */ - if (H5_mpio_gatherv_alloc_simple(chunk_list, (int)chunk_list_num_entries, send_type, recv_type, TRUE, - 0, io_info->comm, mpi_rank, mpi_size, &gathered_array, - &collective_num_entries) < 0) + if (H5_mpio_gatherv_alloc_simple(chunk_list->chunk_infos, (int)chunk_list->num_chunk_infos, send_type, + recv_type, TRUE, 0, io_info->comm, mpi_rank, mpi_size, + &gathered_array, &collective_num_entries) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "can't gather chunk file space info to/from ranks") } @@ -4894,14 +4730,14 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t * rank, make sure to update the chunk entry in the local * chunk list */ - update_local_chunk = - (num_local_chunks_processed < chunk_list_num_entries) && - (coll_entry->chunk_idx == chunk_list[num_local_chunks_processed].index_info.chunk_idx); + update_local_chunk = (num_local_chunks_processed < chunk_list->num_chunk_infos) && + (coll_entry->chunk_idx == + chunk_list->chunk_infos[num_local_chunks_processed].index_info.chunk_idx); if (update_local_chunk) { - H5D_filtered_collective_io_info_t *local_chunk; + H5D_filtered_collective_chunk_info_t *local_chunk; - local_chunk = &chunk_list[num_local_chunks_processed]; + local_chunk = &chunk_list->chunk_infos[num_local_chunks_processed]; /* Sanity check that this chunk is actually local */ assert(mpi_rank == local_chunk->orig_owner); @@ -4917,7 +4753,8 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t */ if (num_local_chunks_processed) { haddr_t curr_chunk_offset = local_chunk->chunk_new.offset; - haddr_t prev_chunk_offset = chunk_list[num_local_chunks_processed - 1].chunk_new.offset; + haddr_t prev_chunk_offset = + chunk_list->chunk_infos[num_local_chunks_processed - 1].chunk_new.offset; assert(H5_addr_defined(prev_chunk_offset) && H5_addr_defined(curr_chunk_offset)); if (curr_chunk_offset < prev_chunk_offset) @@ -4928,15 +4765,15 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t } } - assert(chunk_list_num_entries == num_local_chunks_processed); + assert(chunk_list->num_chunk_infos == num_local_chunks_processed); /* * Ensure this rank's local chunk list is sorted in * ascending order of offset in the file */ if (need_sort) - qsort(chunk_list, chunk_list_num_entries, sizeof(H5D_filtered_collective_io_info_t), - H5D__cmp_filtered_collective_io_info_entry); + qsort(chunk_list->chunk_infos, chunk_list->num_chunk_infos, + sizeof(H5D_filtered_collective_chunk_info_t), H5D__cmp_filtered_collective_io_info_entry); done: H5MM_free(gathered_array); @@ -4974,9 +4811,9 @@ done: */ static herr_t H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, size_t *num_chunks_assigned_map, - H5D_io_info_t *io_info, H5D_dset_io_info_t *di, - H5D_chk_idx_info_t *idx_info, int mpi_rank, int mpi_size) + size_t *num_chunks_assigned_map, H5D_io_info_t *io_info, + H5D_dset_io_info_t *di, H5D_chk_idx_info_t *idx_info, + int mpi_rank, int mpi_size) { H5D_chunk_ud_t chunk_ud; MPI_Datatype send_type; @@ -4995,7 +4832,7 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t * FUNC_ENTER_PACKAGE - assert(chunk_list || 0 == chunk_list_num_entries); + assert(chunk_list); assert(io_info); assert(di); assert(idx_info); @@ -5013,7 +4850,7 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t * * Make sure it's safe to cast this rank's number * of chunks to be sent into an int for MPI */ - H5_CHECK_OVERFLOW(chunk_list_num_entries, size_t, int); + H5_CHECK_OVERFLOW(chunk_list->num_chunk_infos, size_t, int); /* Create derived datatypes for the chunk re-insertion info needed */ if (H5D__mpio_get_chunk_insert_info_types(&recv_type, &recv_type_derived, &send_type, @@ -5053,9 +4890,9 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t * } /* Perform gather operation */ - if (H5_mpio_gatherv_alloc(chunk_list, (int)chunk_list_num_entries, send_type, counts_ptr, - displacements_ptr, recv_type, TRUE, 0, io_info->comm, mpi_rank, mpi_size, - &gathered_array, &collective_num_entries) < 0) + if (H5_mpio_gatherv_alloc(chunk_list->chunk_infos, (int)chunk_list->num_chunk_infos, send_type, + counts_ptr, displacements_ptr, recv_type, TRUE, 0, io_info->comm, mpi_rank, + mpi_size, &gathered_array, &collective_num_entries) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "can't gather chunk index re-insertion info to/from ranks") } @@ -5067,9 +4904,9 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t * * contributing before performing the actual gather operation. Use * the 'simple' MPI_Allgatherv wrapper for this. */ - if (H5_mpio_gatherv_alloc_simple(chunk_list, (int)chunk_list_num_entries, send_type, recv_type, TRUE, - 0, io_info->comm, mpi_rank, mpi_size, &gathered_array, - &collective_num_entries) < 0) + if (H5_mpio_gatherv_alloc_simple(chunk_list->chunk_infos, (int)chunk_list->num_chunk_infos, send_type, + recv_type, TRUE, 0, io_info->comm, mpi_rank, mpi_size, + &gathered_array, &collective_num_entries) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "can't gather chunk index re-insertion info to/from ranks") } @@ -5128,10 +4965,11 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t * * the calculated coordinates above to make sure * they match. */ - for (size_t dbg_idx = 0; dbg_idx < chunk_list_num_entries; dbg_idx++) { - if (coll_entry->index_info.chunk_idx == chunk_list[dbg_idx].index_info.chunk_idx) { - hbool_t coords_match = !memcmp(scaled_coords, chunk_list[dbg_idx].chunk_info->scaled, - di->dset->shared->ndims * sizeof(hsize_t)); + for (size_t dbg_idx = 0; dbg_idx < chunk_list->num_chunk_infos; dbg_idx++) { + if (coll_entry->index_info.chunk_idx == chunk_list->chunk_infos[dbg_idx].index_info.chunk_idx) { + hbool_t coords_match = + !memcmp(scaled_coords, chunk_list->chunk_infos[dbg_idx].chunk_info->scaled, + di->dset->shared->ndims * sizeof(hsize_t)); assert(coords_match && "Calculated scaled coordinates for chunk didn't match " "chunk's actual scaled coordinates!"); @@ -5169,7 +5007,7 @@ done: * Function: H5D__mpio_get_chunk_redistribute_info_types * * Purpose: Constructs MPI derived datatypes for communicating the - * info from a H5D_filtered_collective_io_info_t structure + * info from a H5D_filtered_collective_chunk_info_t structure * that is necessary for redistributing shared chunks during a * collective write of filtered chunks. * @@ -5179,7 +5017,7 @@ done: * type. * * The datatype returned through `resized_type` has an extent - * equal to the size of an H5D_filtered_collective_io_info_t + * equal to the size of an H5D_filtered_collective_chunk_info_t * structure. This makes it suitable for sending an array of * those structures, while extracting out just the info * necessary for the chunk redistribution operation during @@ -5250,7 +5088,7 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, hbool_t * HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) /* Create struct type to extract the chunk_current, chunk_idx, orig_owner, - * new_owner and num_writers fields from a H5D_filtered_collective_io_info_t + * new_owner and num_writers fields from a H5D_filtered_collective_chunk_info_t * structure */ block_lengths[0] = 1; @@ -5258,11 +5096,11 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, hbool_t * block_lengths[2] = 1; block_lengths[3] = 1; block_lengths[4] = 1; - displacements[0] = offsetof(H5D_filtered_collective_io_info_t, chunk_current); - displacements[1] = offsetof(H5D_filtered_collective_io_info_t, index_info.chunk_idx); - displacements[2] = offsetof(H5D_filtered_collective_io_info_t, orig_owner); - displacements[3] = offsetof(H5D_filtered_collective_io_info_t, new_owner); - displacements[4] = offsetof(H5D_filtered_collective_io_info_t, num_writers); + displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_current); + displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx); + displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, orig_owner); + displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, new_owner); + displacements[4] = offsetof(H5D_filtered_collective_chunk_info_t, num_writers); types[0] = chunk_block_type; types[1] = HSIZE_AS_MPI_TYPE; types[2] = MPI_INT; @@ -5274,7 +5112,7 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, hbool_t * struct_type_derived = TRUE; if (MPI_SUCCESS != (mpi_code = MPI_Type_create_resized( - struct_type, 0, sizeof(H5D_filtered_collective_io_info_t), resized_type))) + struct_type, 0, sizeof(H5D_filtered_collective_chunk_info_t), resized_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_resized failed", mpi_code) *resized_type_derived = TRUE; @@ -5311,16 +5149,16 @@ done: * Function: H5D__mpio_get_chunk_alloc_info_types * * Purpose: Constructs MPI derived datatypes for communicating the info - * from a H5D_filtered_collective_io_info_t structure that is - * necessary for re-allocating file space during a collective - * write of filtered chunks. + * from a H5D_filtered_collective_chunk_info_t structure that + * is necessary for re-allocating file space during a + * collective write of filtered chunks. * * The datatype returned through `contig_type` has an extent * equal to the size of an H5D_chunk_alloc_info_t structure * and is suitable for communicating that structure type. * * The datatype returned through `resized_type` has an extent - * equal to the size of an H5D_filtered_collective_io_info_t + * equal to the size of an H5D_filtered_collective_chunk_info_t * structure. This makes it suitable for sending an array of * those structures, while extracting out just the info * necessary for the chunk file space reallocation operation @@ -5385,14 +5223,14 @@ H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, hbool_t *contig_ /* * Create struct type to extract the chunk_current, chunk_new and chunk_idx - * fields from a H5D_filtered_collective_io_info_t structure + * fields from a H5D_filtered_collective_chunk_info_t structure */ block_lengths[0] = 1; block_lengths[1] = 1; block_lengths[2] = 1; - displacements[0] = offsetof(H5D_filtered_collective_io_info_t, chunk_current); - displacements[1] = offsetof(H5D_filtered_collective_io_info_t, chunk_new); - displacements[2] = offsetof(H5D_filtered_collective_io_info_t, index_info.chunk_idx); + displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_current); + displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_new); + displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx); types[0] = chunk_block_type; types[1] = chunk_block_type; types[2] = HSIZE_AS_MPI_TYPE; @@ -5402,7 +5240,7 @@ H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, hbool_t *contig_ struct_type_derived = TRUE; if (MPI_SUCCESS != (mpi_code = MPI_Type_create_resized( - struct_type, 0, sizeof(H5D_filtered_collective_io_info_t), resized_type))) + struct_type, 0, sizeof(H5D_filtered_collective_chunk_info_t), resized_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_resized failed", mpi_code) *resized_type_derived = TRUE; @@ -5442,7 +5280,7 @@ done: * information necessary when reinserting chunks into a * dataset's chunk index. This includes the chunk's new offset * and size (H5F_block_t) and the inner `index_info` structure - * of a H5D_filtered_collective_io_info_t structure. + * of a H5D_filtered_collective_chunk_info_t structure. * * The datatype returned through `contig_type` has an extent * equal to the size of an H5D_chunk_insert_info_t structure @@ -5450,9 +5288,9 @@ done: * * The datatype returned through `resized_type` has an extent * equal to the size of the encompassing - * H5D_filtered_collective_io_info_t structure. This makes it - * suitable for sending an array of - * H5D_filtered_collective_io_info_t structures, while + * H5D_filtered_collective_chunk_info_t structure. This makes + * it suitable for sending an array of + * H5D_filtered_collective_chunk_info_t structures, while * extracting out just the information needed during * communication. * @@ -5531,20 +5369,20 @@ H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, hbool_t *contig /* * Create struct type to correctly extract all needed - * information from a H5D_filtered_collective_io_info_t + * information from a H5D_filtered_collective_chunk_info_t * structure. */ - displacements[0] = offsetof(H5D_filtered_collective_io_info_t, chunk_new); - displacements[1] = offsetof(H5D_filtered_collective_io_info_t, index_info.chunk_idx); - displacements[2] = offsetof(H5D_filtered_collective_io_info_t, index_info.filter_mask); - displacements[3] = offsetof(H5D_filtered_collective_io_info_t, index_info.need_insert); + displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_new); + displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx); + displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.filter_mask); + displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.need_insert); if (MPI_SUCCESS != (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) struct_type_derived = TRUE; if (MPI_SUCCESS != (mpi_code = MPI_Type_create_resized( - struct_type, 0, sizeof(H5D_filtered_collective_io_info_t), resized_type))) + struct_type, 0, sizeof(H5D_filtered_collective_chunk_info_t), resized_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_resized failed", mpi_code) *resized_type_derived = TRUE; @@ -5578,274 +5416,148 @@ done: } /* end H5D__mpio_get_chunk_insert_info_types() */ /*------------------------------------------------------------------------- - * Function: H5D__mpio_collective_filtered_io_type + * Function: H5D__mpio_collective_filtered_vec_io * - * Purpose: Constructs a MPI derived datatype for both the memory and - * the file for a collective I/O operation on filtered chunks. - * The datatype contains the chunk offsets and lengths in the - * file and the locations of the chunk data buffers to read - * into/write from. + * Purpose: Given a pointer to a H5D_filtered_collective_io_info_t + * structure with information about collective filtered chunk + * I/O, populates I/O vectors and performs vector I/O on those + * chunks. * * Return: Non-negative on success/Negative on failure * *------------------------------------------------------------------------- */ static herr_t -H5D__mpio_collective_filtered_io_type(H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries, - H5D_io_op_type_t op_type, MPI_Datatype *new_mem_type, - hbool_t *mem_type_derived, MPI_Datatype *new_file_type, - hbool_t *file_type_derived) +H5D__mpio_collective_filtered_vec_io(const H5D_filtered_collective_io_info_t *chunk_list, H5F_shared_t *f_sh, + H5D_io_op_type_t op_type) { - MPI_Aint *io_buf_array = NULL; /* Relative displacements of filtered chunk data buffers */ - MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */ - int *length_array = NULL; /* Filtered Chunk lengths */ - int mpi_code; - herr_t ret_value = SUCCEED; + const void **io_wbufs = NULL; + void **io_rbufs = NULL; + H5FD_mem_t io_types[2]; + uint32_t iovec_count = 0; + haddr_t *io_addrs = NULL; + size_t *io_sizes = NULL; + herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE - assert(chunk_list || 0 == num_entries); - assert(new_mem_type); - assert(mem_type_derived); - assert(new_file_type); - assert(file_type_derived); + assert(chunk_list); + assert(f_sh); - *mem_type_derived = FALSE; - *file_type_derived = FALSE; - *new_mem_type = MPI_BYTE; - *new_file_type = MPI_BYTE; + if (op_type == H5D_IO_OP_WRITE) + iovec_count = (uint32_t)chunk_list->num_chunk_infos; + else { + assert(chunk_list->num_chunks_to_read <= chunk_list->num_chunk_infos); + iovec_count = (uint32_t)chunk_list->num_chunks_to_read; + } - if (num_entries > 0) { - H5F_block_t *chunk_block; - size_t last_valid_idx = 0; - size_t i; - int chunk_count; + if (iovec_count > 0) { + if (chunk_list->num_chunk_infos > UINT32_MAX) + HGOTO_ERROR(H5E_INTERNAL, H5E_BADRANGE, FAIL, + "number of chunk entries in I/O operation exceeds UINT32_MAX") - /* - * Determine number of chunks for I/O operation and - * setup for derived datatype creation if I/O operation - * includes multiple chunks - */ - if (num_entries == 1) { - /* Set last valid index to 0 for contiguous datatype creation */ - last_valid_idx = 0; + if (NULL == (io_addrs = H5MM_malloc(iovec_count * sizeof(*io_addrs)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O addresses vector") + if (NULL == (io_sizes = H5MM_malloc(iovec_count * sizeof(*io_sizes)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O sizes vector") - if (op_type == H5D_IO_OP_WRITE) - chunk_count = 1; - else - chunk_count = chunk_list[0].need_read ? 1 : 0; + if (op_type == H5D_IO_OP_WRITE) { + if (NULL == (io_wbufs = H5MM_malloc(iovec_count * sizeof(*io_wbufs)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, + "couldn't allocate space for I/O buffers vector") } else { - MPI_Aint chunk_buf; - MPI_Aint base_buf; - haddr_t base_offset = HADDR_UNDEF; + if (NULL == (io_rbufs = H5MM_malloc(iovec_count * sizeof(*io_rbufs)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, + "couldn't allocate space for I/O buffers vector") + } - H5_CHECK_OVERFLOW(num_entries, size_t, int); + /* + * Since all I/O will be raw data, we can save on memory a bit by + * making use of H5FD_MEM_NOLIST to signal that all the memory types + * are the same across the I/O vectors + */ + io_types[0] = H5FD_MEM_DRAW; + io_types[1] = H5FD_MEM_NOLIST; - /* Allocate arrays */ - if (NULL == (length_array = H5MM_malloc((size_t)num_entries * sizeof(int)))) - HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, - "memory allocation failed for filtered collective I/O length array") - if (NULL == (io_buf_array = H5MM_malloc((size_t)num_entries * sizeof(MPI_Aint)))) - HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, - "memory allocation failed for filtered collective I/O buf length array") - if (NULL == (file_offset_array = H5MM_malloc((size_t)num_entries * sizeof(MPI_Aint)))) - HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, - "memory allocation failed for filtered collective I/O offset array") + for (size_t i = 0, vec_idx = 0; i < chunk_list->num_chunk_infos; i++) { + H5F_block_t *chunk_block; /* - * If doing a write, we can set the base chunk offset - * and base chunk data buffer right away. - * - * If doing a read, some chunks may be skipped over - * for reading if they aren't yet allocated in the - * file. Therefore, we have to find the first chunk - * actually being read in order to set the base chunk - * offset and base chunk data buffer. + * Check that we aren't going to accidentally try to write past the + * allocated memory for the I/O vector buffers in case bookkeeping + * wasn't done properly for the chunk list struct's `num_chunks_to_read` + * field. */ - if (op_type == H5D_IO_OP_WRITE) { -#if H5_CHECK_MPI_VERSION(3, 0) - if (MPI_SUCCESS != (mpi_code = MPI_Get_address(chunk_list[0].buf, &base_buf))) - HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code) -#else - base_buf = (MPI_Aint)chunk_list[0].buf; -#endif - - base_offset = chunk_list[0].chunk_new.offset; - } - - for (i = 0, chunk_count = 0; i < num_entries; i++) { - if (op_type == H5D_IO_OP_READ) { - /* - * If this chunk isn't being read, don't add it - * to the MPI type we're building up for I/O - */ - if (!chunk_list[i].need_read) - continue; - - /* - * If this chunk is being read, go ahead and - * set the base chunk offset and base chunk - * data buffer if we haven't already - */ - if (!H5_addr_defined(base_offset)) { -#if H5_CHECK_MPI_VERSION(3, 0) - if (MPI_SUCCESS != (mpi_code = MPI_Get_address(chunk_list[i].buf, &base_buf))) - HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code) -#else - base_buf = (MPI_Aint)chunk_list[i].buf; -#endif - - base_offset = chunk_list[i].chunk_current.offset; - } - } + assert(vec_idx < iovec_count); - /* Set convenience pointer for current chunk block */ - chunk_block = - (op_type == H5D_IO_OP_READ) ? &chunk_list[i].chunk_current : &chunk_list[i].chunk_new; - - /* - * Set the current chunk entry's offset in the file, relative to - * the first chunk entry - */ - assert(H5_addr_defined(chunk_block->offset)); - file_offset_array[chunk_count] = (MPI_Aint)(chunk_block->offset - base_offset); + if (op_type == H5D_IO_OP_READ && !chunk_list->chunk_infos[i].need_read) + continue; - /* - * Ensure the chunk list is sorted in ascending ordering of - * offset in the file - */ - if (chunk_count) - assert(file_offset_array[chunk_count] > file_offset_array[chunk_count - 1]); + /* Set convenience pointer for current chunk block */ + chunk_block = (op_type == H5D_IO_OP_READ) ? &chunk_list->chunk_infos[i].chunk_current + : &chunk_list->chunk_infos[i].chunk_new; - /* Set the current chunk entry's size for the I/O operation */ - H5_CHECK_OVERFLOW(chunk_block->length, hsize_t, int); - length_array[chunk_count] = (int)chunk_block->length; + assert(H5_addr_defined(chunk_block->offset)); + io_addrs[vec_idx] = chunk_block->offset; - /* - * Set the displacement of the chunk entry's chunk data buffer, - * relative to the first entry's data buffer - */ -#if H5_CHECK_MPI_VERSION(3, 1) - if (MPI_SUCCESS != (mpi_code = MPI_Get_address(chunk_list[i].buf, &chunk_buf))) - HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code) - - io_buf_array[chunk_count] = MPI_Aint_diff(chunk_buf, base_buf); -#else - chunk_buf = (MPI_Aint)chunk_list[i].buf; - io_buf_array[chunk_count] = chunk_buf - base_buf; + /* + * Ensure the chunk list is sorted in ascending ordering of + * offset in the file. Note that we only compare that the + * current address is greater than the previous address and + * not equal to it; file addresses should only appear in the + * chunk list once. + */ +#ifndef NDEBUG + if (vec_idx > 0) + assert(io_addrs[vec_idx] > io_addrs[vec_idx - 1]); #endif - /* - * Set last valid index in case only a single chunk will - * be involved in the I/O operation - */ - last_valid_idx = i; - - chunk_count++; - } /* end for */ - } - - /* - * Create derived datatypes for the chunk list if this - * rank has any chunks to work on - */ - if (chunk_count > 0) { - if (chunk_count == 1) { - int chunk_len; - - /* Single chunk - use a contiguous type for both memory and file */ - - /* Ensure that we can cast chunk size to an int for MPI */ - chunk_block = (op_type == H5D_IO_OP_READ) ? &chunk_list[last_valid_idx].chunk_current - : &chunk_list[last_valid_idx].chunk_new; - H5_CHECKED_ASSIGN(chunk_len, int, chunk_block->length, hsize_t); + io_sizes[vec_idx] = (size_t)chunk_block->length; - if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(chunk_len, MPI_BYTE, new_file_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) - *new_mem_type = *new_file_type; - - /* - * Since we use the same datatype for both memory and file, only - * mark the file type as derived so the caller doesn't try to - * free the same type twice - */ - *mem_type_derived = FALSE; - *file_type_derived = TRUE; + if (op_type == H5D_IO_OP_WRITE) + io_wbufs[vec_idx] = chunk_list->chunk_infos[i].buf; + else + io_rbufs[vec_idx] = chunk_list->chunk_infos[i].buf; - if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - } - else { - assert(file_offset_array); - assert(length_array); - assert(io_buf_array); - - /* Multiple chunks - use an hindexed type for both memory and file */ - - /* Create memory MPI type */ - if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed( - chunk_count, length_array, io_buf_array, MPI_BYTE, new_mem_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) - *mem_type_derived = TRUE; - - if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - - /* Create file MPI type */ - if (MPI_SUCCESS != - (mpi_code = MPI_Type_create_hindexed(chunk_count, length_array, file_offset_array, - MPI_BYTE, new_file_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) - *file_type_derived = TRUE; - - if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - } + vec_idx++; } - } /* end if */ - -done: - if (file_offset_array) - H5MM_free(file_offset_array); - if (io_buf_array) - H5MM_free(io_buf_array); - if (length_array) - H5MM_free(length_array); + } - if (ret_value < 0) { - if (*file_type_derived) { - if (MPI_SUCCESS != (mpi_code = MPI_Type_free(new_file_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - *file_type_derived = FALSE; - } - if (*mem_type_derived) { - if (MPI_SUCCESS != (mpi_code = MPI_Type_free(new_mem_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - *mem_type_derived = FALSE; - } + if (op_type == H5D_IO_OP_WRITE) { + if (H5F_shared_vector_write(f_sh, iovec_count, io_types, io_addrs, io_sizes, io_wbufs) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "vector write call failed") + } + else { + if (H5F_shared_vector_read(f_sh, iovec_count, io_types, io_addrs, io_sizes, io_rbufs) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "vector read call failed") } +done: + H5MM_free(io_wbufs); + H5MM_free(io_rbufs); + H5MM_free(io_sizes); + H5MM_free(io_addrs); + FUNC_LEAVE_NOAPI(ret_value) -} /* end H5D__mpio_collective_filtered_io_type() */ +} #ifdef H5Dmpio_DEBUG static herr_t -H5D__mpio_dump_collective_filtered_chunk_list(H5D_filtered_collective_io_info_t *chunk_list, - size_t chunk_list_num_entries, int mpi_rank) +H5D__mpio_dump_collective_filtered_chunk_list(H5D_filtered_collective_io_info_t *chunk_list, int mpi_rank) { - H5D_filtered_collective_io_info_t *chunk_entry; - size_t i; - herr_t ret_value = SUCCEED; + H5D_filtered_collective_chunk_info_t *chunk_entry; + size_t i; + herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE_NOERR H5D_MPIO_DEBUG(mpi_rank, "CHUNK LIST: ["); - for (i = 0; i < chunk_list_num_entries; i++) { + for (i = 0; i < chunk_list->num_chunk_infos; i++) { unsigned chunk_rank; - chunk_entry = &chunk_list[i]; + chunk_entry = &chunk_list->chunk_infos[i]; assert(chunk_entry->chunk_info); chunk_rank = (unsigned)H5S_GET_EXTENT_NDIMS(chunk_entry->chunk_info->fspace); diff --git a/src/H5Fio.c b/src/H5Fio.c index 1924004..09d57e8 100644 --- a/src/H5Fio.c +++ b/src/H5Fio.c @@ -328,8 +328,16 @@ H5F_shared_vector_read(H5F_shared_t *f_sh, uint32_t count, H5FD_mem_t types[], h * for now, assume the caller has done this already. */ #ifndef NDEBUG - for (uint32_t i = 0; i < count; i++) + for (uint32_t i = 0; i < count; i++) { + /* Break early if H5FD_MEM_NOLIST was specified + * since a full 'count'-sized array may not + * have been passed for 'types' + */ + if (i > 0 && types[i] == H5FD_MEM_NOLIST) + break; + assert(types[i] != H5FD_MEM_GHEAP); + } #endif /* Pass down to file driver layer (bypass page buffer for now) */ @@ -373,8 +381,16 @@ H5F_shared_vector_write(H5F_shared_t *f_sh, uint32_t count, H5FD_mem_t types[], * for now, assume the caller has done this already. */ #ifndef NDEBUG - for (uint32_t i = 0; i < count; i++) + for (uint32_t i = 0; i < count; i++) { + /* Break early if H5FD_MEM_NOLIST was specified + * since a full 'count'-sized array may not + * have been passed for 'types' + */ + if (i > 0 && types[i] == H5FD_MEM_NOLIST) + break; + assert(types[i] != H5FD_MEM_GHEAP); + } #endif /* Pass down to file driver layer (bypass page buffer for now) */ -- cgit v0.12