author | jhendersonHDF <jhenderson@hdfgroup.org> | 2023-10-10 15:11:22 (GMT)
---|---|---
committer | GitHub <noreply@github.com> | 2023-10-10 15:11:22 (GMT)
commit | bfbfaf72e17fcc9efa848557a0c57c0583d5c8c4 (patch) |
tree | 46c6309ab264df8af6bba567713a22bb187baa2f /src |
parent | 7631015ea4af183c01025c4907869f47f7355c51 (diff) |
Update parallel compression feature to support multi-dataset I/O (#3591)
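The user-visible effect of this change is that collective `H5Dwrite_multi()`/`H5Dread_multi()` calls can now cover datasets with filters (e.g. deflate) applied, rather than filtered datasets forcing the single-dataset path (see the removal of the `use_multi_dset = false` check in `H5D__piece_io` below). The following is a minimal, illustrative sketch of that usage pattern, not code from this commit; the file name, dataset names, extents, and chunk sizes are made up, and it assumes an MPI-enabled HDF5 1.14+ build. Error checking is omitted for brevity.

```c
#include <hdf5.h>
#include <mpi.h>
#include <stdlib.h>

/* Illustrative only: each rank writes one row of two deflate-compressed
 * datasets with a single collective multi-dataset call. */
int main(int argc, char **argv)
{
    int mpi_rank, mpi_size;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

    hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
    H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);
    hid_t file = H5Fcreate("multi.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);

    /* Chunked + filtered dataset creation property list */
    hsize_t dims[2]  = {(hsize_t)mpi_size, 1024};
    hsize_t chunk[2] = {1, 1024};
    hid_t   dcpl     = H5Pcreate(H5P_DATASET_CREATE);
    H5Pset_chunk(dcpl, 2, chunk);
    H5Pset_deflate(dcpl, 6);

    hid_t fspace = H5Screate_simple(2, dims, NULL);
    hid_t dsets[2];
    dsets[0] = H5Dcreate2(file, "A", H5T_NATIVE_INT, fspace, H5P_DEFAULT, dcpl, H5P_DEFAULT);
    dsets[1] = H5Dcreate2(file, "B", H5T_NATIVE_INT, fspace, H5P_DEFAULT, dcpl, H5P_DEFAULT);

    /* Each rank selects its own row of each dataset in the file */
    hsize_t start[2] = {(hsize_t)mpi_rank, 0};
    hsize_t count[2] = {1, 1024};
    hid_t   file_spaces[2], mem_spaces[2], mem_types[2];
    int    *bufs[2];
    for (int i = 0; i < 2; i++) {
        file_spaces[i] = H5Scopy(fspace);
        H5Sselect_hyperslab(file_spaces[i], H5S_SELECT_SET, start, NULL, count, NULL);
        mem_spaces[i] = H5Screate_simple(2, count, NULL);
        mem_types[i]  = H5T_NATIVE_INT;
        bufs[i]       = malloc(1024 * sizeof(int));
        for (int j = 0; j < 1024; j++)
            bufs[i][j] = mpi_rank + i;
    }

    /* One collective transfer covering both filtered datasets */
    hid_t dxpl = H5Pcreate(H5P_DATASET_XFER);
    H5Pset_dxpl_mpio(dxpl, H5FD_MPIO_COLLECTIVE);
    H5Dwrite_multi(2, dsets, mem_types, mem_spaces, file_spaces, dxpl, (const void **)bufs);

    /* Resource cleanup elided for brevity */
    H5Fclose(file);
    MPI_Finalize();
    return 0;
}
```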
Diffstat (limited to 'src')
-rw-r--r-- | src/H5Dchunk.c | 51
-rw-r--r-- | src/H5Dcontig.c | 2
-rw-r--r-- | src/H5Dio.c | 79
-rw-r--r-- | src/H5Dmpio.c | 1956
-rw-r--r-- | src/H5Dpkg.h | 43
-rw-r--r-- | src/H5FDmpio.c | 8
-rw-r--r-- | src/H5Fmpi.c | 29
-rw-r--r-- | src/H5Fprivate.h | 8
-rw-r--r-- | src/H5Fquery.c | 22
-rw-r--r-- | src/H5Pint.c | 2
-rw-r--r-- | src/H5VLnative_dataset.c | 10
11 files changed, 1408 insertions, 802 deletions
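A note for readers of the `H5Dmpio.c` hunks below: multi-dataset support hinges on widening the chunk hash table key from a bare chunk index to a compound key of `chunk_idx` plus the dataset's object header address (`dset_oloc_addr`), following uthash's compound-key recipe as spelled out in the NOTE added to `H5D_chunk_index_info_t`. Here is a standalone sketch of that pattern, with stand-in type and function names rather than the library's own:

```c
#include <stddef.h>   /* offsetof */
#include <stdint.h>
#include <stdlib.h>   /* calloc */
#include <string.h>   /* memset */
#include "uthash.h"   /* HDF5 vendors this header in src/ */

typedef uint64_t chunk_idx_t;  /* stand-in for hsize_t */
typedef uint64_t oloc_addr_t;  /* stand-in for haddr_t */

typedef struct chunk_entry_t {
    /* Compound key: these two fields must be adjacent and in this order,
     * exactly as the patch requires for H5D_chunk_index_info_t */
    chunk_idx_t chunk_idx;
    oloc_addr_t dset_oloc_addr;

    unsigned       filter_mask;
    UT_hash_handle hh;
} chunk_entry_t;

/* Key length per the uthash compound-key recipe: from the start of the
 * first key field through the end of the last key field */
#define CHUNK_KEYLEN                                                        \
    (offsetof(chunk_entry_t, dset_oloc_addr) + sizeof(oloc_addr_t) -        \
     offsetof(chunk_entry_t, chunk_idx))

static void
chunk_table_insert(chunk_entry_t **table, chunk_entry_t *entry)
{
    /* Entries must have been zeroed at allocation so that any padding
     * bytes between the key fields hash deterministically */
    HASH_ADD(hh, *table, chunk_idx, CHUNK_KEYLEN, entry);
}

static chunk_entry_t *
chunk_table_find(chunk_entry_t *table, chunk_idx_t idx, oloc_addr_t oloc_addr)
{
    chunk_entry_t key, *found = NULL;

    memset(&key, 0, sizeof(key)); /* zero padding before setting key fields */
    key.chunk_idx      = idx;
    key.dset_oloc_addr = oloc_addr;

    HASH_FIND(hh, table, &key.chunk_idx, CHUNK_KEYLEN, found);
    return found;
}

int main(void)
{
    chunk_entry_t *table = NULL;
    chunk_entry_t *e = calloc(1, sizeof(*e)); /* calloc zeroes padding */

    e->chunk_idx      = 3;
    e->dset_oloc_addr = 0x1234;
    chunk_table_insert(&table, e);

    return (chunk_table_find(table, 3, 0x1234) == e) ? 0 : 1;
}
```

In the single-dataset case the patch instead shortens the key length to just `sizeof(hsize_t)` (the chunk index alone), since all chunks then belong to the same dataset.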
diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c index cabdcfb..9f4bd90 100644 --- a/src/H5Dchunk.c +++ b/src/H5Dchunk.c @@ -1114,6 +1114,31 @@ H5D__chunk_io_init(H5D_io_info_t *io_info, H5D_dset_io_info_t *dinfo) } } +#ifdef H5_HAVE_PARALLEL + /* + * If collective metadata reads are enabled, ensure all ranks + * have the dataset's chunk index open (if it was created) to + * prevent possible metadata inconsistency issues or unintentional + * independent metadata reads later on. + */ + if (H5F_SHARED_HAS_FEATURE(io_info->f_sh, H5FD_FEAT_HAS_MPI) && + H5F_shared_get_coll_metadata_reads(io_info->f_sh) && + H5D__chunk_is_space_alloc(&dataset->shared->layout.storage)) { + H5D_chunk_ud_t udata; + hsize_t scaled[H5O_LAYOUT_NDIMS] = {0}; + + /* + * TODO: Until the dataset chunk index callback structure has + * callbacks for checking if an index is opened and also for + * directly opening the index, the following fake chunk lookup + * serves the purpose of forcing a chunk index open operation + * on all ranks + */ + if (H5D__chunk_lookup(dataset, scaled, &udata) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to collectively open dataset chunk index"); + } +#endif + done: if (file_space_normalized == true) if (H5S_hyper_denormalize_offset(dinfo->file_space, old_offset) < 0) @@ -1556,6 +1581,9 @@ H5D__create_piece_map_single(H5D_dset_io_info_t *di, H5D_io_info_t *io_info) piece_info->in_place_tconv = false; piece_info->buf_off = 0; + /* Check if chunk is in a dataset with filters applied */ + piece_info->filtered_dset = di->dset->shared->dcpl_cache.pline.nused > 0; + /* make connection to related dset info from this piece_info */ piece_info->dset_info = di; @@ -1591,6 +1619,7 @@ H5D__create_piece_file_map_all(H5D_dset_io_info_t *di, H5D_io_info_t *io_info) hsize_t curr_partial_clip[H5S_MAX_RANK]; /* Current partial dimension sizes to clip against */ hsize_t partial_dim_size[H5S_MAX_RANK]; /* Size of a partial dimension */ bool is_partial_dim[H5S_MAX_RANK]; /* Whether a dimension is currently a partial chunk */ + bool filtered_dataset; /* Whether the dataset in question has filters applied */ unsigned num_partial_dims; /* Current number of partial dimensions */ unsigned u; /* Local index variable */ herr_t ret_value = SUCCEED; /* Return value */ @@ -1640,6 +1669,9 @@ H5D__create_piece_file_map_all(H5D_dset_io_info_t *di, H5D_io_info_t *io_info) /* Set the index of this chunk */ chunk_index = 0; + /* Check whether dataset has filters applied */ + filtered_dataset = di->dset->shared->dcpl_cache.pline.nused > 0; + /* Create "temporary" chunk for selection operations (copy file space) */ if (NULL == (tmp_fchunk = H5S_create_simple(fm->f_ndims, fm->chunk_dim, NULL))) HGOTO_ERROR(H5E_DATASET, H5E_CANTCREATE, FAIL, "unable to create dataspace for chunk"); @@ -1686,6 +1718,8 @@ H5D__create_piece_file_map_all(H5D_dset_io_info_t *di, H5D_io_info_t *io_info) new_piece_info->in_place_tconv = false; new_piece_info->buf_off = 0; + new_piece_info->filtered_dset = filtered_dataset; + /* Insert the new chunk into the skip list */ if (H5SL_insert(fm->dset_sel_pieces, new_piece_info, &new_piece_info->index) < 0) { H5D__free_piece_info(new_piece_info, NULL, NULL); @@ -1798,6 +1832,7 @@ H5D__create_piece_file_map_hyper(H5D_dset_io_info_t *dinfo, H5D_io_info_t *io_in hsize_t chunk_index; /* Index of chunk */ hsize_t start_scaled[H5S_MAX_RANK]; /* Starting scaled coordinates of selection */ hsize_t scaled[H5S_MAX_RANK]; /* Scaled coordinates for this chunk */ + bool filtered_dataset; /* Whether the dataset in 
question has filters applied */ int curr_dim; /* Current dimension to increment */ unsigned u; /* Local index variable */ herr_t ret_value = SUCCEED; /* Return value */ @@ -1831,6 +1866,9 @@ H5D__create_piece_file_map_hyper(H5D_dset_io_info_t *dinfo, H5D_io_info_t *io_in /* Calculate the index of this chunk */ chunk_index = H5VM_array_offset_pre(fm->f_ndims, dinfo->layout->u.chunk.down_chunks, scaled); + /* Check whether dataset has filters applied */ + filtered_dataset = dinfo->dset->shared->dcpl_cache.pline.nused > 0; + /* Iterate through each chunk in the dataset */ while (sel_points) { /* Check for intersection of current chunk and file selection */ @@ -1885,6 +1923,8 @@ H5D__create_piece_file_map_hyper(H5D_dset_io_info_t *dinfo, H5D_io_info_t *io_in new_piece_info->in_place_tconv = false; new_piece_info->buf_off = 0; + new_piece_info->filtered_dset = filtered_dataset; + /* Add piece to global piece_count */ io_info->piece_count++; @@ -2257,6 +2297,8 @@ H5D__piece_file_cb(void H5_ATTR_UNUSED *elem, const H5T_t H5_ATTR_UNUSED *type, piece_info->in_place_tconv = false; piece_info->buf_off = 0; + piece_info->filtered_dset = dinfo->dset->shared->dcpl_cache.pline.nused > 0; + /* Make connection to related dset info from this piece_info */ piece_info->dset_info = dinfo; @@ -2417,6 +2459,9 @@ H5D__chunk_mdio_init(H5D_io_info_t *io_info, H5D_dset_io_info_t *dinfo) /* Add to sel_pieces and update pieces_added */ io_info->sel_pieces[io_info->pieces_added++] = piece_info; + + if (piece_info->filtered_dset) + io_info->filtered_pieces_added++; } /* Advance to next skip list node */ @@ -2728,6 +2773,9 @@ H5D__chunk_read(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info) if (io_info->sel_pieces) io_info->sel_pieces[io_info->pieces_added] = chunk_info; io_info->pieces_added++; + + if (io_info->sel_pieces && chunk_info->filtered_dset) + io_info->filtered_pieces_added++; } } /* end if */ else if (!skip_missing_chunks) { @@ -3142,6 +3190,9 @@ H5D__chunk_write(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info) if (io_info->sel_pieces) io_info->sel_pieces[io_info->pieces_added] = chunk_info; io_info->pieces_added++; + + if (io_info->sel_pieces && chunk_info->filtered_dset) + io_info->filtered_pieces_added++; } } /* end else */ diff --git a/src/H5Dcontig.c b/src/H5Dcontig.c index db156fd..2a9f178 100644 --- a/src/H5Dcontig.c +++ b/src/H5Dcontig.c @@ -644,6 +644,8 @@ H5D__contig_io_init(H5D_io_info_t *io_info, H5D_dset_io_info_t *dinfo) new_piece_info->in_place_tconv = false; new_piece_info->buf_off = 0; + new_piece_info->filtered_dset = dinfo->dset->shared->dcpl_cache.pline.nused > 0; + /* Calculate type conversion buffer size and check for in-place conversion if necessary. Currently * only implemented for selection I/O. */ if (io_info->use_select_io != H5D_SELECTION_IO_MODE_OFF && diff --git a/src/H5Dio.c b/src/H5Dio.c index 543bb56..2134ce1 100644 --- a/src/H5Dio.c +++ b/src/H5Dio.c @@ -107,6 +107,17 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info) FUNC_ENTER_NOAPI(FAIL) +#ifdef H5_HAVE_PARALLEL + /* Reset the actual io mode properties to the default values in case + * the DXPL (if it's non-default) was previously used in a collective + * I/O operation. 
+ */ + if (!H5CX_is_def_dxpl()) { + H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION); + H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE); + } /* end if */ +#endif + /* Init io_info */ if (H5D__ioinfo_init(count, H5D_IO_OP_READ, dset_info, &io_info) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info"); @@ -222,6 +233,14 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info) dset_info[i].buf.vp = (void *)(((uint8_t *)dset_info[i].buf.vp) + buf_adj); } /* end if */ + /* Set up I/O operation */ + if (H5D__dset_ioinfo_init(dset_info[i].dset, &(dset_info[i]), &(store[i])) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to set up I/O operation"); + + /* Check if any filters are applied to the dataset */ + if (dset_info[i].dset->shared->dcpl_cache.pline.nused > 0) + io_info.filtered_count++; + /* If space hasn't been allocated and not using external storage, * return fill value to buffer if fill time is upon allocation, or * do nothing if fill time is never. If the dataset is compact and @@ -259,10 +278,6 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info) io_skipped = io_skipped + 1; } /* end if */ else { - /* Set up I/O operation */ - if (H5D__dset_ioinfo_init(dset_info[i].dset, &(dset_info[i]), &(store[i])) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_UNSUPPORTED, FAIL, "unable to set up I/O operation"); - /* Sanity check that space is allocated, if there are elements */ if (dset_info[i].nelmts > 0) assert( @@ -273,22 +288,23 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info) dset_info[i].dset->shared->dcpl_cache.efl.nused > 0 || dset_info[i].dset->shared->layout.type == H5D_COMPACT); - /* Call storage method's I/O initialization routine */ - if (dset_info[i].layout_ops.io_init && - (dset_info[i].layout_ops.io_init)(&io_info, &(dset_info[i])) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info"); dset_info[i].skip_io = false; - io_op_init++; - - /* Reset metadata tagging */ - H5AC_tag(prev_tag, NULL); } + + /* Call storage method's I/O initialization routine */ + if (dset_info[i].layout_ops.io_init && + (dset_info[i].layout_ops.io_init)(&io_info, &(dset_info[i])) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info"); + io_op_init++; + + /* Reset metadata tagging */ + H5AC_tag(prev_tag, NULL); } /* end of for loop */ - assert(io_op_init + io_skipped == count); + assert(io_op_init == count); /* If no datasets have I/O, we're done */ - if (io_op_init == 0) + if (io_skipped == count) HGOTO_DONE(SUCCEED); /* Perform second phase of type info initialization */ @@ -323,7 +339,11 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info) } /* MDIO-specific second phase initialization */ - for (i = 0; i < count; i++) + for (i = 0; i < count; i++) { + /* Check for skipped I/O */ + if (dset_info[i].skip_io) + continue; + if (dset_info[i].layout_ops.mdio_init) { haddr_t prev_tag = HADDR_UNDEF; @@ -337,6 +357,7 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info) /* Reset metadata tagging */ H5AC_tag(prev_tag, NULL); } + } /* Invoke correct "high level" I/O routine */ if ((*io_info.md_io_ops.multi_read_md)(&io_info) < 0) @@ -430,7 +451,7 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info) done: /* Shut down the I/O op information */ for (i = 0; i < io_op_init; i++) - if (!dset_info[i].skip_io && dset_info[i].layout_ops.io_term && + if (dset_info[i].layout_ops.io_term && (*dset_info[i].layout_ops.io_term)(&io_info, &(dset_info[i])) < 0) HDONE_ERROR(H5E_DATASET, H5E_CANTCLOSEOBJ, 
FAIL, "unable to shut down I/O op info"); @@ -512,6 +533,17 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info) FUNC_ENTER_NOAPI(FAIL) +#ifdef H5_HAVE_PARALLEL + /* Reset the actual io mode properties to the default values in case + * the DXPL (if it's non-default) was previously used in a collective + * I/O operation. + */ + if (!H5CX_is_def_dxpl()) { + H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION); + H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE); + } /* end if */ +#endif + /* Init io_info */ if (H5D__ioinfo_init(count, H5D_IO_OP_WRITE, dset_info, &io_info) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info"); @@ -586,7 +618,7 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info) if (NULL == dset_info[i].buf.cvp) { /* Check for any elements selected (which is invalid) */ if (dset_info[i].nelmts > 0) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "no output buffer"); + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "no input buffer"); /* If the buffer is nil, and 0 element is selected, make a fake buffer. * This is for some MPI package like ChaMPIon on NCSA's tungsten which @@ -655,6 +687,10 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info) if (H5D__dset_ioinfo_init(dset_info[i].dset, &(dset_info[i]), &(store[i])) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to set up I/O operation"); + /* Check if any filters are applied to the dataset */ + if (dset_info[i].dset->shared->dcpl_cache.pline.nused > 0) + io_info.filtered_count++; + /* Allocate dataspace and initialize it if it hasn't been. */ should_alloc_space = dset_info[i].dset->shared->dcpl_cache.efl.nused == 0 && !(*dset_info[i].dset->shared->layout.ops->is_space_alloc)( @@ -1225,15 +1261,6 @@ H5D__ioinfo_adjust(H5D_io_info_t *io_info) dset0 = io_info->dsets_info[0].dset; assert(dset0->oloc.file); - /* Reset the actual io mode properties to the default values in case - * the DXPL (if it's non-default) was previously used in a collective - * I/O operation. 
- */ - if (!H5CX_is_def_dxpl()) { - H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION); - H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE); - } /* end if */ - /* Make any parallel I/O adjustments */ if (io_info->using_mpi_vfd) { H5FD_mpio_xfer_t xfer_mode; /* Parallel transfer for this request */ diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index 6667746..0ef6542 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -82,21 +82,10 @@ */ #define H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, dset) \ do { \ - index_info.f = (dset)->oloc.file; \ - index_info.pline = &((dset)->shared->dcpl_cache.pline); \ - index_info.layout = &((dset)->shared->layout.u.chunk); \ - index_info.storage = &((dset)->shared->layout.storage.u.chunk); \ - } while (0) - -/* - * Macro to initialize a H5D_chunk_ud_t structure - * given a pointer to a H5D_chk_idx_info_t structure - */ -#define H5D_MPIO_INIT_CHUNK_UD_INFO(chunk_ud, index_info_ptr) \ - do { \ - memset(&chunk_ud, 0, sizeof(H5D_chunk_ud_t)); \ - chunk_ud.common.layout = (index_info_ptr)->layout; \ - chunk_ud.common.storage = (index_info_ptr)->storage; \ + (index_info).f = (dset)->oloc.file; \ + (index_info).pline = &((dset)->shared->dcpl_cache.pline); \ + (index_info).layout = &((dset)->shared->layout.u.chunk); \ + (index_info).storage = &((dset)->shared->layout.storage.u.chunk); \ } while (0) /******************/ @@ -129,14 +118,43 @@ typedef struct H5D_chunk_alloc_info_t { H5F_block_t chunk_current; H5F_block_t chunk_new; hsize_t chunk_idx; + haddr_t dset_oloc_addr; } H5D_chunk_alloc_info_t; /* * Information for a chunk pertaining to the dataset's chunk - * index entry for the chunk + * index entry for the chunk. + * + * NOTE: To support efficient lookups of H5D_filtered_collective_chunk_info_t + * structures during parallel writes to filtered chunks, the + * chunk_idx and dset_oloc_addr fields of this structure are used + * together as a key for a hash table by following the approach + * outlined at https://troydhanson.github.io/uthash/userguide.html#_compound_keys. + * This means the following: + * + * - Instances of this structure should be memset to 0 when + * used for hashing to ensure that any padding between the + * chunk_idx and dset_oloc_addr fields does not affect the + * generated key. + * + * - The chunk_idx and dset_oloc_addr fields should be arranged + * in that specific order, as the code currently relies on + * this ordering when calculating the key length and it + * performs memory operations on the structure starting from + * the chunk_idx field and using the calculated key length. + * + * - The chunk_idx and dset_oloc_addr fields should ideally + * be arranged next to each other in the structure to minimize + * the calculated key length. */ typedef struct H5D_chunk_index_info_t { - hsize_t chunk_idx; + /* + * These two fields must come in this order and next to + * each other for proper and efficient hashing + */ + hsize_t chunk_idx; + haddr_t dset_oloc_addr; + unsigned filter_mask; bool need_insert; } H5D_chunk_index_info_t; @@ -232,6 +250,24 @@ typedef struct H5D_filtered_collective_chunk_info_t { } H5D_filtered_collective_chunk_info_t; /* + * Information cached about each dataset involved when performing + * collective I/O on filtered chunks. 
+ */ +typedef struct H5D_mpio_filtered_dset_info_t { + const H5D_dset_io_info_t *dset_io_info; + H5D_fill_buf_info_t fb_info; + H5D_chk_idx_info_t chunk_idx_info; + hsize_t file_chunk_size; + haddr_t dset_oloc_addr; + H5S_t *fill_space; + bool should_fill; + bool fb_info_init; + bool index_empty; + + UT_hash_handle hh; +} H5D_mpio_filtered_dset_info_t; + +/* * Top-level structure that contains an array of H5D_filtered_collective_chunk_info_t * chunk info structures for collective filtered I/O, as well as other useful information. * The struct's fields are as follows: @@ -249,6 +285,10 @@ typedef struct H5D_filtered_collective_chunk_info_t { * will contain the chunk's "chunk index" value that can be used for chunk * lookup operations. * + * chunk_hash_table_keylen - The calculated length of the key used for the chunk info hash + * table, depending on whether collective I/O is being performed + * on a single or multiple filtered datasets. + * * num_chunks_infos - The number of entries in the `chunk_infos` array. * * num_chunks_to_read - The number of entries (or chunks) in the `chunk_infos` array that @@ -263,12 +303,39 @@ typedef struct H5D_filtered_collective_chunk_info_t { * of chunk info structures to determine how big of I/O vectors to * allocate during read operations, as an example. * + * all_dset_indices_empty - A boolean determining whether all the datasets involved in the + * I/O operation have empty chunk indices. If this is the case, + * collective read operations can be skipped during processing + * of chunks. + * + * no_dset_index_insert_methods - A boolean determining whether all the datasets involved + * in the I/O operation have no chunk index insertion + * methods. If this is the case, collective chunk reinsertion + * operations can be skipped during processing of chunks. + * + * single_dset_info - A pointer to a H5D_mpio_filtered_dset_info_t structure containing + * information that is used when performing collective I/O on a single + * filtered dataset. + * + * dset_info_hash_table - A hash table storing H5D_mpio_filtered_dset_info_t structures + * that is populated when performing collective I/O on multiple + * filtered datasets at a time using the multi-dataset I/O API + * routines. 
+ * */ typedef struct H5D_filtered_collective_io_info_t { H5D_filtered_collective_chunk_info_t *chunk_infos; H5D_filtered_collective_chunk_info_t *chunk_hash_table; + size_t chunk_hash_table_keylen; size_t num_chunk_infos; size_t num_chunks_to_read; + bool all_dset_indices_empty; + bool no_dset_index_insert_methods; + + union { + H5D_mpio_filtered_dset_info_t *single_dset_info; + H5D_mpio_filtered_dset_info_t *dset_info_hash_table; + } dset_info; } H5D_filtered_collective_io_info_t; /* @@ -278,6 +345,7 @@ typedef struct H5D_filtered_collective_io_info_t { typedef struct H5D_chunk_redistribute_info_t { H5F_block_t chunk_block; hsize_t chunk_idx; + haddr_t dset_oloc_addr; int orig_owner; int new_owner; int num_writers; @@ -299,11 +367,11 @@ typedef struct H5D_chunk_insert_info_t { static herr_t H5D__piece_io(H5D_io_info_t *io_info); static herr_t H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, int mpi_rank, int mpi_size); -static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, - int mpi_rank, int mpi_size); +static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_infos, + size_t num_dset_infos, int mpi_rank, int mpi_size); static herr_t H5D__link_piece_collective_io(H5D_io_info_t *io_info, int mpi_rank); -static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, - int mpi_rank, int mpi_size); +static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_infos, + size_t num_dset_infos, int mpi_rank, int mpi_size); static herr_t H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_dset_io_info_t *di, H5S_t *file_space, H5S_t *mem_space); static herr_t H5D__final_collective_io(H5D_io_info_t *io_info, hsize_t mpi_buf_count, @@ -314,7 +382,8 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, int *sum_chu static herr_t H5D__mpio_get_sum_chunk_dset(const H5D_io_info_t *io_info, const H5D_dset_io_info_t *dset_info, int *sum_chunkf); static herr_t H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, - const H5D_dset_io_info_t *di, int mpi_rank, + const H5D_dset_io_info_t *di, + size_t num_dset_infos, int mpi_rank, H5D_filtered_collective_io_info_t *chunk_list); static herr_t H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_list, const H5D_io_info_t *io_info, int mpi_rank, int mpi_size, @@ -324,28 +393,25 @@ static herr_t H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_i bool all_ranks_involved, const H5D_io_info_t *io_info, int mpi_rank, int mpi_size); static herr_t H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list, - H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, - int mpi_rank, int H5_ATTR_NDEBUG_UNUSED mpi_size, - unsigned char ***chunk_msg_bufs, - int *chunk_msg_bufs_len); + H5D_io_info_t *io_info, int mpi_rank, + int H5_ATTR_NDEBUG_UNUSED mpi_size, + unsigned char ***chunk_msg_bufs, + int *chunk_msg_bufs_len); static herr_t H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chunk_list, - const H5D_io_info_t *io_info, - const H5D_dset_io_info_t *di, int mpi_rank); + const H5D_io_info_t *io_info, size_t num_dset_infos, + int mpi_rank); static herr_t H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *chunk_list, unsigned char **chunk_msg_bufs, int chunk_msg_bufs_len, const H5D_io_info_t 
*io_info, - const H5D_dset_io_info_t *di, - int H5_ATTR_NDEBUG_UNUSED mpi_rank); + size_t num_dset_infos, int mpi_rank); static herr_t H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t *chunk_list, - size_t *num_chunks_assigned_map, - H5D_io_info_t *io_info, - H5D_chk_idx_info_t *idx_info, int mpi_rank, - int mpi_size); + size_t *num_chunks_assigned_map, + H5D_io_info_t *io_info, size_t num_dset_infos, + int mpi_rank, int mpi_size); static herr_t H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *chunk_list, size_t *num_chunks_assigned_map, - H5D_io_info_t *io_info, H5D_dset_io_info_t *di, - H5D_chk_idx_info_t *idx_info, int mpi_rank, - int mpi_size); + H5D_io_info_t *io_info, size_t num_dset_infos, + int mpi_rank, int mpi_size); static herr_t H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, bool *contig_type_derived, MPI_Datatype *resized_type, @@ -636,8 +702,8 @@ H5D__mpio_opt_possible(H5D_io_info_t *io_info) } /* Check whether these are both simple or scalar dataspaces */ - if (!((H5S_SIMPLE == H5S_GET_EXTENT_TYPE(mem_space) || - H5S_SCALAR == H5S_GET_EXTENT_TYPE(mem_space)) && + if (!((H5S_SIMPLE == H5S_GET_EXTENT_TYPE(mem_space) || H5S_SCALAR == H5S_GET_EXTENT_TYPE(mem_space) || + H5S_NULL == H5S_GET_EXTENT_TYPE(mem_space)) && (H5S_SIMPLE == H5S_GET_EXTENT_TYPE(file_space) || H5S_SCALAR == H5S_GET_EXTENT_TYPE(file_space)))) local_cause[0] |= H5D_MPIO_NOT_SIMPLE_OR_SCALAR_DATASPACES; @@ -1143,13 +1209,6 @@ H5D__piece_io(H5D_io_info_t *io_info) /* Use multi dataset path for now */ use_multi_dset = true; - /* Check for filtered datasets */ - for (i = 0; i < io_info->count; i++) - if (io_info->dsets_info[i].dset->shared->dcpl_cache.pline.nused > 0) { - use_multi_dset = false; - break; - } - /* Check if this I/O exceeds one linked chunk threshold */ if (recalc_io_option && use_multi_dset) { /* Get the chunk optimization option threshold */ @@ -1173,26 +1232,40 @@ H5D__piece_io(H5D_io_info_t *io_info) } } } + } - /* Perform multi dataset I/O if appropriate */ - if (use_multi_dset) { + /* Perform multi dataset I/O if appropriate */ + if (use_multi_dset) { #ifdef H5_HAVE_INSTRUMENTED_LIBRARY - /*** Set collective chunk user-input optimization API. ***/ - if (H5D_ONE_LINK_CHUNK_IO == io_option) { - if (H5CX_test_set_mpio_coll_chunk_link_hard(0) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "unable to set property value"); - } /* end if */ -#endif /* H5_HAVE_INSTRUMENTED_LIBRARY */ + /*** Set collective chunk user-input optimization API. ***/ + if (H5D_ONE_LINK_CHUNK_IO == io_option) { + if (H5CX_test_set_mpio_coll_chunk_link_hard(0) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "unable to set property value"); + } /* end if */ +#endif /* H5_HAVE_INSTRUMENTED_LIBRARY */ + + /* Process all the filtered datasets first */ + if (io_info->filtered_count > 0) { + if (H5D__link_chunk_filtered_collective_io(io_info, io_info->dsets_info, io_info->count, mpi_rank, + mpi_size) < 0) + HGOTO_ERROR(H5E_IO, (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR), + FAIL, "couldn't finish filtered linked chunk MPI-IO"); + } + /* Process all the unfiltered datasets */ + if ((io_info->filtered_count == 0) || (io_info->filtered_count < io_info->count)) { /* Perform unfiltered link chunk collective IO */ if (H5D__link_piece_collective_io(io_info, mpi_rank) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO"); + HGOTO_ERROR(H5E_IO, (H5D_IO_OP_READ == io_info->op_type ? 
H5E_READERROR : H5E_WRITEERROR), + FAIL, "couldn't finish linked chunk MPI-IO"); } } - - if (!use_multi_dset) { + else { /* Loop over datasets */ for (i = 0; i < io_info->count; i++) { + if (io_info->dsets_info[i].skip_io) + continue; + if (io_info->dsets_info[i].layout->type == H5D_CONTIGUOUS) { /* Contiguous: call H5D__inter_collective_io() directly */ H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CONTIGUOUS_COLLECTIVE; @@ -1203,7 +1276,8 @@ H5D__piece_io(H5D_io_info_t *io_info) if (H5D__inter_collective_io(io_info, &io_info->dsets_info[i], io_info->dsets_info[i].file_space, io_info->dsets_info[i].mem_space) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish shared collective MPI-IO"); + HGOTO_ERROR(H5E_IO, (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR), + FAIL, "couldn't finish shared collective MPI-IO"); /* Set the actual I/O mode property. internal_collective_io will not break to * independent I/O, so we set it here. @@ -1248,10 +1322,12 @@ H5D__piece_io(H5D_io_info_t *io_info) case H5D_ONE_LINK_CHUNK_IO_MORE_OPT: /* Check if there are any filters in the pipeline */ if (io_info->dsets_info[i].dset->shared->dcpl_cache.pline.nused > 0) { - if (H5D__link_chunk_filtered_collective_io(io_info, &io_info->dsets_info[i], + if (H5D__link_chunk_filtered_collective_io(io_info, &io_info->dsets_info[i], 1, mpi_rank, mpi_size) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, - "couldn't finish filtered linked chunk MPI-IO"); + HGOTO_ERROR( + H5E_IO, + (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR), + FAIL, "couldn't finish filtered linked chunk MPI-IO"); } /* end if */ else { /* If there is more than one dataset we cannot make the multi dataset call here, @@ -1262,14 +1338,18 @@ H5D__piece_io(H5D_io_info_t *io_info) if (H5D__multi_chunk_collective_io(io_info, &io_info->dsets_info[i], mpi_rank, mpi_size) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, - "couldn't finish optimized multiple chunk MPI-IO"); + HGOTO_ERROR( + H5E_IO, + (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR), + FAIL, "couldn't finish optimized multiple chunk MPI-IO"); } else { /* Perform unfiltered link chunk collective IO */ if (H5D__link_piece_collective_io(io_info, mpi_rank) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, - "couldn't finish linked chunk MPI-IO"); + HGOTO_ERROR( + H5E_IO, + (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR), + FAIL, "couldn't finish linked chunk MPI-IO"); } } @@ -1279,17 +1359,21 @@ H5D__piece_io(H5D_io_info_t *io_info) default: /* multiple chunk IO via threshold */ /* Check if there are any filters in the pipeline */ if (io_info->dsets_info[i].dset->shared->dcpl_cache.pline.nused > 0) { - if (H5D__multi_chunk_filtered_collective_io(io_info, &io_info->dsets_info[i], + if (H5D__multi_chunk_filtered_collective_io(io_info, &io_info->dsets_info[i], 1, mpi_rank, mpi_size) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, - "couldn't finish optimized multiple filtered chunk MPI-IO"); + HGOTO_ERROR( + H5E_IO, + (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR), + FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO"); } /* end if */ else { /* Perform unfiltered multi chunk collective IO */ if (H5D__multi_chunk_collective_io(io_info, &io_info->dsets_info[i], mpi_rank, mpi_size) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, - "couldn't finish optimized multiple chunk MPI-IO"); + HGOTO_ERROR( + H5E_IO, + (H5D_IO_OP_READ == io_info->op_type ? 
H5E_READERROR : H5E_WRITEERROR), + FAIL, "couldn't finish optimized multiple chunk MPI-IO"); } break; @@ -1423,14 +1507,24 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran int mpi_code; /* MPI return code */ H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK; H5D_mpio_actual_io_mode_t actual_io_mode = 0; - size_t i; /* Local index variable */ - herr_t ret_value = SUCCEED; + herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE /* set actual_io_mode */ - for (i = 0; i < io_info->count; i++) { - assert(io_info->dsets_info[i].dset->shared->dcpl_cache.pline.nused == 0); + for (size_t i = 0; i < io_info->count; i++) { + /* Skip this dataset if no I/O is being performed */ + if (io_info->dsets_info[i].skip_io) + continue; + + /* Filtered datasets are processed elsewhere. A contiguous dataset + * could possibly have filters in the DCPL pipeline, but the library + * will currently ignore optional filters in that case. + */ + if ((io_info->dsets_info[i].dset->shared->dcpl_cache.pline.nused > 0) && + (io_info->dsets_info[i].layout->type != H5D_CONTIGUOUS)) + continue; + if (io_info->dsets_info[i].layout->type == H5D_CHUNKED) actual_io_mode |= H5D_MPIO_CHUNK_COLLECTIVE; else if (io_info->dsets_info[i].layout->type == H5D_CONTIGUOUS) @@ -1457,8 +1551,9 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran H5_flexible_const_ptr_t base_buf_addr; base_buf_addr.cvp = NULL; - /* Get the number of chunks with a selection */ - num_chunk = io_info->pieces_added; + /* Get the number of unfiltered chunks with a selection */ + assert(io_info->filtered_pieces_added <= io_info->pieces_added); + num_chunk = io_info->pieces_added - io_info->filtered_pieces_added; H5_CHECK_OVERFLOW(num_chunk, size_t, int); #ifdef H5Dmpio_DEBUG @@ -1471,7 +1566,7 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran /* Check if sel_pieces array is sorted */ assert(io_info->sel_pieces[0]->faddr != HADDR_UNDEF); - for (i = 1; i < num_chunk; i++) { + for (size_t i = 1; i < io_info->pieces_added; i++) { assert(io_info->sel_pieces[i]->faddr != HADDR_UNDEF); if (io_info->sel_pieces[i]->faddr < io_info->sel_pieces[i - 1]->faddr) { @@ -1508,11 +1603,20 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file is derived datatype flags buffer"); - /* save lowest file address */ - ctg_store.contig.dset_addr = io_info->sel_pieces[0]->faddr; - - /* save base mem addr of piece for read/write */ - base_buf_addr = io_info->sel_pieces[0]->dset_info->buf; + /* + * After sorting sel_pieces according to file address, locate + * the first unfiltered chunk and save its file address and + * base memory address for read/write + */ + ctg_store.contig.dset_addr = HADDR_UNDEF; + for (size_t i = 0; i < io_info->pieces_added; i++) { + if (!io_info->sel_pieces[i]->filtered_dset) { + ctg_store.contig.dset_addr = io_info->sel_pieces[i]->faddr; + base_buf_addr = io_info->sel_pieces[i]->dset_info->buf; + break; + } + } + assert(ctg_store.contig.dset_addr != HADDR_UNDEF); #ifdef H5Dmpio_DEBUG H5D_MPIO_DEBUG(mpi_rank, "before iterate over selected pieces\n"); @@ -1520,7 +1624,7 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran /* Obtain MPI derived datatype from all individual pieces */ /* Iterate over selected pieces for this process */ - for (i = 0; i < num_chunk; i++) { + for (size_t i = 0, curr_idx = 0; i < 
io_info->pieces_added; i++) { hsize_t *permute_map = NULL; /* array that holds the mapping from the old, out-of-order displacements to the in-order displacements of the MPI datatypes of the @@ -1530,24 +1634,28 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran /* Assign convenience pointer to piece info */ piece_info = io_info->sel_pieces[i]; + /* Skip over filtered pieces as they are processed elsewhere */ + if (piece_info->filtered_dset) + continue; + /* Obtain disk and memory MPI derived datatype */ /* NOTE: The permute_map array can be allocated within H5S_mpio_space_type * and will be fed into the next call to H5S_mpio_space_type * where it will be freed. */ if (H5S_mpio_space_type(piece_info->fspace, piece_info->dset_info->type_info.src_type_size, - &chunk_ftype[i], /* OUT: datatype created */ - &chunk_mpi_file_counts[i], /* OUT */ - &(chunk_mft_is_derived_array[i]), /* OUT */ - true, /* this is a file space, - so permute the - datatype if the point - selections are out of - order */ - &permute_map, /* OUT: a map to indicate the - permutation of points - selected in case they - are out of order */ + &chunk_ftype[curr_idx], /* OUT: datatype created */ + &chunk_mpi_file_counts[curr_idx], /* OUT */ + &(chunk_mft_is_derived_array[curr_idx]), /* OUT */ + true, /* this is a file space, + so permute the + datatype if the point + selections are out of + order */ + &permute_map, /* OUT: a map to indicate the + permutation of points + selected in case they + are out of order */ &is_permuted /* OUT */) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "couldn't create MPI file type"); @@ -1555,20 +1663,20 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran if (is_permuted) assert(permute_map); if (H5S_mpio_space_type(piece_info->mspace, piece_info->dset_info->type_info.dst_type_size, - &chunk_mtype[i], &chunk_mpi_mem_counts[i], - &(chunk_mbt_is_derived_array[i]), false, /* this is a memory - space, so if the file - space is not - permuted, there is no - need to permute the - datatype if the point - selections are out of - order*/ - &permute_map, /* IN: the permutation map - generated by the - file_space selection - and applied to the - memory selection */ + &chunk_mtype[curr_idx], &chunk_mpi_mem_counts[curr_idx], + &(chunk_mbt_is_derived_array[curr_idx]), false, /* this is a memory + space, so if the + file space is not + permuted, there is + no need to permute + the datatype if the + point selections + are out of order */ + &permute_map, /* IN: the permutation map + generated by the + file_space selection + and applied to the + memory selection */ &is_permuted /* IN */) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "couldn't create MPI buf type"); /* Sanity check */ @@ -1578,16 +1686,19 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran /* Piece address relative to the first piece addr * Assign piece address to MPI displacement * (assume MPI_Aint big enough to hold it) */ - chunk_file_disp_array[i] = (MPI_Aint)piece_info->faddr - (MPI_Aint)ctg_store.contig.dset_addr; + chunk_file_disp_array[curr_idx] = + (MPI_Aint)piece_info->faddr - (MPI_Aint)ctg_store.contig.dset_addr; if (io_info->op_type == H5D_IO_OP_WRITE) { - chunk_mem_disp_array[i] = + chunk_mem_disp_array[curr_idx] = (MPI_Aint)piece_info->dset_info->buf.cvp - (MPI_Aint)base_buf_addr.cvp; } else if (io_info->op_type == H5D_IO_OP_READ) { - chunk_mem_disp_array[i] = + chunk_mem_disp_array[curr_idx] = 
(MPI_Aint)piece_info->dset_info->buf.vp - (MPI_Aint)base_buf_addr.vp; } + + curr_idx++; } /* end for */ /* Create final MPI derived datatype for the file */ @@ -1610,7 +1721,7 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran chunk_final_mtype_is_derived = true; /* Free the file & memory MPI datatypes for each chunk */ - for (i = 0; i < num_chunk; i++) { + for (size_t i = 0; i < num_chunk; i++) { if (chunk_mbt_is_derived_array[i]) if (MPI_SUCCESS != (mpi_code = MPI_Type_free(chunk_mtype + i))) HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) @@ -1655,6 +1766,9 @@ done: ret_value); #endif + if (ret_value < 0) + H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION); + /* Release resources */ if (chunk_mtype) H5MM_xfree(chunk_mtype); @@ -1751,8 +1865,8 @@ done: *------------------------------------------------------------------------- */ static herr_t -H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, int mpi_rank, - int mpi_size) +H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_infos, + size_t num_dset_infos, int mpi_rank, int mpi_size) { H5D_filtered_collective_io_info_t chunk_list = {0}; unsigned char **chunk_msg_bufs = NULL; @@ -1760,7 +1874,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_ int chunk_msg_bufs_len = 0; herr_t ret_value = SUCCEED; - FUNC_ENTER_PACKAGE_TAG(dset_info->dset->oloc.addr) + FUNC_ENTER_PACKAGE assert(io_info); @@ -1781,18 +1895,15 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_ /* Build a list of selected chunks in the collective io operation */ - if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, mpi_rank, &chunk_list) < 0) + if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_infos, num_dset_infos, mpi_rank, + &chunk_list) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list"); if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */ - if (H5D__mpio_collective_filtered_chunk_read(&chunk_list, io_info, dset_info, mpi_rank) < 0) + if (H5D__mpio_collective_filtered_chunk_read(&chunk_list, io_info, num_dset_infos, mpi_rank) < 0) HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read filtered chunks"); } else { /* Filtered collective write */ - H5D_chk_idx_info_t index_info; - - H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, dset_info->dset); - if (mpi_size > 1) { /* Redistribute shared chunks being written to */ if (H5D__mpio_redistribute_shared_chunks(&chunk_list, io_info, mpi_rank, mpi_size, @@ -1800,7 +1911,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks"); /* Send any chunk modification messages for chunks this rank no longer owns */ - if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, dset_info, mpi_rank, mpi_size, + if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, mpi_rank, mpi_size, &chunk_msg_bufs, &chunk_msg_bufs_len) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to send chunk modification data between MPI ranks"); @@ -1815,7 +1926,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_ * must participate. 
*/ if (H5D__mpio_collective_filtered_chunk_update(&chunk_list, chunk_msg_bufs, chunk_msg_bufs_len, - io_info, dset_info, mpi_rank) < 0) + io_info, num_dset_infos, mpi_rank) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't update modified chunks"); /* Free up resources used by chunk hash table now that we're done updating chunks */ @@ -1823,7 +1934,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_ /* All ranks now collectively re-allocate file space for all chunks */ if (H5D__mpio_collective_filtered_chunk_reallocate(&chunk_list, rank_chunks_assigned_map, io_info, - &index_info, mpi_rank, mpi_size) < 0) + num_dset_infos, mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't collectively re-allocate file space for chunks"); @@ -1843,12 +1954,15 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_ * into the chunk index */ if (H5D__mpio_collective_filtered_chunk_reinsert(&chunk_list, rank_chunks_assigned_map, io_info, - dset_info, &index_info, mpi_rank, mpi_size) < 0) + num_dset_infos, mpi_rank, mpi_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't collectively re-insert modified chunks into chunk index"); } done: + if (ret_value < 0) + H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION); + if (chunk_msg_bufs) { for (size_t i = 0; i < (size_t)chunk_msg_bufs_len; i++) H5MM_free(chunk_msg_bufs[i]); @@ -1858,6 +1972,9 @@ done: HASH_CLEAR(hh, chunk_list.chunk_hash_table); + if (rank_chunks_assigned_map) + H5MM_free(rank_chunks_assigned_map); + /* Free resources used by a rank which had some selection */ if (chunk_list.chunk_infos) { for (size_t i = 0; i < chunk_list.num_chunk_infos; i++) @@ -1867,15 +1984,42 @@ done: H5MM_free(chunk_list.chunk_infos); } /* end if */ - if (rank_chunks_assigned_map) - H5MM_free(rank_chunks_assigned_map); + /* Free resources used by cached dataset info */ + if ((num_dset_infos == 1) && (chunk_list.dset_info.single_dset_info)) { + H5D_mpio_filtered_dset_info_t *curr_dset_info = chunk_list.dset_info.single_dset_info; + + if (curr_dset_info->fb_info_init && H5D__fill_term(&curr_dset_info->fb_info) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "can't release fill buffer info"); + if (curr_dset_info->fill_space && H5S_close(curr_dset_info->fill_space) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space"); + + H5MM_free(chunk_list.dset_info.single_dset_info); + chunk_list.dset_info.single_dset_info = NULL; + } + else if ((num_dset_infos > 1) && (chunk_list.dset_info.dset_info_hash_table)) { + H5D_mpio_filtered_dset_info_t *curr_dset_info; + H5D_mpio_filtered_dset_info_t *tmp; + + HASH_ITER(hh, chunk_list.dset_info.dset_info_hash_table, curr_dset_info, tmp) + { + HASH_DELETE(hh, chunk_list.dset_info.dset_info_hash_table, curr_dset_info); + + if (curr_dset_info->fb_info_init && H5D__fill_term(&curr_dset_info->fb_info) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "can't release fill buffer info"); + if (curr_dset_info->fill_space && H5S_close(curr_dset_info->fill_space) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space"); + + H5MM_free(curr_dset_info); + curr_dset_info = NULL; + } + } #ifdef H5Dmpio_DEBUG H5D_MPIO_TIME_STOP(mpi_rank); H5D_MPIO_TRACE_EXIT(mpi_rank); #endif - FUNC_LEAVE_NOAPI_TAG(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__link_chunk_filtered_collective_io() */ /*------------------------------------------------------------------------- @@ -2079,6 
+2223,9 @@ H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_ H5CX_set_mpio_actual_io_mode(actual_io_mode); done: + if (ret_value < 0) + H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION); + /* Reset collective opt mode */ if (H5CX_set_mpio_coll_opt(orig_coll_opt_mode) < 0) HDONE_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't reset MPI-I/O collective_op property"); @@ -2171,8 +2318,8 @@ done: *------------------------------------------------------------------------- */ static herr_t -H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, int mpi_rank, - int mpi_size) +H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_infos, + size_t num_dset_infos, int mpi_rank, int mpi_size) { H5D_filtered_collective_io_info_t chunk_list = {0}; unsigned char **chunk_msg_bufs = NULL; @@ -2182,9 +2329,10 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info int mpi_code; herr_t ret_value = SUCCEED; - FUNC_ENTER_PACKAGE_TAG(dset_info->dset->oloc.addr) + FUNC_ENTER_PACKAGE_TAG(dset_infos->dset->oloc.addr) assert(io_info); + assert(num_dset_infos == 1); /* Currently only supported with 1 dataset at a time */ #ifdef H5Dmpio_DEBUG H5D_MPIO_TRACE_ENTER(mpi_rank); @@ -2202,7 +2350,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info H5CX_set_mpio_actual_io_mode(H5D_MPIO_CHUNK_COLLECTIVE); /* Build a list of selected chunks in the collective IO operation */ - if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, mpi_rank, &chunk_list) < 0) + if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_infos, 1, mpi_rank, &chunk_list) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list"); /* Retrieve the maximum number of chunks selected for any rank */ @@ -2216,7 +2364,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */ for (size_t i = 0; i < max_num_chunks; i++) { - H5D_filtered_collective_io_info_t single_chunk_list = {0}; + H5D_filtered_collective_io_info_t single_chunk_list = chunk_list; /* Check if this rank has a chunk to work on for this iteration */ have_chunk_to_process = (i < chunk_list.num_chunk_infos); @@ -2236,8 +2384,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info single_chunk_list.num_chunks_to_read = 0; } - if (H5D__mpio_collective_filtered_chunk_read(&single_chunk_list, io_info, dset_info, mpi_rank) < - 0) + if (H5D__mpio_collective_filtered_chunk_read(&single_chunk_list, io_info, 1, mpi_rank) < 0) HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read filtered chunks"); if (have_chunk_to_process && chunk_list.chunk_infos[i].buf) { @@ -2247,18 +2394,13 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info } } else { /* Filtered collective write */ - H5D_chk_idx_info_t index_info; - - /* Construct chunked index info */ - H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, dset_info->dset); - if (mpi_size > 1) { /* Redistribute shared chunks being written to */ if (H5D__mpio_redistribute_shared_chunks(&chunk_list, io_info, mpi_rank, mpi_size, NULL) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks"); /* Send any chunk modification messages for chunks this rank no longer owns */ - if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, dset_info, 
mpi_rank, mpi_size, + if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, mpi_rank, mpi_size, &chunk_msg_bufs, &chunk_msg_bufs_len) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to send chunk modification data between MPI ranks"); @@ -2269,7 +2411,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info * collective re-allocation and re-insertion of chunks modified by other ranks. */ for (size_t i = 0; i < max_num_chunks; i++) { - H5D_filtered_collective_io_info_t single_chunk_list = {0}; + H5D_filtered_collective_io_info_t single_chunk_list = chunk_list; /* Check if this rank has a chunk to work on for this iteration */ have_chunk_to_process = @@ -2281,13 +2423,11 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info */ if (have_chunk_to_process) { single_chunk_list.chunk_infos = &chunk_list.chunk_infos[i]; - single_chunk_list.chunk_hash_table = chunk_list.chunk_hash_table; single_chunk_list.num_chunk_infos = 1; single_chunk_list.num_chunks_to_read = chunk_list.chunk_infos[i].need_read ? 1 : 0; } else { single_chunk_list.chunk_infos = NULL; - single_chunk_list.chunk_hash_table = chunk_list.chunk_hash_table; single_chunk_list.num_chunk_infos = 0; single_chunk_list.num_chunks_to_read = 0; } @@ -2297,13 +2437,13 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info * the chunks. As chunk reads are done collectively here, all ranks * must participate. */ - if (H5D__mpio_collective_filtered_chunk_update( - &single_chunk_list, chunk_msg_bufs, chunk_msg_bufs_len, io_info, dset_info, mpi_rank) < 0) + if (H5D__mpio_collective_filtered_chunk_update(&single_chunk_list, chunk_msg_bufs, + chunk_msg_bufs_len, io_info, 1, mpi_rank) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't update modified chunks"); /* All ranks now collectively re-allocate file space for all chunks */ - if (H5D__mpio_collective_filtered_chunk_reallocate(&single_chunk_list, NULL, io_info, &index_info, - mpi_rank, mpi_size) < 0) + if (H5D__mpio_collective_filtered_chunk_reallocate(&single_chunk_list, NULL, io_info, 1, mpi_rank, + mpi_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't collectively re-allocate file space for chunks"); @@ -2321,14 +2461,17 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info /* Participate in the collective re-insertion of all chunks modified * in this iteration into the chunk index */ - if (H5D__mpio_collective_filtered_chunk_reinsert(&single_chunk_list, NULL, io_info, dset_info, - &index_info, mpi_rank, mpi_size) < 0) + if (H5D__mpio_collective_filtered_chunk_reinsert(&single_chunk_list, NULL, io_info, 1, mpi_rank, + mpi_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't collectively re-insert modified chunks into chunk index"); } /* end for */ } done: + if (ret_value < 0) + H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION); + if (chunk_msg_bufs) { for (size_t i = 0; i < (size_t)chunk_msg_bufs_len; i++) H5MM_free(chunk_msg_bufs[i]); @@ -2347,6 +2490,36 @@ done: H5MM_free(chunk_list.chunk_infos); } /* end if */ + /* Free resources used by cached dataset info */ + if ((num_dset_infos == 1) && (chunk_list.dset_info.single_dset_info)) { + H5D_mpio_filtered_dset_info_t *curr_dset_info = chunk_list.dset_info.single_dset_info; + + if (curr_dset_info->fb_info_init && H5D__fill_term(&curr_dset_info->fb_info) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "can't release fill buffer info"); + if 
(curr_dset_info->fill_space && H5S_close(curr_dset_info->fill_space) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space"); + + H5MM_free(chunk_list.dset_info.single_dset_info); + chunk_list.dset_info.single_dset_info = NULL; + } + else if ((num_dset_infos > 1) && (chunk_list.dset_info.dset_info_hash_table)) { + H5D_mpio_filtered_dset_info_t *curr_dset_info; + H5D_mpio_filtered_dset_info_t *tmp; + + HASH_ITER(hh, chunk_list.dset_info.dset_info_hash_table, curr_dset_info, tmp) + { + HASH_DELETE(hh, chunk_list.dset_info.dset_info_hash_table, curr_dset_info); + + if (curr_dset_info->fb_info_init && H5D__fill_term(&curr_dset_info->fb_info) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "can't release fill buffer info"); + if (curr_dset_info->fill_space && H5S_close(curr_dset_info->fill_space) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space"); + + H5MM_free(curr_dset_info); + curr_dset_info = NULL; + } + } + #ifdef H5Dmpio_DEBUG H5D_MPIO_TIME_STOP(mpi_rank); H5D_MPIO_TRACE_EXIT(mpi_rank); @@ -2583,20 +2756,25 @@ H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_in addr2 = entry2->chunk_new.offset; /* - * If both chunk addresses are defined, H5_addr_cmp is safe to use. - * Otherwise, if both addresses aren't defined, compared chunk - * entries based on their chunk index. Finally, if only one chunk - * address is defined, return the appropriate value based on which - * is defined. + * If both chunk's file addresses are defined, H5_addr_cmp is safe to use. + * If only one chunk's file address is defined, return the appropriate + * value based on which is defined. If neither chunk's file address is + * defined, compare chunk entries based on their dataset object header + * address, then by their chunk index value. */ if (H5_addr_defined(addr1) && H5_addr_defined(addr2)) { ret_value = H5_addr_cmp(addr1, addr2); } else if (!H5_addr_defined(addr1) && !H5_addr_defined(addr2)) { - hsize_t chunk_idx1 = entry1->index_info.chunk_idx; - hsize_t chunk_idx2 = entry2->index_info.chunk_idx; + haddr_t oloc_addr1 = entry1->index_info.dset_oloc_addr; + haddr_t oloc_addr2 = entry2->index_info.dset_oloc_addr; - ret_value = (chunk_idx1 > chunk_idx2) - (chunk_idx1 < chunk_idx2); + if (0 == (ret_value = H5_addr_cmp(oloc_addr1, oloc_addr2))) { + hsize_t chunk_idx1 = entry1->index_info.chunk_idx; + hsize_t chunk_idx2 = entry2->index_info.chunk_idx; + + ret_value = (chunk_idx1 > chunk_idx2) - (chunk_idx1 < chunk_idx2); + } } else ret_value = H5_addr_defined(addr1) ? 
1 : -1; @@ -2622,8 +2800,8 @@ H5D__cmp_chunk_redistribute_info(const void *_entry1, const void *_entry2) { const H5D_chunk_redistribute_info_t *entry1; const H5D_chunk_redistribute_info_t *entry2; - hsize_t chunk_index1; - hsize_t chunk_index2; + haddr_t oloc_addr1; + haddr_t oloc_addr2; int ret_value; FUNC_ENTER_PACKAGE_NOERR @@ -2631,17 +2809,26 @@ H5D__cmp_chunk_redistribute_info(const void *_entry1, const void *_entry2) entry1 = (const H5D_chunk_redistribute_info_t *)_entry1; entry2 = (const H5D_chunk_redistribute_info_t *)_entry2; - chunk_index1 = entry1->chunk_idx; - chunk_index2 = entry2->chunk_idx; + oloc_addr1 = entry1->dset_oloc_addr; + oloc_addr2 = entry2->dset_oloc_addr; - if (chunk_index1 == chunk_index2) { - int orig_owner1 = entry1->orig_owner; - int orig_owner2 = entry2->orig_owner; + /* Sort first by dataset object header address */ + if (0 == (ret_value = H5_addr_cmp(oloc_addr1, oloc_addr2))) { + hsize_t chunk_index1 = entry1->chunk_idx; + hsize_t chunk_index2 = entry2->chunk_idx; - ret_value = (orig_owner1 > orig_owner2) - (orig_owner1 < orig_owner2); + /* Then by chunk index value */ + if (chunk_index1 == chunk_index2) { + int orig_owner1 = entry1->orig_owner; + int orig_owner2 = entry2->orig_owner; + + /* And finally by original owning MPI rank for the chunk */ + + ret_value = (orig_owner1 > orig_owner2) - (orig_owner1 < orig_owner2); + } + else + ret_value = (chunk_index1 > chunk_index2) - (chunk_index1 < chunk_index2); } - else - ret_value = (chunk_index1 > chunk_index2) - (chunk_index1 < chunk_index2); FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__cmp_chunk_redistribute_info() */ @@ -2656,6 +2843,16 @@ H5D__cmp_chunk_redistribute_info(const void *_entry1, const void *_entry2) * rank for two H5D_chunk_redistribute_info_t * structures * + * NOTE: The inner logic used in this sorting callback (inside the + * block where the original owners are equal) is intended to + * cause the given array of H5D_chunk_redistribute_info_t + * structures to be sorted back exactly as it was sorted + * before a shared chunks redistribution operation, according + * to the logic in H5D__cmp_filtered_collective_io_info_entry. + * Since the two sorting callbacks are currently tied directly + * to each other, both should be updated in the same way when + * changes are made. + * * Return: -1, 0, 1 * *------------------------------------------------------------------------- @@ -2682,20 +2879,25 @@ H5D__cmp_chunk_redistribute_info_orig_owner(const void *_entry1, const void *_en haddr_t addr2 = entry2->chunk_block.offset; /* - * If both chunk addresses are defined, H5_addr_cmp is safe to use. - * Otherwise, if both addresses aren't defined, compared chunk - * entries based on their chunk index. Finally, if only one chunk - * address is defined, return the appropriate value based on which - * is defined. + * If both chunk's file addresses are defined, H5_addr_cmp is safe to use. + * If only one chunk's file address is defined, return the appropriate + * value based on which is defined. If neither chunk's file address is + * defined, compare chunk entries based on their dataset object header + * address, then by their chunk index value. 
*/ if (H5_addr_defined(addr1) && H5_addr_defined(addr2)) { ret_value = H5_addr_cmp(addr1, addr2); } else if (!H5_addr_defined(addr1) && !H5_addr_defined(addr2)) { - hsize_t chunk_idx1 = entry1->chunk_idx; - hsize_t chunk_idx2 = entry2->chunk_idx; + haddr_t oloc_addr1 = entry1->dset_oloc_addr; + haddr_t oloc_addr2 = entry2->dset_oloc_addr; - ret_value = (chunk_idx1 > chunk_idx2) - (chunk_idx1 < chunk_idx2); + if (0 == (ret_value = H5_addr_cmp(oloc_addr1, oloc_addr2))) { + hsize_t chunk_idx1 = entry1->chunk_idx; + hsize_t chunk_idx2 = entry2->chunk_idx; + + ret_value = (chunk_idx1 > chunk_idx2) - (chunk_idx1 < chunk_idx2); + } } else ret_value = H5_addr_defined(addr1) ? 1 : -1; @@ -2927,20 +3129,21 @@ done: */ static herr_t H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di, - int mpi_rank, H5D_filtered_collective_io_info_t *chunk_list) + size_t num_dset_infos, int mpi_rank, + H5D_filtered_collective_io_info_t *chunk_list) { - H5D_filtered_collective_chunk_info_t *local_info_array = NULL; - H5D_chunk_ud_t udata; - bool filter_partial_edge_chunks; - size_t num_chunks_selected; - size_t num_chunks_to_read = 0; - herr_t ret_value = SUCCEED; + H5D_filtered_collective_chunk_info_t *local_info_array = NULL; + H5D_mpio_filtered_dset_info_t *curr_dset_info = NULL; + size_t num_chunks_selected = 0; + size_t num_chunks_to_read = 0; + size_t buf_idx = 0; + bool need_sort = false; + herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE assert(io_info); assert(di); - assert(di->layout->type == H5D_CHUNKED); assert(chunk_list); #ifdef H5Dmpio_DEBUG @@ -2948,166 +3151,330 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const H5D_MPIO_TIME_START(mpi_rank, "Filtered Collective I/O Setup"); #endif - /* Each rank builds a local list of the chunks they have selected */ - if ((num_chunks_selected = H5SL_count(di->layout_io_info.chunk_map->dset_sel_pieces))) { - H5D_piece_info_t *chunk_info; - H5SL_node_t *chunk_node; - hsize_t select_npoints; - bool need_sort = false; + /* Calculate hash key length for chunk hash table */ + if (num_dset_infos > 1) { + /* Just in case the structure changes... */ + HDcompile_assert(offsetof(H5D_chunk_index_info_t, dset_oloc_addr) > + offsetof(H5D_chunk_index_info_t, chunk_idx)); + + /* Calculate key length using uthash compound key example */ + chunk_list->chunk_hash_table_keylen = offsetof(H5D_chunk_index_info_t, dset_oloc_addr) + + sizeof(haddr_t) - offsetof(H5D_chunk_index_info_t, chunk_idx); + } + else + chunk_list->chunk_hash_table_keylen = sizeof(hsize_t); + + chunk_list->all_dset_indices_empty = true; + chunk_list->no_dset_index_insert_methods = true; + + /* Calculate size needed for total chunk list */ + for (size_t dset_idx = 0; dset_idx < num_dset_infos; dset_idx++) { + /* Skip this dataset if no I/O is being performed */ + if (di[dset_idx].skip_io) + continue; + + /* Only process filtered, chunked datasets. A contiguous dataset + * could possibly have filters in the DCPL pipeline, but the library + * will currently ignore optional filters in that case. 
+ */ + if ((di[dset_idx].dset->shared->dcpl_cache.pline.nused == 0) || + (di[dset_idx].layout->type == H5D_CONTIGUOUS)) + continue; + + assert(di[dset_idx].layout->type == H5D_CHUNKED); + assert(di[dset_idx].layout->storage.type == H5D_CHUNKED); - /* Determine whether partial edge chunks should be filtered */ - filter_partial_edge_chunks = - !(di->dset->shared->layout.u.chunk.flags & H5O_LAYOUT_CHUNK_DONT_FILTER_PARTIAL_BOUND_CHUNKS); + num_chunks_selected += H5SL_count(di[dset_idx].layout_io_info.chunk_map->dset_sel_pieces); + } + if (num_chunks_selected) if (NULL == (local_info_array = H5MM_malloc(num_chunks_selected * sizeof(*local_info_array)))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer"); - chunk_node = H5SL_first(di->layout_io_info.chunk_map->dset_sel_pieces); - for (size_t i = 0; chunk_node; i++) { - chunk_info = (H5D_piece_info_t *)H5SL_item(chunk_node); + for (size_t dset_idx = 0; dset_idx < num_dset_infos; dset_idx++) { + H5D_chunk_ud_t udata; + H5O_fill_t *fill_msg; + haddr_t prev_tag = HADDR_UNDEF; - /* Obtain this chunk's address */ - if (H5D__chunk_lookup(di->dset, chunk_info->scaled, &udata) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address"); + /* Skip this dataset if no I/O is being performed */ + if (di[dset_idx].skip_io) + continue; - /* Initialize rank-local chunk info */ - local_info_array[i].chunk_info = chunk_info; - local_info_array[i].chunk_buf_size = 0; - local_info_array[i].num_writers = 0; - local_info_array[i].orig_owner = mpi_rank; - local_info_array[i].new_owner = mpi_rank; - local_info_array[i].buf = NULL; + /* Only process filtered, chunked datasets. A contiguous dataset + * could possibly have filters in the DCPL pipeline, but the library + * will currently ignore optional filters in that case. + */ + if ((di[dset_idx].dset->shared->dcpl_cache.pline.nused == 0) || + (di[dset_idx].layout->type == H5D_CONTIGUOUS)) + continue; - select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->fspace); - local_info_array[i].io_size = (size_t)select_npoints * di->type_info.dst_type_size; + assert(di[dset_idx].layout->storage.type == H5D_CHUNKED); + assert(di[dset_idx].layout->storage.u.chunk.idx_type != H5D_CHUNK_IDX_NONE); - /* - * Determine whether this chunk will need to be read from the file. If this is - * a read operation, the chunk will be read. If this is a write operation, we - * generally need to read a filtered chunk from the file before modifying it, - * unless the chunk is being fully overwritten. - * - * TODO: Currently the full overwrite status of a chunk is only obtained on a - * per-rank basis. This means that if the total selection in the chunk, as - * determined by the combination of selections of all of the ranks interested in - * the chunk, covers the entire chunk, the performance optimization of not reading - * the chunk from the file is still valid, but is not applied in the current - * implementation. 
- * - * To implement this case, a few approaches were considered: - * - * - Keep a running total (distributed to each rank) of the number of chunk - * elements selected during chunk redistribution and compare that to the total - * number of elements in the chunk once redistribution is finished - * - * - Process all incoming chunk messages before doing I/O (these are currently - * processed AFTER doing I/O), combine the owning rank's selection in a chunk - * with the selections received from other ranks and check to see whether that - * combined selection covers the entire chunk - * - * The first approach will be dangerous if the application performs an overlapping - * write to a chunk, as the number of selected elements can equal or exceed the - * number of elements in the chunk without the whole chunk selection being covered. - * While it might be considered erroneous for an application to do an overlapping - * write, we don't explicitly disallow it. - * - * The second approach contains a bit of complexity in that part of the chunk - * messages will be needed before doing I/O and part will be needed after doing I/O. - * Since modification data from chunk messages can't be applied until after any I/O - * is performed (otherwise, we'll overwrite any applied modification data), chunk - * messages are currently entirely processed after I/O. However, in order to determine - * if a chunk is being fully overwritten, we need the dataspace portion of the chunk - * messages before doing I/O. The naive way to do this is to process chunk messages - * twice, using just the relevant information from the message before and after I/O. - * The better way would be to avoid processing chunk messages twice by extracting (and - * keeping around) the dataspace portion of the message before I/O and processing the - * rest of the chunk message after I/O. Note that the dataspace portion of each chunk - * message is used to correctly apply chunk modification data from the message, so - * must be kept around both before and after I/O in this case. - */ - if (io_info->op_type == H5D_IO_OP_READ) - local_info_array[i].need_read = true; - else { - local_info_array[i].need_read = - local_info_array[i].io_size < (size_t)di->dset->shared->layout.u.chunk.size; - } + /* + * To support the multi-dataset I/O case, cache some info (chunk size, + * fill buffer and fill dataspace, etc.) about each dataset involved + * in the I/O operation for use when processing chunks. If only one + * dataset is involved, this information is the same for every chunk + * processed. Otherwise, if multiple datasets are involved, a hash + * table is used to quickly match a particular chunk with the cached + * information pertaining to the dataset it resides in. 
+ */ + if (NULL == (curr_dset_info = H5MM_malloc(sizeof(H5D_mpio_filtered_dset_info_t)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate space for dataset info"); + + memset(&curr_dset_info->fb_info, 0, sizeof(H5D_fill_buf_info_t)); + + H5D_MPIO_INIT_CHUNK_IDX_INFO(curr_dset_info->chunk_idx_info, di[dset_idx].dset); + + curr_dset_info->dset_io_info = &di[dset_idx]; + curr_dset_info->file_chunk_size = di[dset_idx].dset->shared->layout.u.chunk.size; + curr_dset_info->dset_oloc_addr = di[dset_idx].dset->oloc.addr; + curr_dset_info->fill_space = NULL; + curr_dset_info->fb_info_init = false; + curr_dset_info->index_empty = false; + + /* Determine if fill values should be written to chunks */ + fill_msg = &di[dset_idx].dset->shared->dcpl_cache.fill; + curr_dset_info->should_fill = + (fill_msg->fill_time == H5D_FILL_TIME_ALLOC) || + ((fill_msg->fill_time == H5D_FILL_TIME_IFSET) && fill_msg->fill_defined); + + if (curr_dset_info->should_fill) { + hsize_t chunk_dims[H5S_MAX_RANK]; + + assert(di[dset_idx].dset->shared->ndims == di[dset_idx].dset->shared->layout.u.chunk.ndims - 1); + for (size_t dim_idx = 0; dim_idx < di[dset_idx].dset->shared->layout.u.chunk.ndims - 1; dim_idx++) + chunk_dims[dim_idx] = (hsize_t)di[dset_idx].dset->shared->layout.u.chunk.dim[dim_idx]; + + /* Get a dataspace for filling chunk memory buffers */ + if (NULL == (curr_dset_info->fill_space = H5S_create_simple( + di[dset_idx].dset->shared->layout.u.chunk.ndims - 1, chunk_dims, NULL))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to create chunk fill dataspace"); + + /* Initialize fill value buffer */ + if (H5D__fill_init(&curr_dset_info->fb_info, NULL, (H5MM_allocate_t)H5D__chunk_mem_alloc, + (void *)&di[dset_idx].dset->shared->dcpl_cache.pline, + (H5MM_free_t)H5D__chunk_mem_free, + (void *)&di[dset_idx].dset->shared->dcpl_cache.pline, + &di[dset_idx].dset->shared->dcpl_cache.fill, di[dset_idx].dset->shared->type, + di[dset_idx].dset->shared->type_id, 0, curr_dset_info->file_chunk_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize fill value buffer"); + + curr_dset_info->fb_info_init = true; + } + + /* + * If the dataset is incrementally allocated and hasn't been written + * to yet, the chunk index should be empty. In this case, a collective + * read of its chunks is essentially a no-op, so we can avoid that read + * later. If all datasets have empty chunk indices, we can skip the + * collective read entirely. + */ + if (fill_msg->alloc_time == H5D_ALLOC_TIME_INCR) + if (H5D__chunk_index_empty(di[dset_idx].dset, &curr_dset_info->index_empty) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty"); + + if ((fill_msg->alloc_time != H5D_ALLOC_TIME_INCR) || !curr_dset_info->index_empty) + chunk_list->all_dset_indices_empty = false; + + if (curr_dset_info->chunk_idx_info.storage->ops->insert) + chunk_list->no_dset_index_insert_methods = false; - if (local_info_array[i].need_read) - num_chunks_to_read++; + /* + * For multi-dataset I/O, use a hash table to keep a mapping between + * chunks and the cached info for the dataset that they're in. Otherwise, + * we can just use the info object directly if only one dataset is being + * worked on. 
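A sketch of the single-versus-multi dataset dispatch just described, with simplified types: when only one dataset is involved the cached info is reached through a direct pointer, and with several it is keyed by object header address in a uthash table:

    #include <stddef.h>
    #include "uthash.h"

    /* Illustrative stand-in for H5D_mpio_filtered_dset_info_t */
    typedef struct dset_cache_t {
        unsigned long long dset_oloc_addr; /* hash key */
        size_t             file_chunk_size;
        UT_hash_handle     hh;
    } dset_cache_t;

    typedef struct {
        size_t num_dsets;
        union {
            dset_cache_t *single;     /* num_dsets == 1 */
            dset_cache_t *hash_table; /* num_dsets > 1, uthash head pointer */
        } u;
    } cache_dispatch_t;

    static dset_cache_t *
    lookup_cache(cache_dispatch_t *d, unsigned long long oloc_addr)
    {
        dset_cache_t *info = NULL;

        if (d->num_dsets > 1)
            HASH_FIND(hh, d->u.hash_table, &oloc_addr, sizeof(oloc_addr), info);
        else
            info = d->u.single; /* same cached info applies to every chunk */

        return info;
    }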
+ */ + if (num_dset_infos > 1) { + HASH_ADD(hh, chunk_list->dset_info.dset_info_hash_table, dset_oloc_addr, sizeof(haddr_t), + curr_dset_info); + } + else + chunk_list->dset_info.single_dset_info = curr_dset_info; + curr_dset_info = NULL; + + /* + * Now, each rank builds a local list of info about the chunks + * they have selected among the chunks in the current dataset + */ + + /* Set metadata tagging with dataset oheader addr */ + H5AC_tag(di[dset_idx].dset->oloc.addr, &prev_tag); + + if (H5SL_count(di[dset_idx].layout_io_info.chunk_map->dset_sel_pieces)) { + H5SL_node_t *chunk_node; + bool filter_partial_edge_chunks; + + /* Determine whether partial edge chunks should be filtered */ + filter_partial_edge_chunks = !(di[dset_idx].dset->shared->layout.u.chunk.flags & + H5O_LAYOUT_CHUNK_DONT_FILTER_PARTIAL_BOUND_CHUNKS); + + chunk_node = H5SL_first(di[dset_idx].layout_io_info.chunk_map->dset_sel_pieces); + while (chunk_node) { + H5D_piece_info_t *chunk_info; + hsize_t select_npoints; + + chunk_info = (H5D_piece_info_t *)H5SL_item(chunk_node); + assert(chunk_info->filtered_dset); + + /* Obtain this chunk's address */ + if (H5D__chunk_lookup(di[dset_idx].dset, chunk_info->scaled, &udata) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address"); + + /* Initialize rank-local chunk info */ + local_info_array[buf_idx].chunk_info = chunk_info; + local_info_array[buf_idx].chunk_buf_size = 0; + local_info_array[buf_idx].num_writers = 0; + local_info_array[buf_idx].orig_owner = mpi_rank; + local_info_array[buf_idx].new_owner = mpi_rank; + local_info_array[buf_idx].buf = NULL; + + select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->fspace); + local_info_array[buf_idx].io_size = + (size_t)select_npoints * di[dset_idx].type_info.dst_type_size; - local_info_array[i].skip_filter_pline = false; - if (!filter_partial_edge_chunks) { /* - * If this is a partial edge chunk and the "don't filter partial edge - * chunks" flag is set, make sure not to apply filters to the chunk. + * Determine whether this chunk will need to be read from the file. If this is + * a read operation, the chunk will be read. If this is a write operation, we + * generally need to read a filtered chunk from the file before modifying it, + * unless the chunk is being fully overwritten. + * + * TODO: Currently the full overwrite status of a chunk is only obtained on a + * per-rank basis. This means that if the total selection in the chunk, as + * determined by the combination of selections of all of the ranks interested in + * the chunk, covers the entire chunk, the performance optimization of not reading + * the chunk from the file is still valid, but is not applied in the current + * implementation. 
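The read decision described in the comment above reduces to a small predicate. A sketch under the stated per-rank approximation, where a write that selects fewer bytes than the full chunk is treated as a partial overwrite and forces a read-modify-write cycle:

    #include <stdbool.h>
    #include <stddef.h>

    typedef enum { IO_OP_READ, IO_OP_WRITE } io_op_t;

    /*
     * Reads always pull the chunk from the file; writes only avoid the
     * read-back when this rank's selection covers the entire chunk.
     */
    static bool
    chunk_needs_read(io_op_t op, size_t io_size, size_t chunk_size)
    {
        if (op == IO_OP_READ)
            return true;
        return io_size < chunk_size;
    }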
+ * + * To implement this case, a few approaches were considered: + * + * - Keep a running total (distributed to each rank) of the number of chunk + * elements selected during chunk redistribution and compare that to the total + * number of elements in the chunk once redistribution is finished + * + * - Process all incoming chunk messages before doing I/O (these are currently + * processed AFTER doing I/O), combine the owning rank's selection in a chunk + * with the selections received from other ranks and check to see whether that + * combined selection covers the entire chunk + * + * The first approach will be dangerous if the application performs an overlapping + * write to a chunk, as the number of selected elements can equal or exceed the + * number of elements in the chunk without the whole chunk selection being covered. + * While it might be considered erroneous for an application to do an overlapping + * write, we don't explicitly disallow it. + * + * The second approach contains a bit of complexity in that part of the chunk + * messages will be needed before doing I/O and part will be needed after doing I/O. + * Since modification data from chunk messages can't be applied until after any I/O + * is performed (otherwise, we'll overwrite any applied modification data), chunk + * messages are currently entirely processed after I/O. However, in order to determine + * if a chunk is being fully overwritten, we need the dataspace portion of the chunk + * messages before doing I/O. The naive way to do this is to process chunk messages + * twice, using just the relevant information from the message before and after I/O. + * The better way would be to avoid processing chunk messages twice by extracting (and + * keeping around) the dataspace portion of the message before I/O and processing the + * rest of the chunk message after I/O. Note that the dataspace portion of each chunk + * message is used to correctly apply chunk modification data from the message, so + * must be kept around both before and after I/O in this case. */ - if (H5D__chunk_is_partial_edge_chunk(di->dset->shared->ndims, - di->dset->shared->layout.u.chunk.dim, chunk_info->scaled, - di->dset->shared->curr_dims)) - local_info_array[i].skip_filter_pline = true; - } + if (io_info->op_type == H5D_IO_OP_READ) + local_info_array[buf_idx].need_read = true; + else { + local_info_array[buf_idx].need_read = + local_info_array[buf_idx].io_size < + (size_t)di[dset_idx].dset->shared->layout.u.chunk.size; + } - /* Initialize the chunk's shared info */ - local_info_array[i].chunk_current = udata.chunk_block; - local_info_array[i].chunk_new = udata.chunk_block; + if (local_info_array[buf_idx].need_read) + num_chunks_to_read++; - /* - * Check if the list is not in ascending order of offset in the file - * or has unallocated chunks. In either case, the list should get - * sorted. - */ - if (i) { - haddr_t curr_chunk_offset = local_info_array[i].chunk_current.offset; - haddr_t prev_chunk_offset = local_info_array[i - 1].chunk_current.offset; + local_info_array[buf_idx].skip_filter_pline = false; + if (!filter_partial_edge_chunks) { + /* + * If this is a partial edge chunk and the "don't filter partial edge + * chunks" flag is set, make sure not to apply filters to the chunk. 
+ */ + if (H5D__chunk_is_partial_edge_chunk( + di[dset_idx].dset->shared->ndims, di[dset_idx].dset->shared->layout.u.chunk.dim, + chunk_info->scaled, di[dset_idx].dset->shared->curr_dims)) + local_info_array[buf_idx].skip_filter_pline = true; + } - if (!H5_addr_defined(prev_chunk_offset) || !H5_addr_defined(curr_chunk_offset) || - (curr_chunk_offset < prev_chunk_offset)) - need_sort = true; + /* Initialize the chunk's shared info */ + local_info_array[buf_idx].chunk_current = udata.chunk_block; + local_info_array[buf_idx].chunk_new = udata.chunk_block; + + /* + * Check if the list is not in ascending order of offset in the file + * or has unallocated chunks. In either case, the list should get + * sorted. + */ + if (!need_sort && buf_idx) { + haddr_t curr_chunk_offset = local_info_array[buf_idx].chunk_current.offset; + haddr_t prev_chunk_offset = local_info_array[buf_idx - 1].chunk_current.offset; + + if (!H5_addr_defined(prev_chunk_offset) || !H5_addr_defined(curr_chunk_offset) || + (curr_chunk_offset < prev_chunk_offset)) + need_sort = true; + } + + /* Needed for proper hashing later on */ + memset(&local_info_array[buf_idx].index_info, 0, sizeof(H5D_chunk_index_info_t)); + + /* + * Extensible arrays may calculate a chunk's index a little differently + * than normal when the dataset's unlimited dimension is not the + * slowest-changing dimension, so set the index here based on what the + * extensible array code calculated instead of what was calculated + * in the chunk file mapping. + */ + if (di[dset_idx].dset->shared->layout.u.chunk.idx_type == H5D_CHUNK_IDX_EARRAY) + local_info_array[buf_idx].index_info.chunk_idx = udata.chunk_idx; + else + local_info_array[buf_idx].index_info.chunk_idx = chunk_info->index; + + assert(H5_addr_defined(di[dset_idx].dset->oloc.addr)); + local_info_array[buf_idx].index_info.dset_oloc_addr = di[dset_idx].dset->oloc.addr; + + local_info_array[buf_idx].index_info.filter_mask = udata.filter_mask; + local_info_array[buf_idx].index_info.need_insert = false; + + buf_idx++; + + chunk_node = H5SL_next(chunk_node); } + } + else if (H5F_get_coll_metadata_reads(di[dset_idx].dset->oloc.file)) { + hsize_t scaled[H5O_LAYOUT_NDIMS] = {0}; /* - * Extensible arrays may calculate a chunk's index a little differently - * than normal when the dataset's unlimited dimension is not the - * slowest-changing dimension, so set the index here based on what the - * extensible array code calculated instead of what was calculated - * in the chunk file mapping. + * If this rank has no selection in the dataset and collective + * metadata reads are enabled, do a fake lookup of a chunk to + * ensure that this rank has the chunk index opened. Otherwise, + * only the ranks that had a selection will have opened the + * chunk index and they will have done so independently. Therefore, + * when ranks with no selection participate in later collective + * metadata reads, they will try to open the chunk index collectively + * and issues will occur since other ranks won't participate. + * + * In the future, we should consider having a chunk index "open" + * callback that can be used to ensure collectivity between ranks + * in a more natural way, but this hack should suffice for now. 
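The need_sort bookkeeping above trades sorted insertion for a single deferred qsort. A simplified sketch, assuming an illustrative ADDR_UNDEF sentinel and a comparator that merely orders by offset (the real H5D__cmp_filtered_collective_io_info_entry applies its own ordering rules):

    #include <stdbool.h>
    #include <stdlib.h>

    #define ADDR_UNDEF ((unsigned long long)-1) /* stand-in for HADDR_UNDEF */

    typedef struct {
        unsigned long long offset; /* chunk file offset, or ADDR_UNDEF */
    } chunk_entry_t;

    static int
    cmp_by_offset(const void *a, const void *b)
    {
        unsigned long long o1 = ((const chunk_entry_t *)a)->offset;
        unsigned long long o2 = ((const chunk_entry_t *)b)->offset;
        return (o1 > o2) - (o1 < o2);
    }

    /* Detect out-of-order or unallocated entries, then sort once at the end */
    static void
    finalize_chunk_list(chunk_entry_t *list, size_t n)
    {
        bool need_sort = false;

        for (size_t i = 1; i < n && !need_sort; i++)
            if (list[i - 1].offset == ADDR_UNDEF || list[i].offset == ADDR_UNDEF ||
                list[i].offset < list[i - 1].offset)
                need_sort = true;

        if (need_sort)
            qsort(list, n, sizeof(*list), cmp_by_offset);
    }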
*/ - if (di->dset->shared->layout.u.chunk.idx_type == H5D_CHUNK_IDX_EARRAY) - local_info_array[i].index_info.chunk_idx = udata.chunk_idx; - else - local_info_array[i].index_info.chunk_idx = chunk_info->index; - - local_info_array[i].index_info.filter_mask = udata.filter_mask; - local_info_array[i].index_info.need_insert = false; - - chunk_node = H5SL_next(chunk_node); + if (H5D__chunk_lookup(di[dset_idx].dset, scaled, &udata) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address"); } - /* Ensure the chunk list is sorted in ascending order of offset in the file */ - if (need_sort) - qsort(local_info_array, num_chunks_selected, sizeof(H5D_filtered_collective_chunk_info_t), - H5D__cmp_filtered_collective_io_info_entry); + /* Reset metadata tagging */ + H5AC_tag(prev_tag, NULL); } - else if (H5F_get_coll_metadata_reads(di->dset->oloc.file)) { - hsize_t scaled[H5O_LAYOUT_NDIMS] = {0}; - /* - * If this rank has no selection in the dataset and collective - * metadata reads are enabled, do a fake lookup of a chunk to - * ensure that this rank has the chunk index opened. Otherwise, - * only the ranks that had a selection will have opened the - * chunk index and they will have done so independently. Therefore, - * when ranks with no selection participate in later collective - * metadata reads, they will try to open the chunk index collectively - * and issues will occur since other ranks won't participate. - * - * In the future, we should consider having a chunk index "open" - * callback that can be used to ensure collectivity between ranks - * in a more natural way, but this hack should suffice for now. - */ - if (H5D__chunk_lookup(di->dset, scaled, &udata) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address"); - } + /* Ensure the chunk list is sorted in ascending order of offset in the file */ + if (local_info_array && need_sort) + qsort(local_info_array, num_chunks_selected, sizeof(H5D_filtered_collective_chunk_info_t), + H5D__cmp_filtered_collective_io_info_entry); chunk_list->chunk_infos = local_info_array; chunk_list->num_chunk_infos = num_chunks_selected; @@ -3119,6 +3486,37 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const done: if (ret_value < 0) { + /* Free temporary cached dataset info object */ + if (curr_dset_info) { + if (curr_dset_info->fb_info_init && H5D__fill_term(&curr_dset_info->fb_info) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "can't release fill buffer info"); + if (curr_dset_info->fill_space && H5S_close(curr_dset_info->fill_space) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space"); + + H5MM_free(curr_dset_info); + curr_dset_info = NULL; + + if (num_dset_infos == 1) + chunk_list->dset_info.single_dset_info = NULL; + } + + /* Free resources used by cached dataset info hash table */ + if (num_dset_infos > 1) { + H5D_mpio_filtered_dset_info_t *tmp; + + HASH_ITER(hh, chunk_list->dset_info.dset_info_hash_table, curr_dset_info, tmp) + { + HASH_DELETE(hh, chunk_list->dset_info.dset_info_hash_table, curr_dset_info); + H5MM_free(curr_dset_info); + curr_dset_info = NULL; + } + } + + if (num_dset_infos == 1) + chunk_list->dset_info.single_dset_info = NULL; + else + chunk_list->dset_info.dset_info_hash_table = NULL; + H5MM_free(local_info_array); } @@ -3158,7 +3556,6 @@ H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_li bool redistribute_on_all_ranks; size_t *num_chunks_map = NULL; size_t coll_chunk_list_size = 0; - size_t i; 
int mpi_code; herr_t ret_value = SUCCEED; @@ -3186,8 +3583,8 @@ H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_li num_chunks_map, 1, H5_SIZE_T_AS_MPI_TYPE, io_info->comm))) HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) - for (i = 0; i < (size_t)mpi_size; i++) - coll_chunk_list_size += num_chunks_map[i]; + for (int curr_rank = 0; curr_rank < mpi_size; curr_rank++) + coll_chunk_list_size += num_chunks_map[curr_rank]; /* * Determine whether we should perform chunk redistribution on all @@ -3257,21 +3654,23 @@ done: * * - All MPI ranks send their list of selected chunks to the * ranks involved in chunk redistribution. Then, the - * involved ranks sort this new list in order of chunk - * index. + * involved ranks sort this new list in order of: + * + * dataset object header address -> chunk index value -> + * original owning MPI rank for chunk * * - The involved ranks scan the list looking for matching - * runs of chunk index values (corresponding to a shared - * chunk which has been selected by more than one rank in - * the I/O operation) and for each shared chunk, - * redistribute the chunk to the MPI rank writing to the - * chunk which currently has the least amount of chunks - * assigned to it. This is done by modifying the "new_owner" - * field in each of the list entries corresponding to that - * chunk. The involved ranks then re-sort the list in order - * of original chunk owner so that each rank's section of - * contributed chunks is contiguous in the collective chunk - * list. + * runs of (dataset object header address, chunk index value) + * pairs (corresponding to a shared chunk which has been + * selected by more than one rank in the I/O operation) and + * for each shared chunk, redistribute the chunk to the MPI + * rank writing to the chunk which currently has the fewest + * chunks assigned to it. This is done by modifying + * the "new_owner" field in each of the list entries + * corresponding to that chunk. The involved ranks then + * re-sort the list in order of original chunk owner so that + * each rank's section of contributed chunks is contiguous + * in the collective chunk list.
* * - If chunk redistribution occurred on all ranks, each rank * scans through the collective chunk list to find their @@ -3293,9 +3692,8 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun { MPI_Datatype struct_type; MPI_Datatype packed_type; - bool struct_type_derived = false; - bool packed_type_derived = false; - size_t i; + bool struct_type_derived = false; + bool packed_type_derived = false; size_t coll_chunk_list_num_entries = 0; void *coll_chunk_list = NULL; int *counts_disps_array = NULL; @@ -3346,15 +3744,15 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun /* Set the receive counts from the assigned chunks map */ counts_ptr = counts_disps_array; - for (i = 0; i < (size_t)mpi_size; i++) - H5_CHECKED_ASSIGN(counts_ptr[i], int, num_chunks_assigned_map[i], size_t); + for (int curr_rank = 0; curr_rank < mpi_size; curr_rank++) + H5_CHECKED_ASSIGN(counts_ptr[curr_rank], int, num_chunks_assigned_map[curr_rank], size_t); /* Set the displacements into the receive buffer for the gather operation */ displacements_ptr = &counts_disps_array[mpi_size]; *displacements_ptr = 0; - for (i = 1; i < (size_t)mpi_size; i++) - displacements_ptr[i] = displacements_ptr[i - 1] + counts_ptr[i - 1]; + for (int curr_rank = 1; curr_rank < mpi_size; curr_rank++) + displacements_ptr[curr_rank] = displacements_ptr[curr_rank - 1] + counts_ptr[curr_rank - 1]; } } @@ -3363,9 +3761,11 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun * necessary for MPI communication */ if (H5D__mpio_get_chunk_redistribute_info_types(&packed_type, &packed_type_derived, &struct_type, - &struct_type_derived) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, + &struct_type_derived) < 0) { + /* Push an error, but still participate in collective gather operation */ + HDONE_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't create derived datatypes for chunk redistribution info"); + } /* Perform gather operation */ if (H5_mpio_gatherv_alloc(chunk_list->chunk_infos, num_chunks_int, struct_type, counts_ptr, @@ -3389,15 +3789,14 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun if (all_ranks_involved || (mpi_rank == 0)) { H5D_chunk_redistribute_info_t *chunk_entry; - hsize_t curr_chunk_idx; - size_t set_begin_index; - int num_writers; - int new_chunk_owner; /* Clear the mapping from rank value -> number of assigned chunks */ memset(num_chunks_assigned_map, 0, (size_t)mpi_size * sizeof(*num_chunks_assigned_map)); - /* Sort collective chunk list according to chunk index */ + /* + * Sort collective chunk list according to: + * dataset object header address -> chunk index value -> original owning MPI rank for chunk + */ qsort(coll_chunk_list, coll_chunk_list_num_entries, sizeof(H5D_chunk_redistribute_info_t), H5D__cmp_chunk_redistribute_info); @@ -3410,21 +3809,30 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun * chunks). 
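The counts/displacements setup above is a standard prefix sum over the per-rank chunk counts. A sketch with illustrative names, mirroring the single counts_disps_array allocation used by the code:

    #include <stdlib.h>

    /*
     * Build receive counts and displacements for MPI_Gatherv from a
     * per-rank map of assigned chunk counts; one allocation holds both
     * halves, counts first, then displacements.
     */
    static int *
    build_counts_disps(const size_t *num_chunks_map, int mpi_size)
    {
        int *counts_disps = malloc(2 * (size_t)mpi_size * sizeof(int));

        if (counts_disps) {
            int *counts = counts_disps;
            int *disps  = counts_disps + mpi_size;

            for (int rank = 0; rank < mpi_size; rank++)
                counts[rank] = (int)num_chunks_map[rank]; /* checked cast in the real code */

            disps[0] = 0;
            for (int rank = 1; rank < mpi_size; rank++)
                disps[rank] = disps[rank - 1] + counts[rank - 1];
        }

        return counts_disps;
    }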
*/ chunk_entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[0]; - for (i = 0; i < coll_chunk_list_num_entries;) { + for (size_t entry_idx = 0; entry_idx < coll_chunk_list_num_entries;) { + haddr_t curr_oloc_addr; + hsize_t curr_chunk_idx; + size_t set_begin_index; + bool keep_processing; + int num_writers; + int new_chunk_owner; + /* Set chunk's initial new owner to its original owner */ new_chunk_owner = chunk_entry->orig_owner; /* - * Set the current chunk index so we know when we've processed - * all duplicate entries for a particular shared chunk + * Set the current dataset object header address and chunk + * index value so we know when we've processed all duplicate + * entries for a particular shared chunk */ + curr_oloc_addr = chunk_entry->dset_oloc_addr; curr_chunk_idx = chunk_entry->chunk_idx; /* Reset the initial number of writers to this chunk */ num_writers = 0; /* Set index for the beginning of this section of duplicate chunk entries */ - set_begin_index = i; + set_begin_index = entry_idx; /* * Process each chunk entry in the set for the current @@ -3445,13 +3853,21 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun num_writers++; chunk_entry++; - } while (++i < coll_chunk_list_num_entries && chunk_entry->chunk_idx == curr_chunk_idx); + + keep_processing = + /* Make sure we haven't run out of chunks in the chunk list */ + (++entry_idx < coll_chunk_list_num_entries) && + /* Make sure the chunk we're looking at is in the same dataset */ + (H5_addr_eq(chunk_entry->dset_oloc_addr, curr_oloc_addr)) && + /* Make sure the chunk we're looking at is the same chunk */ + (chunk_entry->chunk_idx == curr_chunk_idx); + } while (keep_processing); /* We should never have more writers to a chunk than the number of MPI ranks */ assert(num_writers <= mpi_size); /* Set all processed chunk entries' "new_owner" and "num_writers" fields */ - for (; set_begin_index < i; set_begin_index++) { + for (; set_begin_index < entry_idx; set_begin_index++) { H5D_chunk_redistribute_info_t *entry; entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[set_begin_index]; @@ -3485,29 +3901,32 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun } if (all_ranks_involved) { + size_t entry_idx; + /* * If redistribution occurred on all ranks, search for the section * in the collective chunk list corresponding to this rank's locally * selected chunks and update the local list after redistribution. 
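A condensed sketch of the duplicate-run scan above, with simplified types: walk each run of equal (dataset address, chunk index) pairs in the sorted list, pick the interested rank with the fewest assigned chunks, then back-fill the run. The real code tracks more fields; the selection rule here is the "fewest chunks" policy the comments describe:

    #include <stddef.h>

    typedef struct {
        unsigned long long dset_oloc_addr;
        unsigned long long chunk_idx;
        int                orig_owner;
        int                new_owner;
        int                num_writers;
    } redist_entry_t;

    static void
    assign_shared_chunks(redist_entry_t *list, size_t n, size_t *chunks_per_rank)
    {
        size_t i = 0;

        while (i < n) {
            size_t run_begin = i;
            int    owner     = list[i].orig_owner;
            int    writers   = 0;

            /* Scan the run of entries for one (dataset, chunk) pair */
            do {
                if (chunks_per_rank[list[i].orig_owner] < chunks_per_rank[owner])
                    owner = list[i].orig_owner;
                writers++;
                i++;
            } while (i < n && list[i].dset_oloc_addr == list[run_begin].dset_oloc_addr &&
                     list[i].chunk_idx == list[run_begin].chunk_idx);

            chunks_per_rank[owner]++;

            /* Back-fill the whole run with the chosen owner */
            for (size_t j = run_begin; j < i; j++) {
                list[j].new_owner   = owner;
                list[j].num_writers = writers;
            }
        }
    }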
*/ - for (i = 0; i < coll_chunk_list_num_entries; i++) - if (mpi_rank == ((H5D_chunk_redistribute_info_t *)coll_chunk_list)[i].orig_owner) + for (entry_idx = 0; entry_idx < coll_chunk_list_num_entries; entry_idx++) + if (mpi_rank == ((H5D_chunk_redistribute_info_t *)coll_chunk_list)[entry_idx].orig_owner) break; - for (size_t j = 0; j < (size_t)num_chunks_int; j++) { + for (size_t info_idx = 0; info_idx < (size_t)num_chunks_int; info_idx++) { H5D_chunk_redistribute_info_t *coll_entry; - coll_entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[i++]; + coll_entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[entry_idx++]; - chunk_list->chunk_infos[j].new_owner = coll_entry->new_owner; - chunk_list->chunk_infos[j].num_writers = coll_entry->num_writers; + chunk_list->chunk_infos[info_idx].new_owner = coll_entry->new_owner; + chunk_list->chunk_infos[info_idx].num_writers = coll_entry->num_writers; /* * Check if the chunk list struct's `num_chunks_to_read` field * needs to be updated */ - if (chunk_list->chunk_infos[j].need_read && (chunk_list->chunk_infos[j].new_owner != mpi_rank)) { - chunk_list->chunk_infos[j].need_read = false; + if (chunk_list->chunk_infos[info_idx].need_read && + (chunk_list->chunk_infos[info_idx].new_owner != mpi_rank)) { + chunk_list->chunk_infos[info_idx].need_read = false; assert(chunk_list->num_chunks_to_read > 0); chunk_list->num_chunks_to_read--; @@ -3530,9 +3949,10 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun * their chunk list struct's `num_chunks_to_read` field since it * may now be out of date. */ - for (i = 0; i < chunk_list->num_chunk_infos; i++) { - if ((chunk_list->chunk_infos[i].new_owner != mpi_rank) && chunk_list->chunk_infos[i].need_read) { - chunk_list->chunk_infos[i].need_read = false; + for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) { + if ((chunk_list->chunk_infos[info_idx].new_owner != mpi_rank) && + chunk_list->chunk_infos[info_idx].need_read) { + chunk_list->chunk_infos[info_idx].need_read = false; assert(chunk_list->num_chunks_to_read > 0); chunk_list->num_chunks_to_read--; @@ -3597,9 +4017,10 @@ done: * owned by that rank, the rank sends the data it wishes to * update the chunk with to the MPI rank that now has * ownership of that chunk. To do this, it encodes the - * chunk's index, its selection in the chunk and its - * modification data into a buffer and then posts a - * non-blocking MPI_Issend to the owning rank. + * chunk's index value, the dataset's object header address + * (only for the multi-dataset I/O case), its selection in + * the chunk and its modification data into a buffer and + * then posts a non-blocking MPI_Issend to the owning rank. 
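The send side of the protocol described above can be sketched as follows. The helper name and argument split are hypothetical, but MPI_Issend and the key/selection/data message layout match the description; the caller frees the buffer only after the request completes:

    #include <string.h>
    #include <stdlib.h>
    #include <mpi.h>

    /*
     * Illustrative framing of one chunk message:
     *   [ hash key bytes | serialized file-space selection | modification data ]
     */
    static int
    send_chunk_message(const void *key, size_t keylen, const void *selection, size_t sel_size,
                       const void *mod_data, size_t data_size, int dest_rank, int tag,
                       MPI_Comm comm, MPI_Request *req, unsigned char **buf_out)
    {
        size_t         msg_size = keylen + sel_size + data_size;
        unsigned char *p, *buf = malloc(msg_size);

        if (!buf)
            return -1;

        p = buf;
        memcpy(p, key, keylen);
        p += keylen;
        memcpy(p, selection, sel_size);
        p += sel_size;
        memcpy(p, mod_data, data_size);

        /* Synchronous-mode send: completes once the receiver has matched it */
        if (MPI_SUCCESS != MPI_Issend(buf, (int)msg_size, MPI_BYTE, dest_rank, tag, comm, req)) {
            free(buf);
            return -1;
        }

        *buf_out = buf; /* keep alive until the request finishes */
        return 0;
    }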
* * Once this step is complete, all MPI ranks allocate arrays * to hold chunk message receive buffers and MPI request @@ -3641,9 +4062,8 @@ done: */ static herr_t H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list, H5D_io_info_t *io_info, - H5D_dset_io_info_t *dset_info, int mpi_rank, - int H5_ATTR_NDEBUG_UNUSED mpi_size, unsigned char ***chunk_msg_bufs, - int *chunk_msg_bufs_len) + int mpi_rank, int H5_ATTR_NDEBUG_UNUSED mpi_size, + unsigned char ***chunk_msg_bufs, int *chunk_msg_bufs_len) { H5D_filtered_collective_chunk_info_t *chunk_table = NULL; H5S_sel_iter_t *mem_iter = NULL; @@ -3658,8 +4078,8 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk size_t num_send_requests = 0; size_t num_recv_requests = 0; size_t num_msgs_incoming = 0; + size_t hash_keylen = 0; size_t last_assigned_idx; - size_t i; int mpi_code; herr_t ret_value = SUCCEED; @@ -3667,7 +4087,6 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk assert(chunk_list); assert(io_info); - assert(dset_info); assert(mpi_size > 1); assert(chunk_msg_bufs); assert(chunk_msg_bufs_len); @@ -3681,6 +4100,9 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk H5CX_set_libver_bounds(NULL); if (chunk_list->num_chunk_infos > 0) { + hash_keylen = chunk_list->chunk_hash_table_keylen; + assert(hash_keylen > 0); + /* Allocate a selection iterator for iterating over chunk dataspaces */ if (NULL == (mem_iter = H5FL_MALLOC(H5S_sel_iter_t))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate dataspace selection iterator"); @@ -3712,8 +4134,9 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk * synchronous sends to send the data this rank is writing to * the rank that does own the chunk. */ - for (i = 0, last_assigned_idx = 0; i < chunk_list->num_chunk_infos; i++) { - H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i]; + last_assigned_idx = 0; + for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) { + H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[info_idx]; if (mpi_rank == chunk_entry->new_owner) { num_msgs_incoming += (size_t)(chunk_entry->num_writers - 1); @@ -3723,19 +4146,24 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk * does own, since it has sent the necessary data and is no longer * interested in the chunks it doesn't own. */ - chunk_list->chunk_infos[last_assigned_idx] = chunk_list->chunk_infos[i]; + chunk_list->chunk_infos[last_assigned_idx] = chunk_list->chunk_infos[info_idx]; /* * Since, at large scale, a chunk's index value may be larger than * the maximum value that can be stored in an int, we cannot rely * on using a chunk's index value as the tag for the MPI messages - * sent/received for a chunk. Therefore, add this chunk to a hash - * table with the chunk's index as a key so that we can quickly find - * the chunk when processing chunk messages that were received. The - * message itself will contain the chunk's index so we can update - * the correct chunk with the received data. + * sent/received for a chunk. Further, to support the multi-dataset + * I/O case, we can't rely on being able to distinguish between + * chunks by their chunk index value alone since two chunks from + * different datasets could have the same chunk index value. 
+ * Therefore, add this chunk to a hash table with the dataset's + * object header address + the chunk's index value as a key so that + * we can quickly find the chunk when processing chunk messages that + * were received. The message itself will contain the dataset's + * object header address and the chunk's index value so we can + * update the correct chunk with the received data. */ - HASH_ADD(hh, chunk_table, index_info.chunk_idx, sizeof(hsize_t), + HASH_ADD(hh, chunk_table, index_info.chunk_idx, hash_keylen, &chunk_list->chunk_infos[last_assigned_idx]); last_assigned_idx++; @@ -3747,8 +4175,8 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk size_t mod_data_size = 0; size_t space_size = 0; - /* Add the size of the chunk index to the encoded size */ - mod_data_size += sizeof(hsize_t); + /* Add the size of the chunk hash table key to the encoded size */ + mod_data_size += hash_keylen; /* Determine size of serialized chunk file dataspace */ if (H5S_encode(chunk_info->fspace, &mod_data_p, &space_size) < 0) @@ -3759,7 +4187,7 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace); H5_CHECK_OVERFLOW(iter_nelmts, hsize_t, size_t); - mod_data_size += (size_t)iter_nelmts * dset_info->type_info.src_type_size; + mod_data_size += (size_t)iter_nelmts * chunk_info->dset_info->type_info.src_type_size; if (NULL == (msg_send_bufs[num_send_requests] = H5MM_malloc(mod_data_size))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, @@ -3767,23 +4195,28 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk mod_data_p = msg_send_bufs[num_send_requests]; - /* Store the chunk's index into the buffer */ - H5MM_memcpy(mod_data_p, &chunk_entry->index_info.chunk_idx, sizeof(hsize_t)); - mod_data_p += sizeof(hsize_t); + /* + * Add the chunk hash table key (chunk index value + possibly + * dataset object header address) into the buffer + */ + H5MM_memcpy(mod_data_p, &chunk_entry->index_info.chunk_idx, hash_keylen); + mod_data_p += hash_keylen; /* Serialize the chunk's file dataspace into the buffer */ if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace"); /* Initialize iterator for memory selection */ - if (H5S_select_iter_init(mem_iter, chunk_info->mspace, dset_info->type_info.src_type_size, + if (H5S_select_iter_init(mem_iter, chunk_info->mspace, + chunk_info->dset_info->type_info.src_type_size, H5S_SEL_ITER_SHARE_WITH_DATASPACE) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information"); mem_iter_init = true; /* Collect the modification data into the buffer */ - if (0 == H5D__gather_mem(dset_info->buf.cvp, mem_iter, (size_t)iter_nelmts, mod_data_p)) + if (0 == + H5D__gather_mem(chunk_info->dset_info->buf.cvp, mem_iter, (size_t)iter_nelmts, mod_data_p)) HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer"); /* @@ -3925,7 +4358,7 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk * send buffers used in the non-blocking operations */ if (msg_send_bufs) { - for (i = 0; i < num_send_requests; i++) { + for (size_t i = 0; i < num_send_requests; i++) { if (msg_send_bufs[i]) H5MM_free(msg_send_bufs[i]); } @@ -3960,7 +4393,7 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk /* Set the new number of locally-selected chunks */ chunk_list->num_chunk_infos = 
last_assigned_idx; - /* Set chunk hash table pointer for future use */ + /* Set chunk hash table information for future use */ chunk_list->chunk_hash_table = chunk_table; /* Return chunk message buffers if any were received */ @@ -3976,19 +4409,19 @@ done: } if (num_send_requests) { - for (i = 0; i < num_send_requests; i++) { + for (size_t i = 0; i < num_send_requests; i++) { MPI_Cancel(&send_requests[i]); } } if (recv_requests) { - for (i = 0; i < num_recv_requests; i++) { + for (size_t i = 0; i < num_recv_requests; i++) { MPI_Cancel(&recv_requests[i]); } } if (msg_recv_bufs) { - for (i = 0; i < num_recv_requests; i++) { + for (size_t i = 0; i < num_recv_requests; i++) { H5MM_free(msg_recv_bufs[i]); } @@ -4004,7 +4437,7 @@ done: H5MM_free(send_requests); if (msg_send_bufs) { - for (i = 0; i < num_send_requests; i++) { + for (size_t i = 0; i < num_send_requests; i++) { if (msg_send_bufs[i]) H5MM_free(msg_send_bufs[i]); } @@ -4040,26 +4473,16 @@ done: */ static herr_t H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chunk_list, - const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di, - int mpi_rank) + const H5D_io_info_t *io_info, size_t num_dset_infos, int mpi_rank) { - H5D_fill_buf_info_t fb_info; - H5Z_EDC_t err_detect; /* Error detection info */ - H5Z_cb_t filter_cb; /* I/O filter callback function */ - hsize_t file_chunk_size = 0; - hsize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ - bool should_fill = false; - bool fb_info_init = false; - bool index_empty = false; - H5S_t *fill_space = NULL; - void *base_read_buf = NULL; - herr_t ret_value = SUCCEED; + H5Z_EDC_t err_detect; /* Error detection info */ + H5Z_cb_t filter_cb; /* I/O filter callback function */ + herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE assert(chunk_list); assert(io_info); - assert(di); #ifdef H5Dmpio_DEBUG H5D_MPIO_TRACE_ENTER(mpi_rank); @@ -4068,22 +4491,6 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun (void)mpi_rank; #endif - if (chunk_list->num_chunk_infos) { - /* Retrieve filter settings from API context */ - if (H5CX_get_err_detect(&err_detect) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info"); - if (H5CX_get_filter_cb(&filter_cb) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function"); - - /* Set size of full chunks in dataset */ - file_chunk_size = di->dset->shared->layout.u.chunk.size; - - /* Determine if fill values should be "read" for unallocated chunks */ - should_fill = (di->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_ALLOC) || - ((di->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_IFSET) && - di->dset->shared->dcpl_cache.fill.fill_defined); - } - /* * Allocate memory buffers for all chunks being read. Chunk data buffers are of * the largest size between the chunk's current filtered size and the chunk's true @@ -4097,29 +4504,61 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun * size; reading into a (smaller) buffer of size equal to the unfiltered * chunk size would of course be bad. 
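The buffer-sizing rule explained above is just a maximum over the two candidate sizes, so one buffer can hold both the filtered bytes read from the file and the unfiltered result. A trivial sketch:

    #include <stdlib.h>

    /* One buffer serves both the filtered read and the unfiltered result */
    static void *
    alloc_chunk_buf(size_t filtered_size_on_disk, size_t unfiltered_chunk_size, size_t *buf_size_out)
    {
        size_t buf_size = filtered_size_on_disk > unfiltered_chunk_size ? filtered_size_on_disk
                                                                        : unfiltered_chunk_size;

        *buf_size_out = buf_size;
        return malloc(buf_size);
    }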
*/ - for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) { - H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i]; + for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) { + H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[info_idx]; + H5D_mpio_filtered_dset_info_t *cached_dset_info; + hsize_t file_chunk_size; assert(chunk_entry->need_read); + /* Find the cached dataset info for the dataset this chunk is in */ + if (num_dset_infos > 1) { + HASH_FIND(hh, chunk_list->dset_info.dset_info_hash_table, &chunk_entry->index_info.dset_oloc_addr, + sizeof(haddr_t), cached_dset_info); + if (cached_dset_info == NULL) { + if (chunk_list->all_dset_indices_empty) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry"); + else { + /* Push an error, but participate in collective read */ + HDONE_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry"); + break; + } + } + } + else + cached_dset_info = chunk_list->dset_info.single_dset_info; + assert(cached_dset_info); + + file_chunk_size = cached_dset_info->file_chunk_size; + chunk_entry->chunk_buf_size = MAX(chunk_entry->chunk_current.length, file_chunk_size); if (NULL == (chunk_entry->buf = H5MM_malloc(chunk_entry->chunk_buf_size))) { - /* Push an error, but participate in collective read */ - HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer"); - break; + if (chunk_list->all_dset_indices_empty) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer"); + else { + /* Push an error, but participate in collective read */ + HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer"); + break; + } } /* - * Check if chunk is currently allocated. If not, don't try to - * read it from the file. Instead, just fill the chunk buffer - * with the fill value if necessary. + * Check whether the chunk needs to be read from the file, based + * on whether the dataset's chunk index is empty or the chunk has + * a defined address in the file. If the chunk doesn't need to be + * read from the file, just fill the chunk buffer with the fill + * value if necessary. 
*/ - if (H5_addr_defined(chunk_entry->chunk_current.offset)) { - /* Set first read buffer */ - if (!base_read_buf) - base_read_buf = chunk_entry->buf; + if (cached_dset_info->index_empty || !H5_addr_defined(chunk_entry->chunk_current.offset)) { + chunk_entry->need_read = false; + /* Update field keeping track of number of chunks to read */ + assert(chunk_list->num_chunks_to_read > 0); + chunk_list->num_chunks_to_read--; + } + + if (chunk_entry->need_read) { /* Set chunk's new length for eventual filter pipeline calls */ if (chunk_entry->skip_filter_pline) chunk_entry->chunk_new.length = file_chunk_size; @@ -4127,77 +4566,58 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun chunk_entry->chunk_new.length = chunk_entry->chunk_current.length; } else { - chunk_entry->need_read = false; - - /* Update field keeping track of number of chunks to read */ - assert(chunk_list->num_chunks_to_read > 0); - chunk_list->num_chunks_to_read--; - /* Set chunk's new length for eventual filter pipeline calls */ chunk_entry->chunk_new.length = file_chunk_size; - if (should_fill) { - /* Initialize fill value buffer if not already initialized */ - if (!fb_info_init) { - hsize_t chunk_dims[H5S_MAX_RANK]; - - assert(di->dset->shared->ndims == di->dset->shared->layout.u.chunk.ndims - 1); - for (size_t j = 0; j < di->dset->shared->layout.u.chunk.ndims - 1; j++) - chunk_dims[j] = (hsize_t)di->dset->shared->layout.u.chunk.dim[j]; - - /* Get a dataspace for filling chunk memory buffers */ - if (NULL == (fill_space = H5S_create_simple(di->dset->shared->layout.u.chunk.ndims - 1, - chunk_dims, NULL))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to create chunk fill dataspace"); - - /* Initialize fill value buffer */ - if (H5D__fill_init( - &fb_info, NULL, (H5MM_allocate_t)H5D__chunk_mem_alloc, - (void *)&di->dset->shared->dcpl_cache.pline, (H5MM_free_t)H5D__chunk_mem_free, - (void *)&di->dset->shared->dcpl_cache.pline, &di->dset->shared->dcpl_cache.fill, - di->dset->shared->type, di->dset->shared->type_id, 0, file_chunk_size) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize fill value buffer"); - - fb_info_init = true; - } + /* Determine if fill values should be "read" for this unallocated chunk */ + if (cached_dset_info->should_fill) { + assert(cached_dset_info->fb_info_init); + assert(cached_dset_info->fb_info.fill_buf); /* Write fill value to memory buffer */ - assert(fb_info.fill_buf); - if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_entry->buf, - di->type_info.mem_type, fill_space) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, - "couldn't fill chunk buffer with fill value"); + if (H5D__fill(cached_dset_info->fb_info.fill_buf, + cached_dset_info->dset_io_info->type_info.dset_type, chunk_entry->buf, + cached_dset_info->dset_io_info->type_info.mem_type, + cached_dset_info->fill_space) < 0) { + if (chunk_list->all_dset_indices_empty) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, + "couldn't fill chunk buffer with fill value"); + else { + /* Push an error, but participate in collective read */ + HDONE_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, + "couldn't fill chunk buffer with fill value"); + break; + } + } } } } - /* - * If dataset is incrementally allocated and hasn't been written to - * yet, the chunk index should be empty. In this case, a collective - * read of chunks is essentially a no-op, so avoid it here. 
- */ - index_empty = false; - if (di->dset->shared->dcpl_cache.fill.alloc_time == H5D_ALLOC_TIME_INCR) - if (H5D__chunk_index_empty(di->dset, &index_empty) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty"); - - if (!index_empty) { - /* Perform collective vector read */ + /* Perform collective vector read if necessary */ + if (!chunk_list->all_dset_indices_empty) if (H5D__mpio_collective_filtered_vec_io(chunk_list, io_info->f_sh, H5D_IO_OP_READ) < 0) HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't perform vector I/O on filtered chunks"); + + if (chunk_list->num_chunk_infos) { + /* Retrieve filter settings from API context */ + if (H5CX_get_err_detect(&err_detect) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info"); + if (H5CX_get_filter_cb(&filter_cb) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function"); } /* * Iterate through all the read chunks, unfiltering them and scattering their * data out to the application's read buffer. */ - for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) { - H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i]; + for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) { + H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[info_idx]; H5D_piece_info_t *chunk_info = chunk_entry->chunk_info; + hsize_t iter_nelmts; /* Unfilter the chunk, unless we didn't read it from the file */ if (chunk_entry->need_read && !chunk_entry->skip_filter_pline) { - if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, + if (H5Z_pipeline(&chunk_info->dset_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &(chunk_entry->index_info.filter_mask), err_detect, filter_cb, (size_t *)&chunk_entry->chunk_new.length, &chunk_entry->chunk_buf_size, &chunk_entry->buf) < 0) @@ -4207,26 +4627,21 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun /* Scatter the chunk data to the read buffer */ iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace); - if (H5D_select_io_mem(di->buf.vp, chunk_info->mspace, chunk_entry->buf, chunk_info->fspace, - di->type_info.src_type_size, (size_t)iter_nelmts) < 0) + if (H5D_select_io_mem(chunk_info->dset_info->buf.vp, chunk_info->mspace, chunk_entry->buf, + chunk_info->fspace, chunk_info->dset_info->type_info.src_type_size, + (size_t)iter_nelmts) < 0) HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't copy chunk data to read buffer"); } done: /* Free all resources used by entries in the chunk list */ - for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) { - if (chunk_list->chunk_infos[i].buf) { - H5MM_free(chunk_list->chunk_infos[i].buf); - chunk_list->chunk_infos[i].buf = NULL; + for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) { + if (chunk_list->chunk_infos[info_idx].buf) { + H5MM_free(chunk_list->chunk_infos[info_idx].buf); + chunk_list->chunk_infos[info_idx].buf = NULL; } } - /* Release the fill buffer info, if it's been initialized */ - if (fb_info_init && H5D__fill_term(&fb_info) < 0) - HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "Can't release fill buffer info"); - if (fill_space && (H5S_close(fill_space) < 0)) - HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space"); - #ifdef H5Dmpio_DEBUG H5D_MPIO_TIME_STOP(mpi_rank); H5D_MPIO_TRACE_EXIT(mpi_rank); @@ -4250,58 +4665,27 @@ done: static herr_t 
H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *chunk_list, unsigned char **chunk_msg_bufs, int chunk_msg_bufs_len, - const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di, - int H5_ATTR_NDEBUG_UNUSED mpi_rank) + const H5D_io_info_t *io_info, size_t num_dset_infos, int mpi_rank) { - const H5D_type_info_t *type_info = NULL; - H5D_fill_buf_info_t fb_info; - H5S_sel_iter_t *sel_iter = NULL; /* Dataspace selection iterator for H5D__scatter_mem */ - H5Z_EDC_t err_detect; /* Error detection info */ - H5Z_cb_t filter_cb; /* I/O filter callback function */ - hsize_t file_chunk_size = 0; - hsize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ - bool should_fill = false; - bool fb_info_init = false; - bool sel_iter_init = false; - bool index_empty = false; - size_t i; - H5S_t *dataspace = NULL; - H5S_t *fill_space = NULL; - void *base_read_buf = NULL; - herr_t ret_value = SUCCEED; + H5S_sel_iter_t *sel_iter = NULL; /* Dataspace selection iterator for H5D__scatter_mem */ + H5Z_EDC_t err_detect; /* Error detection info */ + H5Z_cb_t filter_cb; /* I/O filter callback function */ + uint8_t *key_buf = NULL; + H5S_t *dataspace = NULL; + bool sel_iter_init = false; + herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE assert(chunk_list); assert((chunk_msg_bufs && chunk_list->chunk_hash_table) || 0 == chunk_msg_bufs_len); assert(io_info); - assert(di); #ifdef H5Dmpio_DEBUG H5D_MPIO_TRACE_ENTER(mpi_rank); H5D_MPIO_TIME_START(mpi_rank, "Filtered collective chunk update"); #endif - /* Set convenience pointers */ - type_info = &(di->type_info); - assert(type_info); - - if (chunk_list->num_chunk_infos > 0) { - /* Retrieve filter settings from API context */ - if (H5CX_get_err_detect(&err_detect) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info"); - if (H5CX_get_filter_cb(&filter_cb) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function"); - - /* Set size of full chunks in dataset */ - file_chunk_size = di->dset->shared->layout.u.chunk.size; - - /* Determine if fill values should be written to chunks */ - should_fill = (di->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_ALLOC) || - ((di->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_IFSET) && - di->dset->shared->dcpl_cache.fill.fill_defined); - } - /* * Allocate memory buffers for all owned chunks. Chunk data buffers are of the * largest size between the chunk's current filtered size and the chunk's true @@ -4321,11 +4705,33 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch * size; reading into a (smaller) buffer of size equal to the unfiltered * chunk size would of course be bad. 
*/ - for (i = 0; i < chunk_list->num_chunk_infos; i++) { - H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i]; + for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) { + H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[info_idx]; + H5D_mpio_filtered_dset_info_t *cached_dset_info; + hsize_t file_chunk_size; assert(mpi_rank == chunk_entry->new_owner); + /* Find the cached dataset info for the dataset this chunk is in */ + if (num_dset_infos > 1) { + HASH_FIND(hh, chunk_list->dset_info.dset_info_hash_table, &chunk_entry->index_info.dset_oloc_addr, + sizeof(haddr_t), cached_dset_info); + if (cached_dset_info == NULL) { + if (chunk_list->all_dset_indices_empty) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry"); + else { + /* Push an error, but participate in collective read */ + HDONE_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry"); + break; + } + } + } + else + cached_dset_info = chunk_list->dset_info.single_dset_info; + assert(cached_dset_info); + + file_chunk_size = cached_dset_info->file_chunk_size; + chunk_entry->chunk_buf_size = MAX(chunk_entry->chunk_current.length, file_chunk_size); /* @@ -4333,29 +4739,41 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch * out fill values to it, make sure to 0-fill its memory buffer * so we don't use uninitialized memory. */ - if (!H5_addr_defined(chunk_entry->chunk_current.offset) && !should_fill) + if (!H5_addr_defined(chunk_entry->chunk_current.offset) && !cached_dset_info->should_fill) chunk_entry->buf = H5MM_calloc(chunk_entry->chunk_buf_size); else chunk_entry->buf = H5MM_malloc(chunk_entry->chunk_buf_size); if (NULL == chunk_entry->buf) { - /* Push an error, but participate in collective read */ - HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer"); - break; + if (chunk_list->all_dset_indices_empty) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer"); + else { + /* Push an error, but participate in collective read */ + HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer"); + break; + } } - /* Set chunk's new length for eventual filter pipeline calls */ - if (chunk_entry->need_read) { + if (!chunk_entry->need_read) + /* Set chunk's new length for eventual filter pipeline calls */ + chunk_entry->chunk_new.length = file_chunk_size; + else { /* - * Check if chunk is currently allocated. If not, don't try to - * read it from the file. Instead, just fill the chunk buffer - * with the fill value if fill values are to be written. + * Check whether the chunk needs to be read from the file, based + * on whether the dataset's chunk index is empty or the chunk has + * a defined address in the file. If the chunk doesn't need to be + * read from the file, just fill the chunk buffer with the fill + * value if necessary. 
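The calloc-versus-malloc choice above can be isolated into a small helper. A sketch assuming boolean inputs derived as in the surrounding code (chunk allocated in the file, fill value to be written):

    #include <stdbool.h>
    #include <stdlib.h>

    /*
     * If the chunk is unallocated in the file and no fill value will be
     * written into it, zero-initialize the buffer so later filtering
     * never touches uninitialized memory.
     */
    static void *
    alloc_update_buf(size_t buf_size, bool chunk_allocated, bool should_fill)
    {
        if (!chunk_allocated && !should_fill)
            return calloc(1, buf_size);
        return malloc(buf_size);
    }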
*/ - if (H5_addr_defined(chunk_entry->chunk_current.offset)) { - /* Set first read buffer */ - if (!base_read_buf) - base_read_buf = chunk_entry->buf; + if (cached_dset_info->index_empty || !H5_addr_defined(chunk_entry->chunk_current.offset)) { + chunk_entry->need_read = false; + /* Update field keeping track of number of chunks to read */ + assert(chunk_list->num_chunks_to_read > 0); + chunk_list->num_chunks_to_read--; + } + + if (chunk_entry->need_read) { /* Set chunk's new length for eventual filter pipeline calls */ if (chunk_entry->skip_filter_pline) chunk_entry->chunk_new.length = file_chunk_size; @@ -4363,81 +4781,57 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch chunk_entry->chunk_new.length = chunk_entry->chunk_current.length; } else { - chunk_entry->need_read = false; - - /* Update field keeping track of number of chunks to read */ - assert(chunk_list->num_chunks_to_read > 0); - chunk_list->num_chunks_to_read--; - /* Set chunk's new length for eventual filter pipeline calls */ chunk_entry->chunk_new.length = file_chunk_size; - if (should_fill) { - /* Initialize fill value buffer if not already initialized */ - if (!fb_info_init) { - hsize_t chunk_dims[H5S_MAX_RANK]; - - assert(di->dset->shared->ndims == di->dset->shared->layout.u.chunk.ndims - 1); - for (size_t j = 0; j < di->dset->shared->layout.u.chunk.ndims - 1; j++) - chunk_dims[j] = (hsize_t)di->dset->shared->layout.u.chunk.dim[j]; + /* Determine if fill values should be "read" for this unallocated chunk */ + if (cached_dset_info->should_fill) { + assert(cached_dset_info->fb_info_init); + assert(cached_dset_info->fb_info.fill_buf); - /* Get a dataspace for filling chunk memory buffers */ - if (NULL == (fill_space = H5S_create_simple( - di->dset->shared->layout.u.chunk.ndims - 1, chunk_dims, NULL))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, - "unable to create chunk fill dataspace"); - - /* Initialize fill value buffer */ - if (H5D__fill_init(&fb_info, NULL, (H5MM_allocate_t)H5D__chunk_mem_alloc, - (void *)&di->dset->shared->dcpl_cache.pline, - (H5MM_free_t)H5D__chunk_mem_free, - (void *)&di->dset->shared->dcpl_cache.pline, - &di->dset->shared->dcpl_cache.fill, di->dset->shared->type, - di->dset->shared->type_id, 0, file_chunk_size) < 0) + /* Write fill value to memory buffer */ + if (H5D__fill(cached_dset_info->fb_info.fill_buf, + cached_dset_info->dset_io_info->type_info.dset_type, chunk_entry->buf, + cached_dset_info->dset_io_info->type_info.mem_type, + cached_dset_info->fill_space) < 0) { + if (chunk_list->all_dset_indices_empty) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, - "can't initialize fill value buffer"); - - fb_info_init = true; + "couldn't fill chunk buffer with fill value"); + else { + /* Push an error, but participate in collective read */ + HDONE_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, + "couldn't fill chunk buffer with fill value"); + break; + } } - - /* Write fill value to memory buffer */ - assert(fb_info.fill_buf); - if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_entry->buf, - type_info->mem_type, fill_space) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, - "couldn't fill chunk buffer with fill value"); } } } - else - chunk_entry->chunk_new.length = file_chunk_size; } - /* - * If dataset is incrementally allocated and hasn't been written to - * yet, the chunk index should be empty. In this case, a collective - * read of chunks is essentially a no-op, so avoid it here. 
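The per-dataset should_fill flag consulted above is precomputed from the dataset's fill-time property, following the same rule the removed single-dataset code used. A sketch of that predicate, with a stand-in enum for the H5D_FILL_TIME_* values:

    #include <stdbool.h>

    /* Stand-ins for HDF5's H5D_FILL_TIME_* values */
    typedef enum { FILL_TIME_ALLOC, FILL_TIME_NEVER, FILL_TIME_IFSET } fill_time_t;

    /* Fill values are written to unallocated chunks when the dataset
     * fills at allocation time, or fills "if set" and the user actually
     * defined a fill value. */
    static bool should_write_fill(fill_time_t fill_time, bool fill_defined)
    {
        return (fill_time == FILL_TIME_ALLOC) ||
               (fill_time == FILL_TIME_IFSET && fill_defined);
    }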
- */ - index_empty = false; - if (di->dset->shared->dcpl_cache.fill.alloc_time == H5D_ALLOC_TIME_INCR) - if (H5D__chunk_index_empty(di->dset, &index_empty) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty"); - - if (!index_empty) { - /* Perform collective vector read */ + /* Perform collective vector read if necessary */ + if (!chunk_list->all_dset_indices_empty) if (H5D__mpio_collective_filtered_vec_io(chunk_list, io_info->f_sh, H5D_IO_OP_READ) < 0) HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't perform vector I/O on filtered chunks"); - } /* * Now that all owned chunks have been read, update the chunks * with modification data from the owning rank and other ranks. */ + if (chunk_list->num_chunk_infos > 0) { + /* Retrieve filter settings from API context */ + if (H5CX_get_err_detect(&err_detect) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info"); + if (H5CX_get_filter_cb(&filter_cb) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function"); + } + /* Process all chunks with data from the owning rank first */ - for (i = 0; i < chunk_list->num_chunk_infos; i++) { - H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i]; + for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) { + H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[info_idx]; H5D_piece_info_t *chunk_info = chunk_entry->chunk_info; + hsize_t iter_nelmts; assert(mpi_rank == chunk_entry->new_owner); @@ -4446,7 +4840,7 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch * the file, so we need to unfilter it */ if (chunk_entry->need_read && !chunk_entry->skip_filter_pline) { - if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, + if (H5Z_pipeline(&chunk_info->dset_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &(chunk_entry->index_info.filter_mask), err_detect, filter_cb, (size_t *)&chunk_entry->chunk_new.length, &chunk_entry->chunk_buf_size, &chunk_entry->buf) < 0) @@ -4455,28 +4849,35 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace); - if (H5D_select_io_mem(chunk_entry->buf, chunk_info->fspace, di->buf.cvp, chunk_info->mspace, - type_info->dst_type_size, (size_t)iter_nelmts) < 0) + if (H5D_select_io_mem(chunk_entry->buf, chunk_info->fspace, chunk_info->dset_info->buf.cvp, + chunk_info->mspace, chunk_info->dset_info->type_info.dst_type_size, + (size_t)iter_nelmts) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't copy chunk data to write buffer"); } /* Allocate iterator for memory selection */ - if (NULL == (sel_iter = H5FL_MALLOC(H5S_sel_iter_t))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator"); + if (chunk_msg_bufs_len > 0) { + assert(chunk_list->chunk_hash_table_keylen > 0); + if (NULL == (key_buf = H5MM_malloc(chunk_list->chunk_hash_table_keylen))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate hash table key buffer"); + + if (NULL == (sel_iter = H5FL_MALLOC(H5S_sel_iter_t))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator"); + } /* Now process all received chunk message buffers */ - for (i = 0; i < (size_t)chunk_msg_bufs_len; i++) { + for (size_t buf_idx = 0; buf_idx < (size_t)chunk_msg_bufs_len; buf_idx++) { H5D_filtered_collective_chunk_info_t *chunk_entry = NULL; - const 
unsigned char *msg_ptr = chunk_msg_bufs[i]; - hsize_t chunk_idx; + const unsigned char *msg_ptr = chunk_msg_bufs[buf_idx]; if (msg_ptr) { - /* Retrieve the chunk's index value */ - H5MM_memcpy(&chunk_idx, msg_ptr, sizeof(hsize_t)); - msg_ptr += sizeof(hsize_t); + /* Retrieve the chunk hash table key from the chunk message buffer */ + H5MM_memcpy(key_buf, msg_ptr, chunk_list->chunk_hash_table_keylen); + msg_ptr += chunk_list->chunk_hash_table_keylen; - /* Find the chunk entry according to its chunk index */ - HASH_FIND(hh, chunk_list->chunk_hash_table, &chunk_idx, sizeof(hsize_t), chunk_entry); + /* Find the chunk entry according to its chunk hash table key */ + HASH_FIND(hh, chunk_list->chunk_hash_table, key_buf, chunk_list->chunk_hash_table_keylen, + chunk_entry); if (chunk_entry == NULL) HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find chunk entry"); if (mpi_rank != chunk_entry->new_owner) @@ -4491,11 +4892,14 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch if (!chunk_entry->buf) continue; else { + hsize_t iter_nelmts; + /* Decode the chunk file dataspace from the message */ if (NULL == (dataspace = H5S_decode(&msg_ptr))) HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace"); - if (H5S_select_iter_init(sel_iter, dataspace, type_info->dst_type_size, + if (H5S_select_iter_init(sel_iter, dataspace, + chunk_entry->chunk_info->dset_info->type_info.dst_type_size, H5S_SEL_ITER_SHARE_WITH_DATASPACE) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information"); @@ -4517,50 +4921,49 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch dataspace = NULL; } - H5MM_free(chunk_msg_bufs[i]); - chunk_msg_bufs[i] = NULL; + H5MM_free(chunk_msg_bufs[buf_idx]); + chunk_msg_bufs[buf_idx] = NULL; } } } /* Finally, filter all the chunks */ - for (i = 0; i < chunk_list->num_chunk_infos; i++) { - if (!chunk_list->chunk_infos[i].skip_filter_pline) { - if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, 0, - &(chunk_list->chunk_infos[i].index_info.filter_mask), err_detect, filter_cb, - (size_t *)&chunk_list->chunk_infos[i].chunk_new.length, - &chunk_list->chunk_infos[i].chunk_buf_size, &chunk_list->chunk_infos[i].buf) < 0) + for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) { + if (!chunk_list->chunk_infos[info_idx].skip_filter_pline) { + if (H5Z_pipeline( + &chunk_list->chunk_infos[info_idx].chunk_info->dset_info->dset->shared->dcpl_cache.pline, + 0, &(chunk_list->chunk_infos[info_idx].index_info.filter_mask), err_detect, filter_cb, + (size_t *)&chunk_list->chunk_infos[info_idx].chunk_new.length, + &chunk_list->chunk_infos[info_idx].chunk_buf_size, + &chunk_list->chunk_infos[info_idx].buf) < 0) HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "output pipeline failed"); } #if H5_SIZEOF_SIZE_T > 4 /* Check for the chunk expanding too much to encode in a 32-bit value */ - if (chunk_list->chunk_infos[i].chunk_new.length > ((size_t)0xffffffff)) + if (chunk_list->chunk_infos[info_idx].chunk_new.length > ((size_t)0xffffffff)) HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length"); #endif } done: + if (dataspace && (H5S_close(dataspace) < 0)) + HDONE_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace"); + if (sel_iter) { if (sel_iter_init && H5S_SELECT_ITER_RELEASE(sel_iter) < 0) HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator"); sel_iter = H5FL_FREE(H5S_sel_iter_t, sel_iter); } - 
if (dataspace && (H5S_close(dataspace) < 0)) - HDONE_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace"); - if (fill_space && (H5S_close(fill_space) < 0)) - HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space"); - /* Release the fill buffer info, if it's been initialized */ - if (fb_info_init && H5D__fill_term(&fb_info) < 0) - HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "Can't release fill buffer info"); + H5MM_free(key_buf); /* On failure, try to free all resources used by entries in the chunk list */ if (ret_value < 0) { - for (i = 0; i < chunk_list->num_chunk_infos; i++) { - if (chunk_list->chunk_infos[i].buf) { - H5MM_free(chunk_list->chunk_infos[i].buf); - chunk_list->chunk_infos[i].buf = NULL; + for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) { + if (chunk_list->chunk_infos[info_idx].buf) { + H5MM_free(chunk_list->chunk_infos[info_idx].buf); + chunk_list->chunk_infos[info_idx].buf = NULL; } } } @@ -4589,7 +4992,7 @@ done: static herr_t H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t *chunk_list, size_t *num_chunks_assigned_map, H5D_io_info_t *io_info, - H5D_chk_idx_info_t *idx_info, int mpi_rank, int mpi_size) + size_t num_dset_infos, int mpi_rank, int mpi_size) { H5D_chunk_alloc_info_t *collective_list = NULL; MPI_Datatype send_type; @@ -4599,11 +5002,10 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t bool need_sort = false; size_t collective_num_entries = 0; size_t num_local_chunks_processed = 0; - size_t i; - void *gathered_array = NULL; - int *counts_disps_array = NULL; - int *counts_ptr = NULL; - int *displacements_ptr = NULL; + void *gathered_array = NULL; + int *counts_disps_array = NULL; + int *counts_ptr = NULL; + int *displacements_ptr = NULL; int mpi_code; herr_t ret_value = SUCCEED; @@ -4611,8 +5013,6 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t assert(chunk_list); assert(io_info); - assert(idx_info); - assert(idx_info->storage->idx_type != H5D_CHUNK_IDX_NONE); #ifdef H5Dmpio_DEBUG H5D_MPIO_TRACE_ENTER(mpi_rank); @@ -4651,15 +5051,15 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t /* Set the receive counts from the assigned chunks map */ counts_ptr = counts_disps_array; - for (i = 0; i < (size_t)mpi_size; i++) - H5_CHECKED_ASSIGN(counts_ptr[i], int, num_chunks_assigned_map[i], size_t); + for (int curr_rank = 0; curr_rank < mpi_size; curr_rank++) + H5_CHECKED_ASSIGN(counts_ptr[curr_rank], int, num_chunks_assigned_map[curr_rank], size_t); /* Set the displacements into the receive buffer for the gather operation */ displacements_ptr = &counts_disps_array[mpi_size]; *displacements_ptr = 0; - for (i = 1; i < (size_t)mpi_size; i++) - displacements_ptr[i] = displacements_ptr[i - 1] + counts_ptr[i - 1]; + for (int curr_rank = 1; curr_rank < mpi_size; curr_rank++) + displacements_ptr[curr_rank] = displacements_ptr[curr_rank - 1] + counts_ptr[curr_rank - 1]; } /* Perform gather operation */ @@ -4685,14 +5085,27 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t } /* Collectively re-allocate the modified chunks (from each rank) in the file */ - collective_list = (H5D_chunk_alloc_info_t *)gathered_array; - for (i = 0, num_local_chunks_processed = 0; i < collective_num_entries; i++) { - H5D_chunk_alloc_info_t *coll_entry = &collective_list[i]; - bool need_insert; - bool update_local_chunk; - - if (H5D__chunk_file_alloc(idx_info, 
&coll_entry->chunk_current, &coll_entry->chunk_new, &need_insert, - NULL) < 0) + collective_list = (H5D_chunk_alloc_info_t *)gathered_array; + num_local_chunks_processed = 0; + for (size_t entry_idx = 0; entry_idx < collective_num_entries; entry_idx++) { + H5D_mpio_filtered_dset_info_t *cached_dset_info; + H5D_chunk_alloc_info_t *coll_entry = &collective_list[entry_idx]; + bool need_insert; + bool update_local_chunk; + + /* Find the cached dataset info for the dataset this chunk is in */ + if (num_dset_infos > 1) { + HASH_FIND(hh, chunk_list->dset_info.dset_info_hash_table, &coll_entry->dset_oloc_addr, + sizeof(haddr_t), cached_dset_info); + if (cached_dset_info == NULL) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry"); + } + else + cached_dset_info = chunk_list->dset_info.single_dset_info; + assert(cached_dset_info); + + if (H5D__chunk_file_alloc(&cached_dset_info->chunk_idx_info, &coll_entry->chunk_current, + &coll_entry->chunk_new, &need_insert, NULL) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk"); /* @@ -4700,9 +5113,12 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t * rank, make sure to update the chunk entry in the local * chunk list */ - update_local_chunk = (num_local_chunks_processed < chunk_list->num_chunk_infos) && - (coll_entry->chunk_idx == - chunk_list->chunk_infos[num_local_chunks_processed].index_info.chunk_idx); + update_local_chunk = + (num_local_chunks_processed < chunk_list->num_chunk_infos) && + (coll_entry->dset_oloc_addr == + chunk_list->chunk_infos[num_local_chunks_processed].index_info.dset_oloc_addr) && + (coll_entry->chunk_idx == + chunk_list->chunk_infos[num_local_chunks_processed].index_info.chunk_idx); if (update_local_chunk) { H5D_filtered_collective_chunk_info_t *local_chunk; @@ -4782,38 +5198,35 @@ done: static herr_t H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *chunk_list, size_t *num_chunks_assigned_map, H5D_io_info_t *io_info, - H5D_dset_io_info_t *di, H5D_chk_idx_info_t *idx_info, - int mpi_rank, int mpi_size) + size_t num_dset_infos, int mpi_rank, int mpi_size) { - H5D_chunk_ud_t chunk_ud; - MPI_Datatype send_type; - MPI_Datatype recv_type; - bool send_type_derived = false; - bool recv_type_derived = false; - hsize_t scaled_coords[H5O_LAYOUT_NDIMS]; - size_t collective_num_entries = 0; - size_t i; - void *gathered_array = NULL; - int *counts_disps_array = NULL; - int *counts_ptr = NULL; - int *displacements_ptr = NULL; - int mpi_code; - herr_t ret_value = SUCCEED; + MPI_Datatype send_type; + MPI_Datatype recv_type; + size_t collective_num_entries = 0; + bool send_type_derived = false; + bool recv_type_derived = false; + void *gathered_array = NULL; + int *counts_disps_array = NULL; + int *counts_ptr = NULL; + int *displacements_ptr = NULL; + int mpi_code; + herr_t ret_value = SUCCEED; FUNC_ENTER_PACKAGE assert(chunk_list); assert(io_info); - assert(di); - assert(idx_info); #ifdef H5Dmpio_DEBUG H5D_MPIO_TRACE_ENTER(mpi_rank); H5D_MPIO_TIME_START(mpi_rank, "Reinsertion of modified chunks into chunk index"); #endif - /* Only re-insert chunks if index has an insert method */ - if (!idx_info->storage->ops->insert) + /* + * If no datasets involved have a chunk index 'insert' + * operation, this function is a no-op + */ + if (chunk_list->no_dset_index_insert_methods) HGOTO_DONE(SUCCEED); /* @@ -4848,15 +5261,15 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t * /* Set the receive 
counts from the assigned chunks map */ counts_ptr = counts_disps_array; - for (i = 0; i < (size_t)mpi_size; i++) - H5_CHECKED_ASSIGN(counts_ptr[i], int, num_chunks_assigned_map[i], size_t); + for (int curr_rank = 0; curr_rank < mpi_size; curr_rank++) + H5_CHECKED_ASSIGN(counts_ptr[curr_rank], int, num_chunks_assigned_map[curr_rank], size_t); /* Set the displacements into the receive buffer for the gather operation */ displacements_ptr = &counts_disps_array[mpi_size]; *displacements_ptr = 0; - for (i = 1; i < (size_t)mpi_size; i++) - displacements_ptr[i] = displacements_ptr[i - 1] + counts_ptr[i - 1]; + for (int curr_rank = 1; curr_rank < mpi_size; curr_rank++) + displacements_ptr[curr_rank] = displacements_ptr[curr_rank - 1] + counts_ptr[curr_rank - 1]; } /* Perform gather operation */ @@ -4881,11 +5294,12 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t * "can't gather chunk index re-insertion info to/from ranks"); } - /* Initialize static chunk udata fields from chunk index info */ - H5D_MPIO_INIT_CHUNK_UD_INFO(chunk_ud, idx_info); - - for (i = 0; i < collective_num_entries; i++) { - H5D_chunk_insert_info_t *coll_entry = &((H5D_chunk_insert_info_t *)gathered_array)[i]; + for (size_t entry_idx = 0; entry_idx < collective_num_entries; entry_idx++) { + H5D_mpio_filtered_dset_info_t *cached_dset_info; + H5D_chunk_insert_info_t *coll_entry = &((H5D_chunk_insert_info_t *)gathered_array)[entry_idx]; + H5D_chunk_ud_t chunk_ud; + haddr_t prev_tag = HADDR_UNDEF; + hsize_t scaled_coords[H5O_LAYOUT_NDIMS]; /* * We only need to reinsert this chunk if we had to actually @@ -4894,13 +5308,28 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t * if (!coll_entry->index_info.need_insert) continue; - chunk_ud.chunk_block = coll_entry->chunk_block; - chunk_ud.chunk_idx = coll_entry->index_info.chunk_idx; - chunk_ud.filter_mask = coll_entry->index_info.filter_mask; - chunk_ud.common.scaled = scaled_coords; + /* Find the cached dataset info for the dataset this chunk is in */ + if (num_dset_infos > 1) { + HASH_FIND(hh, chunk_list->dset_info.dset_info_hash_table, &coll_entry->index_info.dset_oloc_addr, + sizeof(haddr_t), cached_dset_info); + if (cached_dset_info == NULL) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry"); + } + else + cached_dset_info = chunk_list->dset_info.single_dset_info; + assert(cached_dset_info); + + chunk_ud.common.layout = cached_dset_info->chunk_idx_info.layout; + chunk_ud.common.storage = cached_dset_info->chunk_idx_info.storage; + chunk_ud.common.scaled = scaled_coords; + + chunk_ud.chunk_block = coll_entry->chunk_block; + chunk_ud.chunk_idx = coll_entry->index_info.chunk_idx; + chunk_ud.filter_mask = coll_entry->index_info.filter_mask; /* Calculate scaled coordinates for the chunk */ - if (idx_info->layout->idx_type == H5D_CHUNK_IDX_EARRAY && idx_info->layout->u.earray.unlim_dim > 0) { + if (cached_dset_info->chunk_idx_info.layout->idx_type == H5D_CHUNK_IDX_EARRAY && + cached_dset_info->chunk_idx_info.layout->u.earray.unlim_dim > 0) { /* * Extensible arrays where the unlimited dimension is not * the slowest-changing dimension "swizzle" the coordinates @@ -4914,17 +5343,20 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t * * callback that accepts a chunk index and provides the * caller with the scaled coordinates for that chunk. 
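For the common (non-swizzled) case, the H5VM_array_calc_pre call below recovers per-dimension chunk coordinates from the linear chunk index using precomputed "down" chunk products (down_chunks). A standalone sketch that recomputes those products inline; the function name and parameters here are illustrative, not the H5VM API:

    #include <stdint.h>

    /* Convert a linear chunk index into per-dimension "scaled" chunk
     * coordinates, row-major with the slowest-changing dimension first.
     * nchunks[d] is the number of chunks in dimension d. */
    static void chunk_index_to_scaled(uint64_t idx, unsigned ndims,
                                      const uint64_t nchunks[], uint64_t scaled[])
    {
        for (unsigned d = 0; d < ndims; d++) {
            uint64_t down = 1; /* chunks spanned by one step in dimension d */

            for (unsigned k = d + 1; k < ndims; k++)
                down *= nchunks[k];

            scaled[d] = idx / down;
            idx %= down;
        }
    }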
*/ - H5VM_array_calc_pre(chunk_ud.chunk_idx, di->dset->shared->ndims, - idx_info->layout->u.earray.swizzled_down_chunks, scaled_coords); + H5VM_array_calc_pre(chunk_ud.chunk_idx, cached_dset_info->dset_io_info->dset->shared->ndims, + cached_dset_info->chunk_idx_info.layout->u.earray.swizzled_down_chunks, + scaled_coords); - H5VM_unswizzle_coords(hsize_t, scaled_coords, idx_info->layout->u.earray.unlim_dim); + H5VM_unswizzle_coords(hsize_t, scaled_coords, + cached_dset_info->chunk_idx_info.layout->u.earray.unlim_dim); } else { - H5VM_array_calc_pre(chunk_ud.chunk_idx, di->dset->shared->ndims, - di->dset->shared->layout.u.chunk.down_chunks, scaled_coords); + H5VM_array_calc_pre(chunk_ud.chunk_idx, cached_dset_info->dset_io_info->dset->shared->ndims, + cached_dset_info->dset_io_info->dset->shared->layout.u.chunk.down_chunks, + scaled_coords); } - scaled_coords[di->dset->shared->ndims] = 0; + scaled_coords[cached_dset_info->dset_io_info->dset->shared->ndims] = 0; #ifndef NDEBUG /* @@ -4936,10 +5368,18 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t * * they match. */ for (size_t dbg_idx = 0; dbg_idx < chunk_list->num_chunk_infos; dbg_idx++) { - if (coll_entry->index_info.chunk_idx == chunk_list->chunk_infos[dbg_idx].index_info.chunk_idx) { + bool same_chunk; + + /* Chunks must have the same index and reside in the same dataset */ + same_chunk = (0 == H5_addr_cmp(coll_entry->index_info.dset_oloc_addr, + chunk_list->chunk_infos[dbg_idx].index_info.dset_oloc_addr)); + same_chunk = same_chunk && (coll_entry->index_info.chunk_idx == + chunk_list->chunk_infos[dbg_idx].index_info.chunk_idx); + + if (same_chunk) { bool coords_match = !memcmp(scaled_coords, chunk_list->chunk_infos[dbg_idx].chunk_info->scaled, - di->dset->shared->ndims * sizeof(hsize_t)); + cached_dset_info->dset_io_info->dset->shared->ndims * sizeof(hsize_t)); assert(coords_match && "Calculated scaled coordinates for chunk didn't match " "chunk's actual scaled coordinates!"); @@ -4948,8 +5388,15 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t * } #endif - if ((idx_info->storage->ops->insert)(idx_info, &chunk_ud, di->dset) < 0) + /* Set metadata tagging with dataset oheader addr */ + H5AC_tag(cached_dset_info->dset_io_info->dset->oloc.addr, &prev_tag); + + if ((cached_dset_info->chunk_idx_info.storage->ops->insert)( + &cached_dset_info->chunk_idx_info, &chunk_ud, cached_dset_info->dset_io_info->dset) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index"); + + /* Reset metadata tagging */ + H5AC_tag(prev_tag, NULL); } done: @@ -5005,9 +5452,9 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, bool *con bool struct_type_derived = false; MPI_Datatype chunk_block_type = MPI_DATATYPE_NULL; bool chunk_block_type_derived = false; - MPI_Datatype types[5]; - MPI_Aint displacements[5]; - int block_lengths[5]; + MPI_Datatype types[6]; + MPI_Aint displacements[6]; + int block_lengths[6]; int field_count; int mpi_code; herr_t ret_value = SUCCEED; @@ -5026,29 +5473,32 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, bool *con if (H5F_mpi_get_file_block_type(false, &chunk_block_type, &chunk_block_type_derived) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't create derived type for chunk file description"); - field_count = 5; + field_count = 6; assert(field_count == (sizeof(types) / sizeof(MPI_Datatype))); /* * Create structure type to pack chunk H5F_block_t structure - * next to 
chunk_idx, orig_owner, new_owner and num_writers - * fields + * next to chunk_idx, dset_oloc_addr, orig_owner, new_owner + * and num_writers fields */ block_lengths[0] = 1; block_lengths[1] = 1; block_lengths[2] = 1; block_lengths[3] = 1; block_lengths[4] = 1; + block_lengths[5] = 1; displacements[0] = offsetof(H5D_chunk_redistribute_info_t, chunk_block); displacements[1] = offsetof(H5D_chunk_redistribute_info_t, chunk_idx); - displacements[2] = offsetof(H5D_chunk_redistribute_info_t, orig_owner); - displacements[3] = offsetof(H5D_chunk_redistribute_info_t, new_owner); - displacements[4] = offsetof(H5D_chunk_redistribute_info_t, num_writers); + displacements[2] = offsetof(H5D_chunk_redistribute_info_t, dset_oloc_addr); + displacements[3] = offsetof(H5D_chunk_redistribute_info_t, orig_owner); + displacements[4] = offsetof(H5D_chunk_redistribute_info_t, new_owner); + displacements[5] = offsetof(H5D_chunk_redistribute_info_t, num_writers); types[0] = chunk_block_type; types[1] = HSIZE_AS_MPI_TYPE; - types[2] = MPI_INT; + types[2] = HADDR_AS_MPI_TYPE; types[3] = MPI_INT; types[4] = MPI_INT; + types[5] = MPI_INT; if (MPI_SUCCESS != (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, contig_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) @@ -5057,25 +5507,28 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, bool *con if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(contig_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - /* Create struct type to extract the chunk_current, chunk_idx, orig_owner, - * new_owner and num_writers fields from a H5D_filtered_collective_chunk_info_t - * structure + /* Create struct type to extract the chunk_current, chunk_idx, + * dset_oloc_addr, orig_owner, new_owner and num_writers fields + * from a H5D_filtered_collective_chunk_info_t structure */ block_lengths[0] = 1; block_lengths[1] = 1; block_lengths[2] = 1; block_lengths[3] = 1; block_lengths[4] = 1; + block_lengths[5] = 1; displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_current); displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx); - displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, orig_owner); - displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, new_owner); - displacements[4] = offsetof(H5D_filtered_collective_chunk_info_t, num_writers); + displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.dset_oloc_addr); + displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, orig_owner); + displacements[4] = offsetof(H5D_filtered_collective_chunk_info_t, new_owner); + displacements[5] = offsetof(H5D_filtered_collective_chunk_info_t, num_writers); types[0] = chunk_block_type; types[1] = HSIZE_AS_MPI_TYPE; - types[2] = MPI_INT; + types[2] = HADDR_AS_MPI_TYPE; types[3] = MPI_INT; types[4] = MPI_INT; + types[5] = MPI_INT; if (MPI_SUCCESS != (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) @@ -5146,9 +5599,9 @@ H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, bool *contig_typ bool struct_type_derived = false; MPI_Datatype chunk_block_type = MPI_DATATYPE_NULL; bool chunk_block_type_derived = false; - MPI_Datatype types[3]; - MPI_Aint displacements[3]; - int block_lengths[3]; + MPI_Datatype types[4]; + MPI_Aint displacements[4]; + int block_lengths[4]; int field_count; int 
mpi_code; herr_t ret_value = SUCCEED; @@ -5167,22 +5620,25 @@ H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, bool *contig_typ if (H5F_mpi_get_file_block_type(false, &chunk_block_type, &chunk_block_type_derived) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't create derived type for chunk file description"); - field_count = 3; + field_count = 4; assert(field_count == (sizeof(types) / sizeof(MPI_Datatype))); /* * Create structure type to pack both chunk H5F_block_t structures - * next to chunk_idx field + * next to chunk_idx and dset_oloc_addr fields */ block_lengths[0] = 1; block_lengths[1] = 1; block_lengths[2] = 1; + block_lengths[3] = 1; displacements[0] = offsetof(H5D_chunk_alloc_info_t, chunk_current); displacements[1] = offsetof(H5D_chunk_alloc_info_t, chunk_new); displacements[2] = offsetof(H5D_chunk_alloc_info_t, chunk_idx); + displacements[3] = offsetof(H5D_chunk_alloc_info_t, dset_oloc_addr); types[0] = chunk_block_type; types[1] = chunk_block_type; types[2] = HSIZE_AS_MPI_TYPE; + types[3] = HADDR_AS_MPI_TYPE; if (MPI_SUCCESS != (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, contig_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) @@ -5192,18 +5648,22 @@ H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, bool *contig_typ HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) /* - * Create struct type to extract the chunk_current, chunk_new and chunk_idx - * fields from a H5D_filtered_collective_chunk_info_t structure + * Create struct type to extract the chunk_current, chunk_new, chunk_idx + * and dset_oloc_addr fields from a H5D_filtered_collective_chunk_info_t + * structure */ block_lengths[0] = 1; block_lengths[1] = 1; block_lengths[2] = 1; + block_lengths[3] = 1; displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_current); displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_new); displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx); + displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.dset_oloc_addr); types[0] = chunk_block_type; types[1] = chunk_block_type; types[2] = HSIZE_AS_MPI_TYPE; + types[3] = HADDR_AS_MPI_TYPE; if (MPI_SUCCESS != (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) @@ -5277,9 +5737,9 @@ H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, bool *contig_ty MPI_Datatype chunk_block_type = MPI_DATATYPE_NULL; bool chunk_block_type_derived = false; MPI_Aint contig_type_extent; - MPI_Datatype types[4]; - MPI_Aint displacements[4]; - int block_lengths[4]; + MPI_Datatype types[5]; + MPI_Aint displacements[5]; + int block_lengths[5]; int field_count; int mpi_code; herr_t ret_value = SUCCEED; @@ -5298,7 +5758,7 @@ H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, bool *contig_ty if (H5F_mpi_get_file_block_type(false, &chunk_block_type, &chunk_block_type_derived) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't create derived type for chunk file description"); - field_count = 4; + field_count = 5; assert(field_count == (sizeof(types) / sizeof(MPI_Datatype))); /* @@ -5311,14 +5771,17 @@ H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, bool *contig_ty block_lengths[1] = 1; block_lengths[2] = 1; block_lengths[3] = 1; + block_lengths[4] = 1; displacements[0] = offsetof(H5D_chunk_insert_info_t, chunk_block); 
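    /*
     * The displacements being filled in here follow the same recipe as all
     * of the *_info_types routines: describe each struct field to MPI with
     * offsetof() so records can be gathered straight out of memory. A
     * self-contained sketch of that recipe, using a hypothetical record_t
     * rather than any of the HDF5 structures in this file:
     */

    #include <mpi.h>
    #include <stddef.h>
    #include <stdint.h>

    typedef struct {
        uint64_t chunk_idx; /* stand-in for an HSIZE_AS_MPI_TYPE field */
        uint64_t oloc_addr; /* stand-in for an HADDR_AS_MPI_TYPE field */
        int      owner;
    } record_t;

    static int make_record_type(MPI_Datatype *out)
    {
        int          lens[3]  = {1, 1, 1};
        MPI_Aint     disps[3] = {offsetof(record_t, chunk_idx), offsetof(record_t, oloc_addr),
                                 offsetof(record_t, owner)};
        MPI_Datatype types[3] = {MPI_UINT64_T, MPI_UINT64_T, MPI_INT};
        MPI_Datatype tmp;

        if (MPI_SUCCESS != MPI_Type_create_struct(3, lens, disps, types, &tmp))
            return -1;

        /* Resize to the C struct's extent so arrays of record_t can be
         * sent with count > 1 even when the compiler adds tail padding. */
        if (MPI_SUCCESS != MPI_Type_create_resized(tmp, 0, (MPI_Aint)sizeof(record_t), out)) {
            MPI_Type_free(&tmp);
            return -1;
        }
        MPI_Type_free(&tmp);

        return (MPI_SUCCESS == MPI_Type_commit(out)) ? 0 : -1;
    }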
displacements[1] = offsetof(H5D_chunk_insert_info_t, index_info.chunk_idx); - displacements[2] = offsetof(H5D_chunk_insert_info_t, index_info.filter_mask); - displacements[3] = offsetof(H5D_chunk_insert_info_t, index_info.need_insert); + displacements[2] = offsetof(H5D_chunk_insert_info_t, index_info.dset_oloc_addr); + displacements[3] = offsetof(H5D_chunk_insert_info_t, index_info.filter_mask); + displacements[4] = offsetof(H5D_chunk_insert_info_t, index_info.need_insert); types[0] = chunk_block_type; types[1] = HSIZE_AS_MPI_TYPE; - types[2] = MPI_UNSIGNED; - types[3] = MPI_C_BOOL; + types[2] = HADDR_AS_MPI_TYPE; + types[3] = MPI_UNSIGNED; + types[4] = MPI_C_BOOL; if (MPI_SUCCESS != (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) @@ -5344,8 +5807,9 @@ H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, bool *contig_ty */ displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_new); displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx); - displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.filter_mask); - displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.need_insert); + displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.dset_oloc_addr); + displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.filter_mask); + displacements[4] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.need_insert); if (MPI_SUCCESS != (mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code) @@ -5552,6 +6016,8 @@ H5D__mpio_dump_collective_filtered_chunk_list(H5D_filtered_collective_io_info_t chunk_rank < 3 ? 0 : chunk_entry->chunk_info->scaled[2], chunk_rank < 4 ? 0 : chunk_entry->chunk_info->scaled[3]); H5D_MPIO_DEBUG_VA(mpi_rank, " Chunk Index: %" PRIuHSIZE, chunk_entry->index_info.chunk_idx); + H5D_MPIO_DEBUG_VA(mpi_rank, " Dataset Object Header Address: %" PRIuHADDR, + chunk_entry->index_info.dset_oloc_addr); H5D_MPIO_DEBUG_VA(mpi_rank, " Filter Mask: %u", chunk_entry->index_info.filter_mask); H5D_MPIO_DEBUG_VA(mpi_rank, " Need Insert: %s", chunk_entry->index_info.need_insert ? 
"YES" : "NO"); diff --git a/src/H5Dpkg.h b/src/H5Dpkg.h index 5ffb235..82fec0e 100644 --- a/src/H5Dpkg.h +++ b/src/H5Dpkg.h @@ -254,6 +254,7 @@ typedef struct H5D_piece_info_t { unsigned mspace_shared; /* Indicate that the memory space for a chunk is shared and shouldn't be freed */ bool in_place_tconv; /* Whether to perform type conversion in-place */ size_t buf_off; /* Buffer offset for in-place type conversion */ + bool filtered_dset; /* Whether the dataset this chunk is in has filters applied */ struct H5D_dset_io_info_t *dset_info; /* Pointer to dset_info */ } H5D_piece_info_t; @@ -292,26 +293,28 @@ typedef struct H5D_io_info_t { #endif /* H5_HAVE_PARALLEL */ H5D_md_io_ops_t md_io_ops; /* Multi dataset I/O operation function pointers */ H5D_io_op_type_t op_type; - size_t count; /* Number of datasets in I/O request */ - H5D_dset_io_info_t *dsets_info; /* dsets info where I/O is done to/from */ - size_t piece_count; /* Number of pieces in I/O request */ - size_t pieces_added; /* Number of pieces added so far to arrays */ - H5D_piece_info_t **sel_pieces; /* Array of info struct for all pieces in I/O */ - H5S_t **mem_spaces; /* Array of chunk memory spaces */ - H5S_t **file_spaces; /* Array of chunk file spaces */ - haddr_t *addrs; /* Array of chunk addresses */ - size_t *element_sizes; /* Array of element sizes */ - void **rbufs; /* Array of read buffers */ - const void **wbufs; /* Array of write buffers */ - haddr_t store_faddr; /* lowest file addr for read/write */ - H5_flexible_const_ptr_t base_maddr; /* starting mem address */ - H5D_selection_io_mode_t use_select_io; /* Whether to use selection I/O */ - uint8_t *tconv_buf; /* Datatype conv buffer */ - bool tconv_buf_allocated; /* Whether the type conversion buffer was allocated */ - size_t tconv_buf_size; /* Size of type conversion buffer */ - uint8_t *bkg_buf; /* Background buffer */ - bool bkg_buf_allocated; /* Whether the background buffer was allocated */ - size_t bkg_buf_size; /* Size of background buffer */ + size_t count; /* Number of datasets in I/O request */ + size_t filtered_count; /* Number of datasets with filters applied in I/O request */ + H5D_dset_io_info_t *dsets_info; /* dsets info where I/O is done to/from */ + size_t piece_count; /* Number of pieces in I/O request */ + size_t pieces_added; /* Number of pieces added so far to arrays */ + size_t filtered_pieces_added; /* Number of filtered pieces in I/O request */ + H5D_piece_info_t **sel_pieces; /* Array of info struct for all pieces in I/O */ + H5S_t **mem_spaces; /* Array of chunk memory spaces */ + H5S_t **file_spaces; /* Array of chunk file spaces */ + haddr_t *addrs; /* Array of chunk addresses */ + size_t *element_sizes; /* Array of element sizes */ + void **rbufs; /* Array of read buffers */ + const void **wbufs; /* Array of write buffers */ + haddr_t store_faddr; /* lowest file addr for read/write */ + H5_flexible_const_ptr_t base_maddr; /* starting mem address */ + H5D_selection_io_mode_t use_select_io; /* Whether to use selection I/O */ + uint8_t *tconv_buf; /* Datatype conv buffer */ + bool tconv_buf_allocated; /* Whether the type conversion buffer was allocated */ + size_t tconv_buf_size; /* Size of type conversion buffer */ + uint8_t *bkg_buf; /* Background buffer */ + bool bkg_buf_allocated; /* Whether the background buffer was allocated */ + size_t bkg_buf_size; /* Size of background buffer */ size_t max_tconv_type_size; /* Largest of all source and destination type sizes involved in type conversion */ bool must_fill_bkg; /* Whether any datasets need 
a background buffer filled with destination contents */ diff --git a/src/H5FDmpio.c b/src/H5FDmpio.c index 5e2668b..7141550 100644 --- a/src/H5FDmpio.c +++ b/src/H5FDmpio.c @@ -3025,9 +3025,9 @@ H5FD__mpio_read_selection(H5FD_t *_file, H5FD_mem_t type, hid_t H5_ATTR_UNUSED d * s_bufs[] to find the smallest value, and choose that for * mpi_bufs_base. */ - j = 0; /* guess at the index of the smallest value of s_bufs[] */ - if (s_bufs[j + 1].vp != NULL) { + j = 0; /* guess at the index of the smallest value of s_bufs[] */ + if ((count > 1) && (s_bufs[1].vp != NULL)) { for (i = 1; i < count; i++) if (s_bufs[i].vp < s_bufs[j].vp) j = i; @@ -3375,9 +3375,9 @@ H5FD__mpio_write_selection(H5FD_t *_file, H5FD_mem_t type, hid_t H5_ATTR_UNUSED * s_bufs[] to find the smallest value, and choose that for * mpi_bufs_base. */ - j = 0; /* guess at the index of the smallest value of s_bufs[] */ - if (s_bufs[j + 1].cvp != NULL) { + j = 0; /* guess at the index of the smallest value of s_bufs[] */ + if ((count > 1) && (s_bufs[1].cvp != NULL)) { for (i = 1; i < count; i++) if (s_bufs[i].cvp < s_bufs[j].cvp) j = i; diff --git a/src/H5Fmpi.c b/src/H5Fmpi.c index 4abc226..8a8fdc1 100644 --- a/src/H5Fmpi.c +++ b/src/H5Fmpi.c @@ -408,15 +408,38 @@ done: bool H5F_get_coll_metadata_reads(const H5F_t *file) { + FUNC_ENTER_NOAPI_NOERR + + assert(file && file->shared); + + FUNC_LEAVE_NOAPI(H5F_shared_get_coll_metadata_reads(file->shared)); +} /* end H5F_get_coll_metadata_reads() */ + +/*------------------------------------------------------------------------- + * Function: H5F_shared_get_coll_metadata_reads + * + * Purpose: Determines whether collective metadata reads should be + * performed. This routine is meant to be the single source of + * truth for the collective metadata reads status, as it + * coordinates between the file-global flag and the flag set + * for the current operation in the current API context. 
+ * + * Return: true/false (can't fail) + * + *------------------------------------------------------------------------- + */ +bool +H5F_shared_get_coll_metadata_reads(const H5F_shared_t *f_sh) +{ H5P_coll_md_read_flag_t file_flag = H5P_USER_FALSE; bool ret_value = false; FUNC_ENTER_NOAPI_NOERR - assert(file && file->shared); + assert(f_sh); /* Retrieve the file-global flag */ - file_flag = H5F_COLL_MD_READ(file); + file_flag = H5F_SHARED_COLL_MD_READ(f_sh); /* If file flag is set to H5P_FORCE_FALSE, exit early * with false, since collective metadata reads have @@ -442,7 +465,7 @@ H5F_get_coll_metadata_reads(const H5F_t *file) } FUNC_LEAVE_NOAPI(ret_value) -} /* end H5F_get_coll_metadata_reads() */ +} /* end H5F_shared_get_coll_metadata_reads() */ /*------------------------------------------------------------------------- * Function: H5F_set_coll_metadata_reads diff --git a/src/H5Fprivate.h b/src/H5Fprivate.h index 5b232c5..9adbf3a 100644 --- a/src/H5Fprivate.h +++ b/src/H5Fprivate.h @@ -85,7 +85,8 @@ typedef struct H5F_t H5F_t; #define H5F_USE_TMP_SPACE(F) ((F)->shared->fs.use_tmp_space) #define H5F_IS_TMP_ADDR(F, ADDR) (H5_addr_le((F)->shared->fs.tmp_addr, (ADDR))) #ifdef H5_HAVE_PARALLEL -#define H5F_COLL_MD_READ(F) ((F)->shared->coll_md_read) +#define H5F_COLL_MD_READ(F) ((F)->shared->coll_md_read) +#define H5F_SHARED_COLL_MD_READ(F_SH) ((F_SH)->coll_md_read) #endif /* H5_HAVE_PARALLEL */ #define H5F_USE_MDC_LOGGING(F) ((F)->shared->use_mdc_logging) #define H5F_START_MDC_LOG_ON_ACCESS(F) ((F)->shared->start_mdc_log_on_access) @@ -148,7 +149,8 @@ typedef struct H5F_t H5F_t; #define H5F_USE_TMP_SPACE(F) (H5F_use_tmp_space(F)) #define H5F_IS_TMP_ADDR(F, ADDR) (H5F_is_tmp_addr((F), (ADDR))) #ifdef H5_HAVE_PARALLEL -#define H5F_COLL_MD_READ(F) (H5F_coll_md_read(F)) +#define H5F_COLL_MD_READ(F) (H5F_coll_md_read(F)) +#define H5F_SHARED_COLL_MD_READ(F_SH) (H5F_shared_coll_md_read(F_SH)) #endif /* H5_HAVE_PARALLEL */ #define H5F_USE_MDC_LOGGING(F) (H5F_use_mdc_logging(F)) #define H5F_START_MDC_LOG_ON_ACCESS(F) (H5F_start_mdc_log_on_access(F)) @@ -556,6 +558,7 @@ H5_DLL hsize_t H5F_get_alignment(const H5F_t *f); H5_DLL hsize_t H5F_get_threshold(const H5F_t *f); #ifdef H5_HAVE_PARALLEL H5_DLL H5P_coll_md_read_flag_t H5F_coll_md_read(const H5F_t *f); +H5_DLL H5P_coll_md_read_flag_t H5F_shared_coll_md_read(const H5F_shared_t *f_sh); #endif /* H5_HAVE_PARALLEL */ H5_DLL bool H5F_use_mdc_logging(const H5F_t *f); H5_DLL bool H5F_start_mdc_log_on_access(const H5F_t *f); @@ -642,6 +645,7 @@ H5_DLL int H5F_mpi_get_size(const H5F_t *f); H5_DLL herr_t H5F_mpi_retrieve_comm(hid_t loc_id, hid_t acspl_id, MPI_Comm *mpi_comm); H5_DLL herr_t H5F_mpi_get_file_block_type(bool commit, MPI_Datatype *new_type, bool *new_type_derived); H5_DLL bool H5F_get_coll_metadata_reads(const H5F_t *f); +H5_DLL bool H5F_shared_get_coll_metadata_reads(const H5F_shared_t *f_sh); H5_DLL void H5F_set_coll_metadata_reads(H5F_t *f, H5P_coll_md_read_flag_t *file_flag, bool *context_flag); H5_DLL herr_t H5F_shared_get_mpi_file_sync_required(const H5F_shared_t *f_sh, bool *flag); #endif /* H5_HAVE_PARALLEL */ diff --git a/src/H5Fquery.c b/src/H5Fquery.c index af120a4..44a52c8 100644 --- a/src/H5Fquery.c +++ b/src/H5Fquery.c @@ -1054,12 +1054,32 @@ H5F_coll_md_read(const H5F_t *f) /* Use FUNC_ENTER_NOAPI_NOINIT_NOERR here to avoid performance issues */ FUNC_ENTER_NOAPI_NOINIT_NOERR - assert(f); + assert(f && f->shared); FUNC_LEAVE_NOAPI(f->shared->coll_md_read) } /* end H5F_coll_md_read() */
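The new shared-struct variant above is the single source of truth for whether metadata reads are collective: a file-global flag can veto or force the feature, and otherwise the per-operation API-context setting decides. The branches after the early H5P_FORCE_FALSE exit are elided in this hunk, so the sketch below is an assumption based on the routine's stated purpose; the enum is a local stand-in for H5P_coll_md_read_flag_t and the context query is reduced to a boolean parameter:

    #include <stdbool.h>

    /* Local stand-in for HDF5's H5P_coll_md_read_flag_t values */
    typedef enum { FORCE_FALSE, USER_FALSE, USER_TRUE } coll_md_read_flag_t;

    /* A file-global FORCE_FALSE vetoes collective metadata reads
     * outright; a file-global USER_TRUE enables them; otherwise the
     * per-operation setting from the current API context decides. */
    static bool coll_metadata_reads_enabled(coll_md_read_flag_t file_flag, bool context_wants_collective)
    {
        if (FORCE_FALSE == file_flag)
            return false;
        if (USER_TRUE == file_flag)
            return true;
        return context_wants_collective;
    }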
/*------------------------------------------------------------------------- + * Function: H5F_shared_coll_md_read + * + * Purpose: Retrieve the 'collective metadata reads' flag for the file. + * + * Return: Success: Non-negative, the 'collective metadata reads' flag + * Failure: (can't happen) + *------------------------------------------------------------------------- + */ +H5P_coll_md_read_flag_t +H5F_shared_coll_md_read(const H5F_shared_t *f_sh) +{ + /* Use FUNC_ENTER_NOAPI_NOINIT_NOERR here to avoid performance issues */ + FUNC_ENTER_NOAPI_NOINIT_NOERR + + assert(f_sh); + + FUNC_LEAVE_NOAPI(f_sh->coll_md_read) +} /* end H5F_shared_coll_md_read() */ + +/*------------------------------------------------------------------------- * Function: H5F_shared_get_mpi_file_sync_required * * Purpose: Returns the mpi_file_sync_required flag diff --git a/src/H5Pint.c b/src/H5Pint.c index da7f887..f6dbb27 100644 --- a/src/H5Pint.c +++ b/src/H5Pint.c @@ -4102,7 +4102,7 @@ H5P_object_verify(hid_t plist_id, hid_t pclass_id) /* Compare the property list's class against the other class */ if (H5P_isa_class(plist_id, pclass_id) != true) - HGOTO_ERROR(H5E_PLIST, H5E_CANTREGISTER, NULL, "property list is not a member of the class"); + HGOTO_ERROR(H5E_PLIST, H5E_CANTCOMPARE, NULL, "property list is not a member of the class"); /* Get the plist structure */ if (NULL == (ret_value = (H5P_genplist_t *)H5I_object(plist_id))) diff --git a/src/H5VLnative_dataset.c b/src/H5VLnative_dataset.c index 9054157..a58eb51 100644 --- a/src/H5VLnative_dataset.c +++ b/src/H5VLnative_dataset.c @@ -97,6 +97,16 @@ H5VL__native_dataset_io_setup(size_t count, void *obj[], hid_t mem_type_id[], hi /* Iterate over datasets */ for (i = 0; i < count; i++) { + /* Initialize fields not set here to prevent use of uninitialized */ + memset(&dinfo[i].layout_ops, 0, sizeof(dinfo[i].layout_ops)); + memset(&dinfo[i].io_ops, 0, sizeof(dinfo[i].io_ops)); + memset(&dinfo[i].layout_io_info, 0, sizeof(dinfo[i].layout_io_info)); + memset(&dinfo[i].type_info, 0, sizeof(dinfo[i].type_info)); + dinfo[i].store = NULL; + dinfo[i].layout = NULL; + dinfo[i].nelmts = 0; + dinfo[i].skip_io = false; + /* Set up dset */ dinfo[i].dset = (H5D_t *)obj[i]; assert(dinfo[i].dset);
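The initialization block added above zeroes every field that H5VL__native_dataset_io_setup does not set itself, so later stages of the multi-dataset I/O path never read stack garbage. A reduced sketch of the same defensive pattern, with a hypothetical cut-down struct (io_setup_t) in place of H5D_dset_io_info_t:

    #include <stdbool.h>
    #include <stddef.h>
    #include <string.h>

    typedef struct {
        void  *dset;    /* set from caller input */
        void  *store;   /* filled in later in the I/O path */
        size_t nelmts;
        bool   skip_io;
    } io_setup_t;

    /* Zero each slot before selectively filling it, so downstream code
     * sees NULL/0/false in any field a given I/O path doesn't touch. */
    static void init_io_setup(io_setup_t *infos, size_t count, void *objs[])
    {
        memset(infos, 0, count * sizeof(*infos));

        for (size_t i = 0; i < count; i++)
            infos[i].dset = objs[i];
    }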