Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r-- src/H5Dmpio.c | 1956
1 file changed, 1211 insertions(+), 745 deletions(-)
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 6667746..0ef6542 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -82,21 +82,10 @@
*/
#define H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, dset) \
do { \
- index_info.f = (dset)->oloc.file; \
- index_info.pline = &((dset)->shared->dcpl_cache.pline); \
- index_info.layout = &((dset)->shared->layout.u.chunk); \
- index_info.storage = &((dset)->shared->layout.storage.u.chunk); \
- } while (0)
-
-/*
- * Macro to initialize a H5D_chunk_ud_t structure
- * given a pointer to a H5D_chk_idx_info_t structure
- */
-#define H5D_MPIO_INIT_CHUNK_UD_INFO(chunk_ud, index_info_ptr) \
- do { \
- memset(&chunk_ud, 0, sizeof(H5D_chunk_ud_t)); \
- chunk_ud.common.layout = (index_info_ptr)->layout; \
- chunk_ud.common.storage = (index_info_ptr)->storage; \
+ (index_info).f = (dset)->oloc.file; \
+ (index_info).pline = &((dset)->shared->dcpl_cache.pline); \
+ (index_info).layout = &((dset)->shared->layout.u.chunk); \
+ (index_info).storage = &((dset)->shared->layout.storage.u.chunk); \
} while (0)
/******************/
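The hunk above wraps every use of the `index_info` macro parameter in parentheses. A minimal sketch (hypothetical names, not part of the patch) of why that matters: member access `.` binds tighter than unary `*`, so the unparenthesized form breaks when a caller passes a dereferenced pointer.

    #define INIT_BAD(info, val)  do { info.field = (val); } while (0)
    #define INIT_GOOD(info, val) do { (info).field = (val); } while (0)

    struct demo { int field; };

    static void demo_init(struct demo *info_ptr)
    {
        /* INIT_BAD(*info_ptr, 1); expands to *info_ptr.field = 1;
         * which parses as *(info_ptr.field) and fails to compile */
        INIT_GOOD(*info_ptr, 1); /* expands to (*info_ptr).field = 1; OK */
    }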
@@ -129,14 +118,43 @@ typedef struct H5D_chunk_alloc_info_t {
H5F_block_t chunk_current;
H5F_block_t chunk_new;
hsize_t chunk_idx;
+ haddr_t dset_oloc_addr;
} H5D_chunk_alloc_info_t;
/*
* Information for a chunk pertaining to the dataset's chunk
- * index entry for the chunk
+ * index entry for the chunk.
+ *
+ * NOTE: To support efficient lookups of H5D_filtered_collective_chunk_info_t
+ * structures during parallel writes to filtered chunks, the
+ * chunk_idx and dset_oloc_addr fields of this structure are used
+ * together as a key for a hash table by following the approach
+ * outlined at https://troydhanson.github.io/uthash/userguide.html#_compound_keys.
+ * This means the following:
+ *
+ * - Instances of this structure should be memset to 0 when
+ * used for hashing to ensure that any padding between the
+ * chunk_idx and dset_oloc_addr fields does not affect the
+ * generated key.
+ *
+ *        - The chunk_idx and dset_oloc_addr fields must be kept in
+ *          that specific order: the code relies on this ordering when
+ *          calculating the key length, and it performs memory
+ *          operations on the structure starting from the chunk_idx
+ *          field and spanning the calculated key length.
+ *
+ * - The chunk_idx and dset_oloc_addr fields should ideally
+ * be arranged next to each other in the structure to minimize
+ * the calculated key length.
*/
typedef struct H5D_chunk_index_info_t {
- hsize_t chunk_idx;
+ /*
+ * These two fields must come in this order and next to
+ * each other for proper and efficient hashing
+ */
+ hsize_t chunk_idx;
+ haddr_t dset_oloc_addr;
+
unsigned filter_mask;
bool need_insert;
} H5D_chunk_index_info_t;
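A sketch of how the compound key described in the NOTE above would be used with uthash, following the userguide's compound-key recipe. This is illustrative only, not part of the patch; the typedefs are stand-ins for the HDF5 types, and only the uthash macros are real API.

    #include <stddef.h>
    #include <string.h>
    #include "uthash.h"

    typedef unsigned long long hsize_t; /* stand-in for the HDF5 typedef */
    typedef unsigned long long haddr_t; /* stand-in for the HDF5 typedef */

    typedef struct entry_t {
        hsize_t        chunk_idx;      /* key part 1 */
        haddr_t        dset_oloc_addr; /* key part 2, directly follows part 1 */
        UT_hash_handle hh;
    } entry_t;

    /* Key spans from chunk_idx through the end of dset_oloc_addr */
    #define KEYLEN                                                        \
        (offsetof(entry_t, dset_oloc_addr) + sizeof(haddr_t) -           \
         offsetof(entry_t, chunk_idx))

    static entry_t *
    lookup(entry_t *table, hsize_t idx, haddr_t oloc)
    {
        entry_t probe, *found;

        /* Zero the probe first so padding bytes inside the key region
         * cannot perturb the hash, as the NOTE above requires */
        memset(&probe, 0, sizeof(probe));
        probe.chunk_idx      = idx;
        probe.dset_oloc_addr = oloc;

        HASH_FIND(hh, table, &probe.chunk_idx, KEYLEN, found);
        return found;
    }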
@@ -232,6 +250,24 @@ typedef struct H5D_filtered_collective_chunk_info_t {
} H5D_filtered_collective_chunk_info_t;
/*
+ * Information cached about each dataset involved when performing
+ * collective I/O on filtered chunks.
+ */
+typedef struct H5D_mpio_filtered_dset_info_t {
+ const H5D_dset_io_info_t *dset_io_info;
+ H5D_fill_buf_info_t fb_info;
+ H5D_chk_idx_info_t chunk_idx_info;
+ hsize_t file_chunk_size;
+ haddr_t dset_oloc_addr;
+ H5S_t *fill_space;
+ bool should_fill;
+ bool fb_info_init;
+ bool index_empty;
+
+ UT_hash_handle hh;
+} H5D_mpio_filtered_dset_info_t;
+
+/*
* Top-level structure that contains an array of H5D_filtered_collective_chunk_info_t
* chunk info structures for collective filtered I/O, as well as other useful information.
* The struct's fields are as follows:
@@ -249,6 +285,10 @@ typedef struct H5D_filtered_collective_chunk_info_t {
* will contain the chunk's "chunk index" value that can be used for chunk
* lookup operations.
*
+ * chunk_hash_table_keylen - The calculated length of the key used for the chunk info hash
+ * table, depending on whether collective I/O is being performed
+ * on a single or multiple filtered datasets.
+ *
 * num_chunk_infos - The number of entries in the `chunk_infos` array.
*
* num_chunks_to_read - The number of entries (or chunks) in the `chunk_infos` array that
@@ -263,12 +303,39 @@ typedef struct H5D_filtered_collective_chunk_info_t {
* of chunk info structures to determine how big of I/O vectors to
* allocate during read operations, as an example.
*
+ * all_dset_indices_empty - A boolean determining whether all the datasets involved in the
+ * I/O operation have empty chunk indices. If this is the case,
+ * collective read operations can be skipped during processing
+ * of chunks.
+ *
+ * no_dset_index_insert_methods - A boolean determining whether all the datasets involved
+ * in the I/O operation have no chunk index insertion
+ * methods. If this is the case, collective chunk reinsertion
+ * operations can be skipped during processing of chunks.
+ *
+ * single_dset_info - A pointer to a H5D_mpio_filtered_dset_info_t structure containing
+ * information that is used when performing collective I/O on a single
+ * filtered dataset.
+ *
+ * dset_info_hash_table - A hash table storing H5D_mpio_filtered_dset_info_t structures
+ * that is populated when performing collective I/O on multiple
+ * filtered datasets at a time using the multi-dataset I/O API
+ * routines.
+ *
*/
typedef struct H5D_filtered_collective_io_info_t {
H5D_filtered_collective_chunk_info_t *chunk_infos;
H5D_filtered_collective_chunk_info_t *chunk_hash_table;
+ size_t chunk_hash_table_keylen;
size_t num_chunk_infos;
size_t num_chunks_to_read;
+ bool all_dset_indices_empty;
+ bool no_dset_index_insert_methods;
+
+ union {
+ H5D_mpio_filtered_dset_info_t *single_dset_info;
+ H5D_mpio_filtered_dset_info_t *dset_info_hash_table;
+ } dset_info;
} H5D_filtered_collective_io_info_t;
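A hypothetical helper (not in the patch) showing how the union above would be resolved per chunk: with a single dataset the cached pointer is used directly, while multi-dataset I/O keys the uthash table by the dataset's object header address, matching the HASH_ADD on dset_oloc_addr later in this patch.

    static H5D_mpio_filtered_dset_info_t *
    get_cached_dset_info(H5D_filtered_collective_io_info_t *chunk_list,
                         size_t num_dset_infos, haddr_t dset_oloc_addr)
    {
        H5D_mpio_filtered_dset_info_t *found = NULL;

        if (num_dset_infos == 1)
            found = chunk_list->dset_info.single_dset_info;
        else
            HASH_FIND(hh, chunk_list->dset_info.dset_info_hash_table,
                      &dset_oloc_addr, sizeof(haddr_t), found);

        return found;
    }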
/*
@@ -278,6 +345,7 @@ typedef struct H5D_filtered_collective_io_info_t {
typedef struct H5D_chunk_redistribute_info_t {
H5F_block_t chunk_block;
hsize_t chunk_idx;
+ haddr_t dset_oloc_addr;
int orig_owner;
int new_owner;
int num_writers;
@@ -299,11 +367,11 @@ typedef struct H5D_chunk_insert_info_t {
static herr_t H5D__piece_io(H5D_io_info_t *io_info);
static herr_t H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info,
int mpi_rank, int mpi_size);
-static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info,
- int mpi_rank, int mpi_size);
+static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_infos,
+ size_t num_dset_infos, int mpi_rank, int mpi_size);
static herr_t H5D__link_piece_collective_io(H5D_io_info_t *io_info, int mpi_rank);
-static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info,
- int mpi_rank, int mpi_size);
+static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_infos,
+ size_t num_dset_infos, int mpi_rank, int mpi_size);
static herr_t H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_dset_io_info_t *di,
H5S_t *file_space, H5S_t *mem_space);
static herr_t H5D__final_collective_io(H5D_io_info_t *io_info, hsize_t mpi_buf_count,
@@ -314,7 +382,8 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, int *sum_chu
static herr_t H5D__mpio_get_sum_chunk_dset(const H5D_io_info_t *io_info, const H5D_dset_io_info_t *dset_info,
int *sum_chunkf);
static herr_t H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info,
- const H5D_dset_io_info_t *di, int mpi_rank,
+ const H5D_dset_io_info_t *di,
+ size_t num_dset_infos, int mpi_rank,
H5D_filtered_collective_io_info_t *chunk_list);
static herr_t H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_list,
const H5D_io_info_t *io_info, int mpi_rank, int mpi_size,
@@ -324,28 +393,25 @@ static herr_t H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_i
bool all_ranks_involved, const H5D_io_info_t *io_info,
int mpi_rank, int mpi_size);
static herr_t H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list,
- H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info,
- int mpi_rank, int H5_ATTR_NDEBUG_UNUSED mpi_size,
- unsigned char ***chunk_msg_bufs,
- int *chunk_msg_bufs_len);
+ H5D_io_info_t *io_info, int mpi_rank,
+ int H5_ATTR_NDEBUG_UNUSED mpi_size,
+ unsigned char ***chunk_msg_bufs,
+ int *chunk_msg_bufs_len);
static herr_t H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chunk_list,
- const H5D_io_info_t *io_info,
- const H5D_dset_io_info_t *di, int mpi_rank);
+ const H5D_io_info_t *io_info, size_t num_dset_infos,
+ int mpi_rank);
static herr_t H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *chunk_list,
unsigned char **chunk_msg_bufs,
int chunk_msg_bufs_len, const H5D_io_info_t *io_info,
- const H5D_dset_io_info_t *di,
- int H5_ATTR_NDEBUG_UNUSED mpi_rank);
+ size_t num_dset_infos, int mpi_rank);
static herr_t H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t *chunk_list,
- size_t *num_chunks_assigned_map,
- H5D_io_info_t *io_info,
- H5D_chk_idx_info_t *idx_info, int mpi_rank,
- int mpi_size);
+ size_t *num_chunks_assigned_map,
+ H5D_io_info_t *io_info, size_t num_dset_infos,
+ int mpi_rank, int mpi_size);
static herr_t H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *chunk_list,
size_t *num_chunks_assigned_map,
- H5D_io_info_t *io_info, H5D_dset_io_info_t *di,
- H5D_chk_idx_info_t *idx_info, int mpi_rank,
- int mpi_size);
+ H5D_io_info_t *io_info, size_t num_dset_infos,
+ int mpi_rank, int mpi_size);
static herr_t H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type,
bool *contig_type_derived,
MPI_Datatype *resized_type,
@@ -636,8 +702,8 @@ H5D__mpio_opt_possible(H5D_io_info_t *io_info)
}
/* Check whether these are both simple or scalar dataspaces */
- if (!((H5S_SIMPLE == H5S_GET_EXTENT_TYPE(mem_space) ||
- H5S_SCALAR == H5S_GET_EXTENT_TYPE(mem_space)) &&
+ if (!((H5S_SIMPLE == H5S_GET_EXTENT_TYPE(mem_space) || H5S_SCALAR == H5S_GET_EXTENT_TYPE(mem_space) ||
+ H5S_NULL == H5S_GET_EXTENT_TYPE(mem_space)) &&
(H5S_SIMPLE == H5S_GET_EXTENT_TYPE(file_space) ||
H5S_SCALAR == H5S_GET_EXTENT_TYPE(file_space))))
local_cause[0] |= H5D_MPIO_NOT_SIMPLE_OR_SCALAR_DATASPACES;
@@ -1143,13 +1209,6 @@ H5D__piece_io(H5D_io_info_t *io_info)
/* Use multi dataset path for now */
use_multi_dset = true;
- /* Check for filtered datasets */
- for (i = 0; i < io_info->count; i++)
- if (io_info->dsets_info[i].dset->shared->dcpl_cache.pline.nused > 0) {
- use_multi_dset = false;
- break;
- }
-
/* Check if this I/O exceeds one linked chunk threshold */
if (recalc_io_option && use_multi_dset) {
/* Get the chunk optimization option threshold */
@@ -1173,26 +1232,40 @@ H5D__piece_io(H5D_io_info_t *io_info)
}
}
}
+ }
- /* Perform multi dataset I/O if appropriate */
- if (use_multi_dset) {
+ /* Perform multi dataset I/O if appropriate */
+ if (use_multi_dset) {
#ifdef H5_HAVE_INSTRUMENTED_LIBRARY
- /*** Set collective chunk user-input optimization API. ***/
- if (H5D_ONE_LINK_CHUNK_IO == io_option) {
- if (H5CX_test_set_mpio_coll_chunk_link_hard(0) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "unable to set property value");
- } /* end if */
-#endif /* H5_HAVE_INSTRUMENTED_LIBRARY */
+ /*** Set collective chunk user-input optimization API. ***/
+ if (H5D_ONE_LINK_CHUNK_IO == io_option) {
+ if (H5CX_test_set_mpio_coll_chunk_link_hard(0) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "unable to set property value");
+ } /* end if */
+#endif /* H5_HAVE_INSTRUMENTED_LIBRARY */
+
+ /* Process all the filtered datasets first */
+ if (io_info->filtered_count > 0) {
+ if (H5D__link_chunk_filtered_collective_io(io_info, io_info->dsets_info, io_info->count, mpi_rank,
+ mpi_size) < 0)
+ HGOTO_ERROR(H5E_IO, (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR),
+ FAIL, "couldn't finish filtered linked chunk MPI-IO");
+ }
+ /* Process all the unfiltered datasets */
+ if ((io_info->filtered_count == 0) || (io_info->filtered_count < io_info->count)) {
/* Perform unfiltered link chunk collective IO */
if (H5D__link_piece_collective_io(io_info, mpi_rank) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO");
+ HGOTO_ERROR(H5E_IO, (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR),
+ FAIL, "couldn't finish linked chunk MPI-IO");
}
}
-
- if (!use_multi_dset) {
+ else {
/* Loop over datasets */
for (i = 0; i < io_info->count; i++) {
+ if (io_info->dsets_info[i].skip_io)
+ continue;
+
if (io_info->dsets_info[i].layout->type == H5D_CONTIGUOUS) {
/* Contiguous: call H5D__inter_collective_io() directly */
H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CONTIGUOUS_COLLECTIVE;
@@ -1203,7 +1276,8 @@ H5D__piece_io(H5D_io_info_t *io_info)
if (H5D__inter_collective_io(io_info, &io_info->dsets_info[i],
io_info->dsets_info[i].file_space,
io_info->dsets_info[i].mem_space) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish shared collective MPI-IO");
+ HGOTO_ERROR(H5E_IO, (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR),
+ FAIL, "couldn't finish shared collective MPI-IO");
/* Set the actual I/O mode property. internal_collective_io will not break to
* independent I/O, so we set it here.
@@ -1248,10 +1322,12 @@ H5D__piece_io(H5D_io_info_t *io_info)
case H5D_ONE_LINK_CHUNK_IO_MORE_OPT:
/* Check if there are any filters in the pipeline */
if (io_info->dsets_info[i].dset->shared->dcpl_cache.pline.nused > 0) {
- if (H5D__link_chunk_filtered_collective_io(io_info, &io_info->dsets_info[i],
+ if (H5D__link_chunk_filtered_collective_io(io_info, &io_info->dsets_info[i], 1,
mpi_rank, mpi_size) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL,
- "couldn't finish filtered linked chunk MPI-IO");
+ HGOTO_ERROR(
+ H5E_IO,
+ (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR),
+ FAIL, "couldn't finish filtered linked chunk MPI-IO");
} /* end if */
else {
/* If there is more than one dataset we cannot make the multi dataset call here,
@@ -1262,14 +1338,18 @@ H5D__piece_io(H5D_io_info_t *io_info)
if (H5D__multi_chunk_collective_io(io_info, &io_info->dsets_info[i], mpi_rank,
mpi_size) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL,
- "couldn't finish optimized multiple chunk MPI-IO");
+ HGOTO_ERROR(
+ H5E_IO,
+ (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR),
+ FAIL, "couldn't finish optimized multiple chunk MPI-IO");
}
else {
/* Perform unfiltered link chunk collective IO */
if (H5D__link_piece_collective_io(io_info, mpi_rank) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL,
- "couldn't finish linked chunk MPI-IO");
+ HGOTO_ERROR(
+ H5E_IO,
+ (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR),
+ FAIL, "couldn't finish linked chunk MPI-IO");
}
}
@@ -1279,17 +1359,21 @@ H5D__piece_io(H5D_io_info_t *io_info)
default: /* multiple chunk IO via threshold */
/* Check if there are any filters in the pipeline */
if (io_info->dsets_info[i].dset->shared->dcpl_cache.pline.nused > 0) {
- if (H5D__multi_chunk_filtered_collective_io(io_info, &io_info->dsets_info[i],
+ if (H5D__multi_chunk_filtered_collective_io(io_info, &io_info->dsets_info[i], 1,
mpi_rank, mpi_size) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL,
- "couldn't finish optimized multiple filtered chunk MPI-IO");
+ HGOTO_ERROR(
+ H5E_IO,
+ (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR),
+ FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO");
} /* end if */
else {
/* Perform unfiltered multi chunk collective IO */
if (H5D__multi_chunk_collective_io(io_info, &io_info->dsets_info[i], mpi_rank,
mpi_size) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL,
- "couldn't finish optimized multiple chunk MPI-IO");
+ HGOTO_ERROR(
+ H5E_IO,
+ (H5D_IO_OP_READ == io_info->op_type ? H5E_READERROR : H5E_WRITEERROR),
+ FAIL, "couldn't finish optimized multiple chunk MPI-IO");
}
break;
@@ -1423,14 +1507,24 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran
int mpi_code; /* MPI return code */
H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK;
H5D_mpio_actual_io_mode_t actual_io_mode = 0;
- size_t i; /* Local index variable */
- herr_t ret_value = SUCCEED;
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE
/* set actual_io_mode */
- for (i = 0; i < io_info->count; i++) {
- assert(io_info->dsets_info[i].dset->shared->dcpl_cache.pline.nused == 0);
+ for (size_t i = 0; i < io_info->count; i++) {
+ /* Skip this dataset if no I/O is being performed */
+ if (io_info->dsets_info[i].skip_io)
+ continue;
+
+ /* Filtered datasets are processed elsewhere. A contiguous dataset
+ * could possibly have filters in the DCPL pipeline, but the library
+ * will currently ignore optional filters in that case.
+ */
+ if ((io_info->dsets_info[i].dset->shared->dcpl_cache.pline.nused > 0) &&
+ (io_info->dsets_info[i].layout->type != H5D_CONTIGUOUS))
+ continue;
+
if (io_info->dsets_info[i].layout->type == H5D_CHUNKED)
actual_io_mode |= H5D_MPIO_CHUNK_COLLECTIVE;
else if (io_info->dsets_info[i].layout->type == H5D_CONTIGUOUS)
@@ -1457,8 +1551,9 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran
H5_flexible_const_ptr_t base_buf_addr;
base_buf_addr.cvp = NULL;
- /* Get the number of chunks with a selection */
- num_chunk = io_info->pieces_added;
+ /* Get the number of unfiltered chunks with a selection */
+ assert(io_info->filtered_pieces_added <= io_info->pieces_added);
+ num_chunk = io_info->pieces_added - io_info->filtered_pieces_added;
H5_CHECK_OVERFLOW(num_chunk, size_t, int);
#ifdef H5Dmpio_DEBUG
@@ -1471,7 +1566,7 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran
/* Check if sel_pieces array is sorted */
assert(io_info->sel_pieces[0]->faddr != HADDR_UNDEF);
- for (i = 1; i < num_chunk; i++) {
+ for (size_t i = 1; i < io_info->pieces_added; i++) {
assert(io_info->sel_pieces[i]->faddr != HADDR_UNDEF);
if (io_info->sel_pieces[i]->faddr < io_info->sel_pieces[i - 1]->faddr) {
@@ -1508,11 +1603,20 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL,
"couldn't allocate chunk file is derived datatype flags buffer");
- /* save lowest file address */
- ctg_store.contig.dset_addr = io_info->sel_pieces[0]->faddr;
-
- /* save base mem addr of piece for read/write */
- base_buf_addr = io_info->sel_pieces[0]->dset_info->buf;
+ /*
+ * After sorting sel_pieces according to file address, locate
+ * the first unfiltered chunk and save its file address and
+ * base memory address for read/write
+ */
+ ctg_store.contig.dset_addr = HADDR_UNDEF;
+ for (size_t i = 0; i < io_info->pieces_added; i++) {
+ if (!io_info->sel_pieces[i]->filtered_dset) {
+ ctg_store.contig.dset_addr = io_info->sel_pieces[i]->faddr;
+ base_buf_addr = io_info->sel_pieces[i]->dset_info->buf;
+ break;
+ }
+ }
+ assert(ctg_store.contig.dset_addr != HADDR_UNDEF);
#ifdef H5Dmpio_DEBUG
H5D_MPIO_DEBUG(mpi_rank, "before iterate over selected pieces\n");
@@ -1520,7 +1624,7 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran
/* Obtain MPI derived datatype from all individual pieces */
/* Iterate over selected pieces for this process */
- for (i = 0; i < num_chunk; i++) {
+ for (size_t i = 0, curr_idx = 0; i < io_info->pieces_added; i++) {
hsize_t *permute_map = NULL; /* array that holds the mapping from the old,
out-of-order displacements to the in-order
displacements of the MPI datatypes of the
@@ -1530,24 +1634,28 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran
/* Assign convenience pointer to piece info */
piece_info = io_info->sel_pieces[i];
+ /* Skip over filtered pieces as they are processed elsewhere */
+ if (piece_info->filtered_dset)
+ continue;
+
/* Obtain disk and memory MPI derived datatype */
/* NOTE: The permute_map array can be allocated within H5S_mpio_space_type
* and will be fed into the next call to H5S_mpio_space_type
* where it will be freed.
*/
if (H5S_mpio_space_type(piece_info->fspace, piece_info->dset_info->type_info.src_type_size,
- &chunk_ftype[i], /* OUT: datatype created */
- &chunk_mpi_file_counts[i], /* OUT */
- &(chunk_mft_is_derived_array[i]), /* OUT */
- true, /* this is a file space,
- so permute the
- datatype if the point
- selections are out of
- order */
- &permute_map, /* OUT: a map to indicate the
- permutation of points
- selected in case they
- are out of order */
+ &chunk_ftype[curr_idx], /* OUT: datatype created */
+ &chunk_mpi_file_counts[curr_idx], /* OUT */
+ &(chunk_mft_is_derived_array[curr_idx]), /* OUT */
+ true, /* this is a file space,
+ so permute the
+ datatype if the point
+ selections are out of
+ order */
+ &permute_map, /* OUT: a map to indicate the
+ permutation of points
+ selected in case they
+ are out of order */
&is_permuted /* OUT */) < 0)
HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "couldn't create MPI file type");
@@ -1555,20 +1663,20 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran
if (is_permuted)
assert(permute_map);
if (H5S_mpio_space_type(piece_info->mspace, piece_info->dset_info->type_info.dst_type_size,
- &chunk_mtype[i], &chunk_mpi_mem_counts[i],
- &(chunk_mbt_is_derived_array[i]), false, /* this is a memory
- space, so if the file
- space is not
- permuted, there is no
- need to permute the
- datatype if the point
- selections are out of
- order*/
- &permute_map, /* IN: the permutation map
- generated by the
- file_space selection
- and applied to the
- memory selection */
+ &chunk_mtype[curr_idx], &chunk_mpi_mem_counts[curr_idx],
+ &(chunk_mbt_is_derived_array[curr_idx]), false, /* this is a memory
+ space, so if the
+ file space is not
+ permuted, there is
+ no need to permute
+ the datatype if the
+ point selections
+ are out of order */
+ &permute_map, /* IN: the permutation map
+ generated by the
+ file_space selection
+ and applied to the
+ memory selection */
&is_permuted /* IN */) < 0)
HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "couldn't create MPI buf type");
/* Sanity check */
@@ -1578,16 +1686,19 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran
/* Piece address relative to the first piece addr
* Assign piece address to MPI displacement
* (assume MPI_Aint big enough to hold it) */
- chunk_file_disp_array[i] = (MPI_Aint)piece_info->faddr - (MPI_Aint)ctg_store.contig.dset_addr;
+ chunk_file_disp_array[curr_idx] =
+ (MPI_Aint)piece_info->faddr - (MPI_Aint)ctg_store.contig.dset_addr;
if (io_info->op_type == H5D_IO_OP_WRITE) {
- chunk_mem_disp_array[i] =
+ chunk_mem_disp_array[curr_idx] =
(MPI_Aint)piece_info->dset_info->buf.cvp - (MPI_Aint)base_buf_addr.cvp;
}
else if (io_info->op_type == H5D_IO_OP_READ) {
- chunk_mem_disp_array[i] =
+ chunk_mem_disp_array[curr_idx] =
(MPI_Aint)piece_info->dset_info->buf.vp - (MPI_Aint)base_buf_addr.vp;
}
+
+ curr_idx++;
} /* end for */
/* Create final MPI derived datatype for the file */
@@ -1610,7 +1721,7 @@ H5D__link_piece_collective_io(H5D_io_info_t *io_info, int H5_ATTR_UNUSED mpi_ran
chunk_final_mtype_is_derived = true;
/* Free the file & memory MPI datatypes for each chunk */
- for (i = 0; i < num_chunk; i++) {
+ for (size_t i = 0; i < num_chunk; i++) {
if (chunk_mbt_is_derived_array[i])
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(chunk_mtype + i)))
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
@@ -1655,6 +1766,9 @@ done:
ret_value);
#endif
+ if (ret_value < 0)
+ H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
+
/* Release resources */
if (chunk_mtype)
H5MM_xfree(chunk_mtype);
@@ -1751,8 +1865,8 @@ done:
*-------------------------------------------------------------------------
*/
static herr_t
-H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, int mpi_rank,
- int mpi_size)
+H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_infos,
+ size_t num_dset_infos, int mpi_rank, int mpi_size)
{
H5D_filtered_collective_io_info_t chunk_list = {0};
unsigned char **chunk_msg_bufs = NULL;
@@ -1760,7 +1874,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_
int chunk_msg_bufs_len = 0;
herr_t ret_value = SUCCEED;
- FUNC_ENTER_PACKAGE_TAG(dset_info->dset->oloc.addr)
+ FUNC_ENTER_PACKAGE
assert(io_info);
@@ -1781,18 +1895,15 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_
/* Build a list of selected chunks in the collective io operation */
- if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, mpi_rank, &chunk_list) < 0)
+ if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_infos, num_dset_infos, mpi_rank,
+ &chunk_list) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list");
if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */
- if (H5D__mpio_collective_filtered_chunk_read(&chunk_list, io_info, dset_info, mpi_rank) < 0)
+ if (H5D__mpio_collective_filtered_chunk_read(&chunk_list, io_info, num_dset_infos, mpi_rank) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read filtered chunks");
}
else { /* Filtered collective write */
- H5D_chk_idx_info_t index_info;
-
- H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, dset_info->dset);
-
if (mpi_size > 1) {
/* Redistribute shared chunks being written to */
if (H5D__mpio_redistribute_shared_chunks(&chunk_list, io_info, mpi_rank, mpi_size,
@@ -1800,7 +1911,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks");
/* Send any chunk modification messages for chunks this rank no longer owns */
- if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, dset_info, mpi_rank, mpi_size,
+ if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, mpi_rank, mpi_size,
&chunk_msg_bufs, &chunk_msg_bufs_len) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"unable to send chunk modification data between MPI ranks");
@@ -1815,7 +1926,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_
* must participate.
*/
if (H5D__mpio_collective_filtered_chunk_update(&chunk_list, chunk_msg_bufs, chunk_msg_bufs_len,
- io_info, dset_info, mpi_rank) < 0)
+ io_info, num_dset_infos, mpi_rank) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't update modified chunks");
/* Free up resources used by chunk hash table now that we're done updating chunks */
@@ -1823,7 +1934,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_
/* All ranks now collectively re-allocate file space for all chunks */
if (H5D__mpio_collective_filtered_chunk_reallocate(&chunk_list, rank_chunks_assigned_map, io_info,
- &index_info, mpi_rank, mpi_size) < 0)
+ num_dset_infos, mpi_rank, mpi_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"couldn't collectively re-allocate file space for chunks");
@@ -1843,12 +1954,15 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_
* into the chunk index
*/
if (H5D__mpio_collective_filtered_chunk_reinsert(&chunk_list, rank_chunks_assigned_map, io_info,
- dset_info, &index_info, mpi_rank, mpi_size) < 0)
+ num_dset_infos, mpi_rank, mpi_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"couldn't collectively re-insert modified chunks into chunk index");
}
done:
+ if (ret_value < 0)
+ H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
+
if (chunk_msg_bufs) {
for (size_t i = 0; i < (size_t)chunk_msg_bufs_len; i++)
H5MM_free(chunk_msg_bufs[i]);
@@ -1858,6 +1972,9 @@ done:
HASH_CLEAR(hh, chunk_list.chunk_hash_table);
+ if (rank_chunks_assigned_map)
+ H5MM_free(rank_chunks_assigned_map);
+
/* Free resources used by a rank which had some selection */
if (chunk_list.chunk_infos) {
for (size_t i = 0; i < chunk_list.num_chunk_infos; i++)
@@ -1867,15 +1984,42 @@ done:
H5MM_free(chunk_list.chunk_infos);
} /* end if */
- if (rank_chunks_assigned_map)
- H5MM_free(rank_chunks_assigned_map);
+ /* Free resources used by cached dataset info */
+ if ((num_dset_infos == 1) && (chunk_list.dset_info.single_dset_info)) {
+ H5D_mpio_filtered_dset_info_t *curr_dset_info = chunk_list.dset_info.single_dset_info;
+
+ if (curr_dset_info->fb_info_init && H5D__fill_term(&curr_dset_info->fb_info) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "can't release fill buffer info");
+ if (curr_dset_info->fill_space && H5S_close(curr_dset_info->fill_space) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space");
+
+ H5MM_free(chunk_list.dset_info.single_dset_info);
+ chunk_list.dset_info.single_dset_info = NULL;
+ }
+ else if ((num_dset_infos > 1) && (chunk_list.dset_info.dset_info_hash_table)) {
+ H5D_mpio_filtered_dset_info_t *curr_dset_info;
+ H5D_mpio_filtered_dset_info_t *tmp;
+
+ HASH_ITER(hh, chunk_list.dset_info.dset_info_hash_table, curr_dset_info, tmp)
+ {
+ HASH_DELETE(hh, chunk_list.dset_info.dset_info_hash_table, curr_dset_info);
+
+ if (curr_dset_info->fb_info_init && H5D__fill_term(&curr_dset_info->fb_info) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "can't release fill buffer info");
+ if (curr_dset_info->fill_space && H5S_close(curr_dset_info->fill_space) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space");
+
+ H5MM_free(curr_dset_info);
+ curr_dset_info = NULL;
+ }
+ }
#ifdef H5Dmpio_DEBUG
H5D_MPIO_TIME_STOP(mpi_rank);
H5D_MPIO_TRACE_EXIT(mpi_rank);
#endif
- FUNC_LEAVE_NOAPI_TAG(ret_value)
+ FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__link_chunk_filtered_collective_io() */
/*-------------------------------------------------------------------------
@@ -2079,6 +2223,9 @@ H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_
H5CX_set_mpio_actual_io_mode(actual_io_mode);
done:
+ if (ret_value < 0)
+ H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
+
/* Reset collective opt mode */
if (H5CX_set_mpio_coll_opt(orig_coll_opt_mode) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't reset MPI-I/O collective_op property");
@@ -2171,8 +2318,8 @@ done:
*-------------------------------------------------------------------------
*/
static herr_t
-H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, int mpi_rank,
- int mpi_size)
+H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_infos,
+ size_t num_dset_infos, int mpi_rank, int mpi_size)
{
H5D_filtered_collective_io_info_t chunk_list = {0};
unsigned char **chunk_msg_bufs = NULL;
@@ -2182,9 +2329,10 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
int mpi_code;
herr_t ret_value = SUCCEED;
- FUNC_ENTER_PACKAGE_TAG(dset_info->dset->oloc.addr)
+ FUNC_ENTER_PACKAGE_TAG(dset_infos->dset->oloc.addr)
assert(io_info);
+ assert(num_dset_infos == 1); /* Currently only supported with 1 dataset at a time */
#ifdef H5Dmpio_DEBUG
H5D_MPIO_TRACE_ENTER(mpi_rank);
@@ -2202,7 +2350,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
H5CX_set_mpio_actual_io_mode(H5D_MPIO_CHUNK_COLLECTIVE);
/* Build a list of selected chunks in the collective IO operation */
- if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, mpi_rank, &chunk_list) < 0)
+ if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_infos, 1, mpi_rank, &chunk_list) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list");
/* Retrieve the maximum number of chunks selected for any rank */
@@ -2216,7 +2364,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */
for (size_t i = 0; i < max_num_chunks; i++) {
- H5D_filtered_collective_io_info_t single_chunk_list = {0};
+ H5D_filtered_collective_io_info_t single_chunk_list = chunk_list;
/* Check if this rank has a chunk to work on for this iteration */
have_chunk_to_process = (i < chunk_list.num_chunk_infos);
@@ -2236,8 +2384,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
single_chunk_list.num_chunks_to_read = 0;
}
- if (H5D__mpio_collective_filtered_chunk_read(&single_chunk_list, io_info, dset_info, mpi_rank) <
- 0)
+ if (H5D__mpio_collective_filtered_chunk_read(&single_chunk_list, io_info, 1, mpi_rank) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read filtered chunks");
if (have_chunk_to_process && chunk_list.chunk_infos[i].buf) {
@@ -2247,18 +2394,13 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
}
}
else { /* Filtered collective write */
- H5D_chk_idx_info_t index_info;
-
- /* Construct chunked index info */
- H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, dset_info->dset);
-
if (mpi_size > 1) {
/* Redistribute shared chunks being written to */
if (H5D__mpio_redistribute_shared_chunks(&chunk_list, io_info, mpi_rank, mpi_size, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks");
/* Send any chunk modification messages for chunks this rank no longer owns */
- if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, dset_info, mpi_rank, mpi_size,
+ if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, mpi_rank, mpi_size,
&chunk_msg_bufs, &chunk_msg_bufs_len) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"unable to send chunk modification data between MPI ranks");
@@ -2269,7 +2411,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
* collective re-allocation and re-insertion of chunks modified by other ranks.
*/
for (size_t i = 0; i < max_num_chunks; i++) {
- H5D_filtered_collective_io_info_t single_chunk_list = {0};
+ H5D_filtered_collective_io_info_t single_chunk_list = chunk_list;
/* Check if this rank has a chunk to work on for this iteration */
have_chunk_to_process =
@@ -2281,13 +2423,11 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
*/
if (have_chunk_to_process) {
single_chunk_list.chunk_infos = &chunk_list.chunk_infos[i];
- single_chunk_list.chunk_hash_table = chunk_list.chunk_hash_table;
single_chunk_list.num_chunk_infos = 1;
single_chunk_list.num_chunks_to_read = chunk_list.chunk_infos[i].need_read ? 1 : 0;
}
else {
single_chunk_list.chunk_infos = NULL;
- single_chunk_list.chunk_hash_table = chunk_list.chunk_hash_table;
single_chunk_list.num_chunk_infos = 0;
single_chunk_list.num_chunks_to_read = 0;
}
@@ -2297,13 +2437,13 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
* the chunks. As chunk reads are done collectively here, all ranks
* must participate.
*/
- if (H5D__mpio_collective_filtered_chunk_update(
- &single_chunk_list, chunk_msg_bufs, chunk_msg_bufs_len, io_info, dset_info, mpi_rank) < 0)
+ if (H5D__mpio_collective_filtered_chunk_update(&single_chunk_list, chunk_msg_bufs,
+ chunk_msg_bufs_len, io_info, 1, mpi_rank) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't update modified chunks");
/* All ranks now collectively re-allocate file space for all chunks */
- if (H5D__mpio_collective_filtered_chunk_reallocate(&single_chunk_list, NULL, io_info, &index_info,
- mpi_rank, mpi_size) < 0)
+ if (H5D__mpio_collective_filtered_chunk_reallocate(&single_chunk_list, NULL, io_info, 1, mpi_rank,
+ mpi_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"couldn't collectively re-allocate file space for chunks");
@@ -2321,14 +2461,17 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
/* Participate in the collective re-insertion of all chunks modified
* in this iteration into the chunk index
*/
- if (H5D__mpio_collective_filtered_chunk_reinsert(&single_chunk_list, NULL, io_info, dset_info,
- &index_info, mpi_rank, mpi_size) < 0)
+ if (H5D__mpio_collective_filtered_chunk_reinsert(&single_chunk_list, NULL, io_info, 1, mpi_rank,
+ mpi_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"couldn't collectively re-insert modified chunks into chunk index");
} /* end for */
}
done:
+ if (ret_value < 0)
+ H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
+
if (chunk_msg_bufs) {
for (size_t i = 0; i < (size_t)chunk_msg_bufs_len; i++)
H5MM_free(chunk_msg_bufs[i]);
@@ -2347,6 +2490,36 @@ done:
H5MM_free(chunk_list.chunk_infos);
} /* end if */
+ /* Free resources used by cached dataset info */
+ if ((num_dset_infos == 1) && (chunk_list.dset_info.single_dset_info)) {
+ H5D_mpio_filtered_dset_info_t *curr_dset_info = chunk_list.dset_info.single_dset_info;
+
+ if (curr_dset_info->fb_info_init && H5D__fill_term(&curr_dset_info->fb_info) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "can't release fill buffer info");
+ if (curr_dset_info->fill_space && H5S_close(curr_dset_info->fill_space) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space");
+
+ H5MM_free(chunk_list.dset_info.single_dset_info);
+ chunk_list.dset_info.single_dset_info = NULL;
+ }
+ else if ((num_dset_infos > 1) && (chunk_list.dset_info.dset_info_hash_table)) {
+ H5D_mpio_filtered_dset_info_t *curr_dset_info;
+ H5D_mpio_filtered_dset_info_t *tmp;
+
+ HASH_ITER(hh, chunk_list.dset_info.dset_info_hash_table, curr_dset_info, tmp)
+ {
+ HASH_DELETE(hh, chunk_list.dset_info.dset_info_hash_table, curr_dset_info);
+
+ if (curr_dset_info->fb_info_init && H5D__fill_term(&curr_dset_info->fb_info) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "can't release fill buffer info");
+ if (curr_dset_info->fill_space && H5S_close(curr_dset_info->fill_space) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space");
+
+ H5MM_free(curr_dset_info);
+ curr_dset_info = NULL;
+ }
+ }
+
#ifdef H5Dmpio_DEBUG
H5D_MPIO_TIME_STOP(mpi_rank);
H5D_MPIO_TRACE_EXIT(mpi_rank);
@@ -2583,20 +2756,25 @@ H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_in
addr2 = entry2->chunk_new.offset;
/*
- * If both chunk addresses are defined, H5_addr_cmp is safe to use.
- * Otherwise, if both addresses aren't defined, compared chunk
- * entries based on their chunk index. Finally, if only one chunk
- * address is defined, return the appropriate value based on which
- * is defined.
+ * If both chunks' file addresses are defined, H5_addr_cmp is safe to use.
+ * If only one chunk's file address is defined, return the appropriate
+ * value based on which is defined. If neither chunk's file address is
+ * defined, compare chunk entries based on their dataset object header
+ * address, then by their chunk index value.
*/
if (H5_addr_defined(addr1) && H5_addr_defined(addr2)) {
ret_value = H5_addr_cmp(addr1, addr2);
}
else if (!H5_addr_defined(addr1) && !H5_addr_defined(addr2)) {
- hsize_t chunk_idx1 = entry1->index_info.chunk_idx;
- hsize_t chunk_idx2 = entry2->index_info.chunk_idx;
+ haddr_t oloc_addr1 = entry1->index_info.dset_oloc_addr;
+ haddr_t oloc_addr2 = entry2->index_info.dset_oloc_addr;
- ret_value = (chunk_idx1 > chunk_idx2) - (chunk_idx1 < chunk_idx2);
+ if (0 == (ret_value = H5_addr_cmp(oloc_addr1, oloc_addr2))) {
+ hsize_t chunk_idx1 = entry1->index_info.chunk_idx;
+ hsize_t chunk_idx2 = entry2->index_info.chunk_idx;
+
+ ret_value = (chunk_idx1 > chunk_idx2) - (chunk_idx1 < chunk_idx2);
+ }
}
else
ret_value = H5_addr_defined(addr1) ? 1 : -1;
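A side note on the `(a > b) - (a < b)` pattern used by these comparators: it yields a clean -1/0/1 for unsigned 64-bit keys, whereas the tempting `return (int)(a - b);` truncates and can report the wrong result. A minimal illustration (hypothetical, not part of the patch):

    #include <stdint.h>

    /* Safe three-way compare for unsigned 64-bit keys such as hsize_t */
    static int
    cmp_u64(uint64_t a, uint64_t b)
    {
        /* (int)(a - b) with a = 0, b = 0x100000000 truncates to 0,
         * falsely claiming equality; this form never overflows */
        return (a > b) - (a < b);
    }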
@@ -2622,8 +2800,8 @@ H5D__cmp_chunk_redistribute_info(const void *_entry1, const void *_entry2)
{
const H5D_chunk_redistribute_info_t *entry1;
const H5D_chunk_redistribute_info_t *entry2;
- hsize_t chunk_index1;
- hsize_t chunk_index2;
+ haddr_t oloc_addr1;
+ haddr_t oloc_addr2;
int ret_value;
FUNC_ENTER_PACKAGE_NOERR
@@ -2631,17 +2809,26 @@ H5D__cmp_chunk_redistribute_info(const void *_entry1, const void *_entry2)
entry1 = (const H5D_chunk_redistribute_info_t *)_entry1;
entry2 = (const H5D_chunk_redistribute_info_t *)_entry2;
- chunk_index1 = entry1->chunk_idx;
- chunk_index2 = entry2->chunk_idx;
+ oloc_addr1 = entry1->dset_oloc_addr;
+ oloc_addr2 = entry2->dset_oloc_addr;
- if (chunk_index1 == chunk_index2) {
- int orig_owner1 = entry1->orig_owner;
- int orig_owner2 = entry2->orig_owner;
+ /* Sort first by dataset object header address */
+ if (0 == (ret_value = H5_addr_cmp(oloc_addr1, oloc_addr2))) {
+ hsize_t chunk_index1 = entry1->chunk_idx;
+ hsize_t chunk_index2 = entry2->chunk_idx;
- ret_value = (orig_owner1 > orig_owner2) - (orig_owner1 < orig_owner2);
+ /* Then by chunk index value */
+ if (chunk_index1 == chunk_index2) {
+ int orig_owner1 = entry1->orig_owner;
+ int orig_owner2 = entry2->orig_owner;
+
+ /* And finally by original owning MPI rank for the chunk */
+
+ ret_value = (orig_owner1 > orig_owner2) - (orig_owner1 < orig_owner2);
+ }
+ else
+ ret_value = (chunk_index1 > chunk_index2) - (chunk_index1 < chunk_index2);
}
- else
- ret_value = (chunk_index1 > chunk_index2) - (chunk_index1 < chunk_index2);
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__cmp_chunk_redistribute_info() */
@@ -2656,6 +2843,16 @@ H5D__cmp_chunk_redistribute_info(const void *_entry1, const void *_entry2)
* rank for two H5D_chunk_redistribute_info_t
* structures
*
+ * NOTE: The inner logic used in this sorting callback (inside the
+ * block where the original owners are equal) is intended to
+ * cause the given array of H5D_chunk_redistribute_info_t
+ * structures to be sorted back exactly as it was sorted
+ * before a shared chunks redistribution operation, according
+ * to the logic in H5D__cmp_filtered_collective_io_info_entry.
+ * Since the two sorting callbacks are currently tied directly
+ * to each other, both should be updated in the same way when
+ * changes are made.
+ *
* Return: -1, 0, 1
*
*-------------------------------------------------------------------------
@@ -2682,20 +2879,25 @@ H5D__cmp_chunk_redistribute_info_orig_owner(const void *_entry1, const void *_en
haddr_t addr2 = entry2->chunk_block.offset;
/*
- * If both chunk addresses are defined, H5_addr_cmp is safe to use.
- * Otherwise, if both addresses aren't defined, compared chunk
- * entries based on their chunk index. Finally, if only one chunk
- * address is defined, return the appropriate value based on which
- * is defined.
+ * If both chunks' file addresses are defined, H5_addr_cmp is safe to use.
+ * If only one chunk's file address is defined, return the appropriate
+ * value based on which is defined. If neither chunk's file address is
+ * defined, compare chunk entries based on their dataset object header
+ * address, then by their chunk index value.
*/
if (H5_addr_defined(addr1) && H5_addr_defined(addr2)) {
ret_value = H5_addr_cmp(addr1, addr2);
}
else if (!H5_addr_defined(addr1) && !H5_addr_defined(addr2)) {
- hsize_t chunk_idx1 = entry1->chunk_idx;
- hsize_t chunk_idx2 = entry2->chunk_idx;
+ haddr_t oloc_addr1 = entry1->dset_oloc_addr;
+ haddr_t oloc_addr2 = entry2->dset_oloc_addr;
- ret_value = (chunk_idx1 > chunk_idx2) - (chunk_idx1 < chunk_idx2);
+ if (0 == (ret_value = H5_addr_cmp(oloc_addr1, oloc_addr2))) {
+ hsize_t chunk_idx1 = entry1->chunk_idx;
+ hsize_t chunk_idx2 = entry2->chunk_idx;
+
+ ret_value = (chunk_idx1 > chunk_idx2) - (chunk_idx1 < chunk_idx2);
+ }
}
else
ret_value = H5_addr_defined(addr1) ? 1 : -1;
@@ -2927,20 +3129,21 @@ done:
*/
static herr_t
H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di,
- int mpi_rank, H5D_filtered_collective_io_info_t *chunk_list)
+ size_t num_dset_infos, int mpi_rank,
+ H5D_filtered_collective_io_info_t *chunk_list)
{
- H5D_filtered_collective_chunk_info_t *local_info_array = NULL;
- H5D_chunk_ud_t udata;
- bool filter_partial_edge_chunks;
- size_t num_chunks_selected;
- size_t num_chunks_to_read = 0;
- herr_t ret_value = SUCCEED;
+ H5D_filtered_collective_chunk_info_t *local_info_array = NULL;
+ H5D_mpio_filtered_dset_info_t *curr_dset_info = NULL;
+ size_t num_chunks_selected = 0;
+ size_t num_chunks_to_read = 0;
+ size_t buf_idx = 0;
+ bool need_sort = false;
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE
assert(io_info);
assert(di);
- assert(di->layout->type == H5D_CHUNKED);
assert(chunk_list);
#ifdef H5Dmpio_DEBUG
@@ -2948,166 +3151,330 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const
H5D_MPIO_TIME_START(mpi_rank, "Filtered Collective I/O Setup");
#endif
- /* Each rank builds a local list of the chunks they have selected */
- if ((num_chunks_selected = H5SL_count(di->layout_io_info.chunk_map->dset_sel_pieces))) {
- H5D_piece_info_t *chunk_info;
- H5SL_node_t *chunk_node;
- hsize_t select_npoints;
- bool need_sort = false;
+ /* Calculate hash key length for chunk hash table */
+ if (num_dset_infos > 1) {
+ /* Just in case the structure changes... */
+ HDcompile_assert(offsetof(H5D_chunk_index_info_t, dset_oloc_addr) >
+ offsetof(H5D_chunk_index_info_t, chunk_idx));
+
+ /* Calculate key length using uthash compound key example */
+ chunk_list->chunk_hash_table_keylen = offsetof(H5D_chunk_index_info_t, dset_oloc_addr) +
+ sizeof(haddr_t) - offsetof(H5D_chunk_index_info_t, chunk_idx);
+ }
+ else
+ chunk_list->chunk_hash_table_keylen = sizeof(hsize_t);
+
+ chunk_list->all_dset_indices_empty = true;
+ chunk_list->no_dset_index_insert_methods = true;
+
+ /* Calculate size needed for total chunk list */
+ for (size_t dset_idx = 0; dset_idx < num_dset_infos; dset_idx++) {
+ /* Skip this dataset if no I/O is being performed */
+ if (di[dset_idx].skip_io)
+ continue;
+
+ /* Only process filtered, chunked datasets. A contiguous dataset
+ * could possibly have filters in the DCPL pipeline, but the library
+ * will currently ignore optional filters in that case.
+ */
+ if ((di[dset_idx].dset->shared->dcpl_cache.pline.nused == 0) ||
+ (di[dset_idx].layout->type == H5D_CONTIGUOUS))
+ continue;
+
+ assert(di[dset_idx].layout->type == H5D_CHUNKED);
+ assert(di[dset_idx].layout->storage.type == H5D_CHUNKED);
- /* Determine whether partial edge chunks should be filtered */
- filter_partial_edge_chunks =
- !(di->dset->shared->layout.u.chunk.flags & H5O_LAYOUT_CHUNK_DONT_FILTER_PARTIAL_BOUND_CHUNKS);
+ num_chunks_selected += H5SL_count(di[dset_idx].layout_io_info.chunk_map->dset_sel_pieces);
+ }
+ if (num_chunks_selected)
if (NULL == (local_info_array = H5MM_malloc(num_chunks_selected * sizeof(*local_info_array))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer");
- chunk_node = H5SL_first(di->layout_io_info.chunk_map->dset_sel_pieces);
- for (size_t i = 0; chunk_node; i++) {
- chunk_info = (H5D_piece_info_t *)H5SL_item(chunk_node);
+ for (size_t dset_idx = 0; dset_idx < num_dset_infos; dset_idx++) {
+ H5D_chunk_ud_t udata;
+ H5O_fill_t *fill_msg;
+ haddr_t prev_tag = HADDR_UNDEF;
- /* Obtain this chunk's address */
- if (H5D__chunk_lookup(di->dset, chunk_info->scaled, &udata) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address");
+ /* Skip this dataset if no I/O is being performed */
+ if (di[dset_idx].skip_io)
+ continue;
- /* Initialize rank-local chunk info */
- local_info_array[i].chunk_info = chunk_info;
- local_info_array[i].chunk_buf_size = 0;
- local_info_array[i].num_writers = 0;
- local_info_array[i].orig_owner = mpi_rank;
- local_info_array[i].new_owner = mpi_rank;
- local_info_array[i].buf = NULL;
+ /* Only process filtered, chunked datasets. A contiguous dataset
+ * could possibly have filters in the DCPL pipeline, but the library
+ * will currently ignore optional filters in that case.
+ */
+ if ((di[dset_idx].dset->shared->dcpl_cache.pline.nused == 0) ||
+ (di[dset_idx].layout->type == H5D_CONTIGUOUS))
+ continue;
- select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->fspace);
- local_info_array[i].io_size = (size_t)select_npoints * di->type_info.dst_type_size;
+ assert(di[dset_idx].layout->storage.type == H5D_CHUNKED);
+ assert(di[dset_idx].layout->storage.u.chunk.idx_type != H5D_CHUNK_IDX_NONE);
- /*
- * Determine whether this chunk will need to be read from the file. If this is
- * a read operation, the chunk will be read. If this is a write operation, we
- * generally need to read a filtered chunk from the file before modifying it,
- * unless the chunk is being fully overwritten.
- *
- * TODO: Currently the full overwrite status of a chunk is only obtained on a
- * per-rank basis. This means that if the total selection in the chunk, as
- * determined by the combination of selections of all of the ranks interested in
- * the chunk, covers the entire chunk, the performance optimization of not reading
- * the chunk from the file is still valid, but is not applied in the current
- * implementation.
- *
- * To implement this case, a few approaches were considered:
- *
- * - Keep a running total (distributed to each rank) of the number of chunk
- * elements selected during chunk redistribution and compare that to the total
- * number of elements in the chunk once redistribution is finished
- *
- * - Process all incoming chunk messages before doing I/O (these are currently
- * processed AFTER doing I/O), combine the owning rank's selection in a chunk
- * with the selections received from other ranks and check to see whether that
- * combined selection covers the entire chunk
- *
- * The first approach will be dangerous if the application performs an overlapping
- * write to a chunk, as the number of selected elements can equal or exceed the
- * number of elements in the chunk without the whole chunk selection being covered.
- * While it might be considered erroneous for an application to do an overlapping
- * write, we don't explicitly disallow it.
- *
- * The second approach contains a bit of complexity in that part of the chunk
- * messages will be needed before doing I/O and part will be needed after doing I/O.
- * Since modification data from chunk messages can't be applied until after any I/O
- * is performed (otherwise, we'll overwrite any applied modification data), chunk
- * messages are currently entirely processed after I/O. However, in order to determine
- * if a chunk is being fully overwritten, we need the dataspace portion of the chunk
- * messages before doing I/O. The naive way to do this is to process chunk messages
- * twice, using just the relevant information from the message before and after I/O.
- * The better way would be to avoid processing chunk messages twice by extracting (and
- * keeping around) the dataspace portion of the message before I/O and processing the
- * rest of the chunk message after I/O. Note that the dataspace portion of each chunk
- * message is used to correctly apply chunk modification data from the message, so
- * must be kept around both before and after I/O in this case.
- */
- if (io_info->op_type == H5D_IO_OP_READ)
- local_info_array[i].need_read = true;
- else {
- local_info_array[i].need_read =
- local_info_array[i].io_size < (size_t)di->dset->shared->layout.u.chunk.size;
- }
+ /*
+ * To support the multi-dataset I/O case, cache some info (chunk size,
+ * fill buffer and fill dataspace, etc.) about each dataset involved
+ * in the I/O operation for use when processing chunks. If only one
+ * dataset is involved, this information is the same for every chunk
+ * processed. Otherwise, if multiple datasets are involved, a hash
+ * table is used to quickly match a particular chunk with the cached
+ * information pertaining to the dataset it resides in.
+ */
+ if (NULL == (curr_dset_info = H5MM_malloc(sizeof(H5D_mpio_filtered_dset_info_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate space for dataset info");
+
+ memset(&curr_dset_info->fb_info, 0, sizeof(H5D_fill_buf_info_t));
+
+ H5D_MPIO_INIT_CHUNK_IDX_INFO(curr_dset_info->chunk_idx_info, di[dset_idx].dset);
+
+ curr_dset_info->dset_io_info = &di[dset_idx];
+ curr_dset_info->file_chunk_size = di[dset_idx].dset->shared->layout.u.chunk.size;
+ curr_dset_info->dset_oloc_addr = di[dset_idx].dset->oloc.addr;
+ curr_dset_info->fill_space = NULL;
+ curr_dset_info->fb_info_init = false;
+ curr_dset_info->index_empty = false;
+
+ /* Determine if fill values should be written to chunks */
+ fill_msg = &di[dset_idx].dset->shared->dcpl_cache.fill;
+ curr_dset_info->should_fill =
+ (fill_msg->fill_time == H5D_FILL_TIME_ALLOC) ||
+ ((fill_msg->fill_time == H5D_FILL_TIME_IFSET) && fill_msg->fill_defined);
+
+ if (curr_dset_info->should_fill) {
+ hsize_t chunk_dims[H5S_MAX_RANK];
+
+ assert(di[dset_idx].dset->shared->ndims == di[dset_idx].dset->shared->layout.u.chunk.ndims - 1);
+ for (size_t dim_idx = 0; dim_idx < di[dset_idx].dset->shared->layout.u.chunk.ndims - 1; dim_idx++)
+ chunk_dims[dim_idx] = (hsize_t)di[dset_idx].dset->shared->layout.u.chunk.dim[dim_idx];
+
+ /* Get a dataspace for filling chunk memory buffers */
+ if (NULL == (curr_dset_info->fill_space = H5S_create_simple(
+ di[dset_idx].dset->shared->layout.u.chunk.ndims - 1, chunk_dims, NULL)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to create chunk fill dataspace");
+
+ /* Initialize fill value buffer */
+ if (H5D__fill_init(&curr_dset_info->fb_info, NULL, (H5MM_allocate_t)H5D__chunk_mem_alloc,
+ (void *)&di[dset_idx].dset->shared->dcpl_cache.pline,
+ (H5MM_free_t)H5D__chunk_mem_free,
+ (void *)&di[dset_idx].dset->shared->dcpl_cache.pline,
+ &di[dset_idx].dset->shared->dcpl_cache.fill, di[dset_idx].dset->shared->type,
+ di[dset_idx].dset->shared->type_id, 0, curr_dset_info->file_chunk_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize fill value buffer");
+
+ curr_dset_info->fb_info_init = true;
+ }
+
+ /*
+ * If the dataset is incrementally allocated and hasn't been written
+ * to yet, the chunk index should be empty. In this case, a collective
+ * read of its chunks is essentially a no-op, so we can avoid that read
+ * later. If all datasets have empty chunk indices, we can skip the
+ * collective read entirely.
+ */
+ if (fill_msg->alloc_time == H5D_ALLOC_TIME_INCR)
+ if (H5D__chunk_index_empty(di[dset_idx].dset, &curr_dset_info->index_empty) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty");
+
+ if ((fill_msg->alloc_time != H5D_ALLOC_TIME_INCR) || !curr_dset_info->index_empty)
+ chunk_list->all_dset_indices_empty = false;
+
+ if (curr_dset_info->chunk_idx_info.storage->ops->insert)
+ chunk_list->no_dset_index_insert_methods = false;
- if (local_info_array[i].need_read)
- num_chunks_to_read++;
+ /*
+ * For multi-dataset I/O, use a hash table to keep a mapping between
+ * chunks and the cached info for the dataset that they're in. Otherwise,
+ * we can just use the info object directly if only one dataset is being
+ * worked on.
+ */
+ if (num_dset_infos > 1) {
+ HASH_ADD(hh, chunk_list->dset_info.dset_info_hash_table, dset_oloc_addr, sizeof(haddr_t),
+ curr_dset_info);
+ }
+ else
+ chunk_list->dset_info.single_dset_info = curr_dset_info;
+ curr_dset_info = NULL;
+
+ /*
+ * Now, each rank builds a local list of info about the chunks
+ * they have selected among the chunks in the current dataset
+ */
+
+ /* Set metadata tagging with dataset oheader addr */
+ H5AC_tag(di[dset_idx].dset->oloc.addr, &prev_tag);
+
+ if (H5SL_count(di[dset_idx].layout_io_info.chunk_map->dset_sel_pieces)) {
+ H5SL_node_t *chunk_node;
+ bool filter_partial_edge_chunks;
+
+ /* Determine whether partial edge chunks should be filtered */
+ filter_partial_edge_chunks = !(di[dset_idx].dset->shared->layout.u.chunk.flags &
+ H5O_LAYOUT_CHUNK_DONT_FILTER_PARTIAL_BOUND_CHUNKS);
+
+ chunk_node = H5SL_first(di[dset_idx].layout_io_info.chunk_map->dset_sel_pieces);
+ while (chunk_node) {
+ H5D_piece_info_t *chunk_info;
+ hsize_t select_npoints;
+
+ chunk_info = (H5D_piece_info_t *)H5SL_item(chunk_node);
+ assert(chunk_info->filtered_dset);
+
+ /* Obtain this chunk's address */
+ if (H5D__chunk_lookup(di[dset_idx].dset, chunk_info->scaled, &udata) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address");
+
+ /* Initialize rank-local chunk info */
+ local_info_array[buf_idx].chunk_info = chunk_info;
+ local_info_array[buf_idx].chunk_buf_size = 0;
+ local_info_array[buf_idx].num_writers = 0;
+ local_info_array[buf_idx].orig_owner = mpi_rank;
+ local_info_array[buf_idx].new_owner = mpi_rank;
+ local_info_array[buf_idx].buf = NULL;
+
+ select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->fspace);
+ local_info_array[buf_idx].io_size =
+ (size_t)select_npoints * di[dset_idx].type_info.dst_type_size;
- local_info_array[i].skip_filter_pline = false;
- if (!filter_partial_edge_chunks) {
/*
- * If this is a partial edge chunk and the "don't filter partial edge
- * chunks" flag is set, make sure not to apply filters to the chunk.
+ * Determine whether this chunk will need to be read from the file. If this is
+ * a read operation, the chunk will be read. If this is a write operation, we
+ * generally need to read a filtered chunk from the file before modifying it,
+ * unless the chunk is being fully overwritten.
+ *
+ * TODO: Currently the full overwrite status of a chunk is only obtained on a
+ * per-rank basis. This means that if the total selection in the chunk, as
+ * determined by the combination of selections of all of the ranks interested in
+ * the chunk, covers the entire chunk, the performance optimization of not reading
+ * the chunk from the file is still valid, but is not applied in the current
+ * implementation.
+ *
+ * To implement this case, a few approaches were considered:
+ *
+ * - Keep a running total (distributed to each rank) of the number of chunk
+ * elements selected during chunk redistribution and compare that to the total
+ * number of elements in the chunk once redistribution is finished
+ *
+ * - Process all incoming chunk messages before doing I/O (these are currently
+ * processed AFTER doing I/O), combine the owning rank's selection in a chunk
+ * with the selections received from other ranks and check to see whether that
+ * combined selection covers the entire chunk
+ *
+                     * The first approach would be dangerous if the application performs an overlapping
+                     * write to a chunk, as the number of selected elements can equal or exceed the
+                     * number of elements in the chunk without the whole chunk actually being covered.
+ * While it might be considered erroneous for an application to do an overlapping
+ * write, we don't explicitly disallow it.
+ *
+                     * The second approach adds complexity in that part of each chunk message
+                     * is needed before doing I/O and part is needed after doing I/O.
+ * Since modification data from chunk messages can't be applied until after any I/O
+ * is performed (otherwise, we'll overwrite any applied modification data), chunk
+ * messages are currently entirely processed after I/O. However, in order to determine
+ * if a chunk is being fully overwritten, we need the dataspace portion of the chunk
+ * messages before doing I/O. The naive way to do this is to process chunk messages
+ * twice, using just the relevant information from the message before and after I/O.
+ * The better way would be to avoid processing chunk messages twice by extracting (and
+ * keeping around) the dataspace portion of the message before I/O and processing the
+ * rest of the chunk message after I/O. Note that the dataspace portion of each chunk
+ * message is used to correctly apply chunk modification data from the message, so
+ * must be kept around both before and after I/O in this case.
*/
- if (H5D__chunk_is_partial_edge_chunk(di->dset->shared->ndims,
- di->dset->shared->layout.u.chunk.dim, chunk_info->scaled,
- di->dset->shared->curr_dims))
- local_info_array[i].skip_filter_pline = true;
- }
+ if (io_info->op_type == H5D_IO_OP_READ)
+ local_info_array[buf_idx].need_read = true;
+ else {
+ local_info_array[buf_idx].need_read =
+ local_info_array[buf_idx].io_size <
+ (size_t)di[dset_idx].dset->shared->layout.u.chunk.size;
+ }
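
As a concrete illustration of the per-rank limitation described in the TODO above: if a 100-element chunk is overwritten collectively by two ranks writing 50 elements each, both ranks still schedule a read, because each one evaluates the heuristic against its own selection only. A minimal sketch with hypothetical numbers:

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    int
    main(void)
    {
        /* Hypothetical: a 100-element chunk, two ranks writing 50 elements each */
        size_t chunk_size   = 100;
        size_t rank_io_size = 50;

        /*
         * The per-rank heuristic: read the chunk unless this rank alone fully
         * overwrites it. It evaluates to true on both ranks, so the chunk is
         * read from the file even though the combined selections cover it.
         */
        bool need_read = rank_io_size < chunk_size;

        printf("need_read = %d\n", need_read); /* prints 1 on every rank */
        return 0;
    }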
- /* Initialize the chunk's shared info */
- local_info_array[i].chunk_current = udata.chunk_block;
- local_info_array[i].chunk_new = udata.chunk_block;
+ if (local_info_array[buf_idx].need_read)
+ num_chunks_to_read++;
- /*
- * Check if the list is not in ascending order of offset in the file
- * or has unallocated chunks. In either case, the list should get
- * sorted.
- */
- if (i) {
- haddr_t curr_chunk_offset = local_info_array[i].chunk_current.offset;
- haddr_t prev_chunk_offset = local_info_array[i - 1].chunk_current.offset;
+ local_info_array[buf_idx].skip_filter_pline = false;
+ if (!filter_partial_edge_chunks) {
+ /*
+ * If this is a partial edge chunk and the "don't filter partial edge
+ * chunks" flag is set, make sure not to apply filters to the chunk.
+ */
+ if (H5D__chunk_is_partial_edge_chunk(
+ di[dset_idx].dset->shared->ndims, di[dset_idx].dset->shared->layout.u.chunk.dim,
+ chunk_info->scaled, di[dset_idx].dset->shared->curr_dims))
+ local_info_array[buf_idx].skip_filter_pline = true;
+ }
- if (!H5_addr_defined(prev_chunk_offset) || !H5_addr_defined(curr_chunk_offset) ||
- (curr_chunk_offset < prev_chunk_offset))
- need_sort = true;
+ /* Initialize the chunk's shared info */
+ local_info_array[buf_idx].chunk_current = udata.chunk_block;
+ local_info_array[buf_idx].chunk_new = udata.chunk_block;
+
+ /*
+ * Check if the list is not in ascending order of offset in the file
+ * or has unallocated chunks. In either case, the list should get
+ * sorted.
+ */
+ if (!need_sort && buf_idx) {
+ haddr_t curr_chunk_offset = local_info_array[buf_idx].chunk_current.offset;
+ haddr_t prev_chunk_offset = local_info_array[buf_idx - 1].chunk_current.offset;
+
+ if (!H5_addr_defined(prev_chunk_offset) || !H5_addr_defined(curr_chunk_offset) ||
+ (curr_chunk_offset < prev_chunk_offset))
+ need_sort = true;
+ }
+
+ /* Needed for proper hashing later on */
+ memset(&local_info_array[buf_idx].index_info, 0, sizeof(H5D_chunk_index_info_t));
+
+ /*
+ * Extensible arrays may calculate a chunk's index a little differently
+ * than normal when the dataset's unlimited dimension is not the
+ * slowest-changing dimension, so set the index here based on what the
+ * extensible array code calculated instead of what was calculated
+ * in the chunk file mapping.
+ */
+ if (di[dset_idx].dset->shared->layout.u.chunk.idx_type == H5D_CHUNK_IDX_EARRAY)
+ local_info_array[buf_idx].index_info.chunk_idx = udata.chunk_idx;
+ else
+ local_info_array[buf_idx].index_info.chunk_idx = chunk_info->index;
+
+ assert(H5_addr_defined(di[dset_idx].dset->oloc.addr));
+ local_info_array[buf_idx].index_info.dset_oloc_addr = di[dset_idx].dset->oloc.addr;
+
+ local_info_array[buf_idx].index_info.filter_mask = udata.filter_mask;
+ local_info_array[buf_idx].index_info.need_insert = false;
+
+ buf_idx++;
+
+ chunk_node = H5SL_next(chunk_node);
}
+ }
+ else if (H5F_get_coll_metadata_reads(di[dset_idx].dset->oloc.file)) {
+ hsize_t scaled[H5O_LAYOUT_NDIMS] = {0};
/*
- * Extensible arrays may calculate a chunk's index a little differently
- * than normal when the dataset's unlimited dimension is not the
- * slowest-changing dimension, so set the index here based on what the
- * extensible array code calculated instead of what was calculated
- * in the chunk file mapping.
+ * If this rank has no selection in the dataset and collective
+ * metadata reads are enabled, do a fake lookup of a chunk to
+ * ensure that this rank has the chunk index opened. Otherwise,
+ * only the ranks that had a selection will have opened the
+ * chunk index and they will have done so independently. Therefore,
+ * when ranks with no selection participate in later collective
+ * metadata reads, they will try to open the chunk index collectively
+ * and issues will occur since other ranks won't participate.
+ *
+ * In the future, we should consider having a chunk index "open"
+ * callback that can be used to ensure collectivity between ranks
+ * in a more natural way, but this hack should suffice for now.
*/
- if (di->dset->shared->layout.u.chunk.idx_type == H5D_CHUNK_IDX_EARRAY)
- local_info_array[i].index_info.chunk_idx = udata.chunk_idx;
- else
- local_info_array[i].index_info.chunk_idx = chunk_info->index;
-
- local_info_array[i].index_info.filter_mask = udata.filter_mask;
- local_info_array[i].index_info.need_insert = false;
-
- chunk_node = H5SL_next(chunk_node);
+ if (H5D__chunk_lookup(di[dset_idx].dset, scaled, &udata) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address");
}
- /* Ensure the chunk list is sorted in ascending order of offset in the file */
- if (need_sort)
- qsort(local_info_array, num_chunks_selected, sizeof(H5D_filtered_collective_chunk_info_t),
- H5D__cmp_filtered_collective_io_info_entry);
+ /* Reset metadata tagging */
+ H5AC_tag(prev_tag, NULL);
}
- else if (H5F_get_coll_metadata_reads(di->dset->oloc.file)) {
- hsize_t scaled[H5O_LAYOUT_NDIMS] = {0};
- /*
- * If this rank has no selection in the dataset and collective
- * metadata reads are enabled, do a fake lookup of a chunk to
- * ensure that this rank has the chunk index opened. Otherwise,
- * only the ranks that had a selection will have opened the
- * chunk index and they will have done so independently. Therefore,
- * when ranks with no selection participate in later collective
- * metadata reads, they will try to open the chunk index collectively
- * and issues will occur since other ranks won't participate.
- *
- * In the future, we should consider having a chunk index "open"
- * callback that can be used to ensure collectivity between ranks
- * in a more natural way, but this hack should suffice for now.
- */
- if (H5D__chunk_lookup(di->dset, scaled, &udata) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address");
- }
+ /* Ensure the chunk list is sorted in ascending order of offset in the file */
+ if (local_info_array && need_sort)
+ qsort(local_info_array, num_chunks_selected, sizeof(H5D_filtered_collective_chunk_info_t),
+ H5D__cmp_filtered_collective_io_info_entry);
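
The comparator used by this qsort call is not shown in this hunk; below is a sketch of the ordering it needs to impose, assuming numeric comparison of file offsets with the undefined-address sentinel being the largest value (the actual H5D__cmp_filtered_collective_io_info_entry may break ties differently):

    #include <stdint.h>
    #include <stdlib.h>

    typedef struct entry {
        uint64_t file_offset; /* UINT64_MAX stands in for an undefined address */
    } entry;

    /*
     * Order entries by ascending file offset so later vector I/O walks the
     * file monotonically; with this scheme, unallocated chunks sort last.
     */
    static int
    cmp_entry(const void *a, const void *b)
    {
        uint64_t off_a = ((const entry *)a)->file_offset;
        uint64_t off_b = ((const entry *)b)->file_offset;

        return (off_a > off_b) - (off_a < off_b);
    }

    /* Usage: qsort(entries, num_entries, sizeof(entry), cmp_entry); */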
chunk_list->chunk_infos = local_info_array;
chunk_list->num_chunk_infos = num_chunks_selected;
@@ -3119,6 +3486,37 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const
done:
if (ret_value < 0) {
+ /* Free temporary cached dataset info object */
+ if (curr_dset_info) {
+ if (curr_dset_info->fb_info_init && H5D__fill_term(&curr_dset_info->fb_info) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "can't release fill buffer info");
+ if (curr_dset_info->fill_space && H5S_close(curr_dset_info->fill_space) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space");
+
+ H5MM_free(curr_dset_info);
+ curr_dset_info = NULL;
+
+ if (num_dset_infos == 1)
+ chunk_list->dset_info.single_dset_info = NULL;
+ }
+
+ /* Free resources used by cached dataset info hash table */
+ if (num_dset_infos > 1) {
+ H5D_mpio_filtered_dset_info_t *tmp;
+
+ HASH_ITER(hh, chunk_list->dset_info.dset_info_hash_table, curr_dset_info, tmp)
+ {
+ HASH_DELETE(hh, chunk_list->dset_info.dset_info_hash_table, curr_dset_info);
+ H5MM_free(curr_dset_info);
+ curr_dset_info = NULL;
+ }
+ }
+
+ if (num_dset_infos == 1)
+ chunk_list->dset_info.single_dset_info = NULL;
+ else
+ chunk_list->dset_info.dset_info_hash_table = NULL;
+
H5MM_free(local_info_array);
}
@@ -3158,7 +3556,6 @@ H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_li
bool redistribute_on_all_ranks;
size_t *num_chunks_map = NULL;
size_t coll_chunk_list_size = 0;
- size_t i;
int mpi_code;
herr_t ret_value = SUCCEED;
@@ -3186,8 +3583,8 @@ H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_li
num_chunks_map, 1, H5_SIZE_T_AS_MPI_TYPE, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
- for (i = 0; i < (size_t)mpi_size; i++)
- coll_chunk_list_size += num_chunks_map[i];
+ for (int curr_rank = 0; curr_rank < mpi_size; curr_rank++)
+ coll_chunk_list_size += num_chunks_map[curr_rank];
/*
* Determine whether we should perform chunk redistribution on all
@@ -3257,21 +3654,23 @@ done:
*
* - All MPI ranks send their list of selected chunks to the
* ranks involved in chunk redistribution. Then, the
- * involved ranks sort this new list in order of chunk
- * index.
+ * involved ranks sort this new list in order of:
+ *
+ * dataset object header address -> chunk index value ->
+ * original owning MPI rank for chunk
*
* - The involved ranks scan the list looking for matching
- * runs of chunk index values (corresponding to a shared
- * chunk which has been selected by more than one rank in
- * the I/O operation) and for each shared chunk,
- * redistribute the chunk to the MPI rank writing to the
- * chunk which currently has the least amount of chunks
- * assigned to it. This is done by modifying the "new_owner"
- * field in each of the list entries corresponding to that
- * chunk. The involved ranks then re-sort the list in order
- * of original chunk owner so that each rank's section of
- * contributed chunks is contiguous in the collective chunk
- * list.
+ * runs of (dataset object header address, chunk index value)
+ * pairs (corresponding to a shared chunk which has been
+ * selected by more than one rank in the I/O operation) and
+ * for each shared chunk, redistribute the chunk to the MPI
 +  *                          rank writing to the chunk that currently has the
 +  *                          fewest chunks assigned to it. This is done by modifying
+ * the "new_owner" field in each of the list entries
+ * corresponding to that chunk. The involved ranks then
+ * re-sort the list in order of original chunk owner so that
+ * each rank's section of contributed chunks is contiguous
+ * in the collective chunk list.
*
* - If chunk redistribution occurred on all ranks, each rank
* scans through the collective chunk list to find their
@@ -3293,9 +3692,8 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
{
MPI_Datatype struct_type;
MPI_Datatype packed_type;
- bool struct_type_derived = false;
- bool packed_type_derived = false;
- size_t i;
+ bool struct_type_derived = false;
+ bool packed_type_derived = false;
size_t coll_chunk_list_num_entries = 0;
void *coll_chunk_list = NULL;
int *counts_disps_array = NULL;
@@ -3346,15 +3744,15 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
/* Set the receive counts from the assigned chunks map */
counts_ptr = counts_disps_array;
- for (i = 0; i < (size_t)mpi_size; i++)
- H5_CHECKED_ASSIGN(counts_ptr[i], int, num_chunks_assigned_map[i], size_t);
+ for (int curr_rank = 0; curr_rank < mpi_size; curr_rank++)
+ H5_CHECKED_ASSIGN(counts_ptr[curr_rank], int, num_chunks_assigned_map[curr_rank], size_t);
/* Set the displacements into the receive buffer for the gather operation */
displacements_ptr = &counts_disps_array[mpi_size];
*displacements_ptr = 0;
- for (i = 1; i < (size_t)mpi_size; i++)
- displacements_ptr[i] = displacements_ptr[i - 1] + counts_ptr[i - 1];
+ for (int curr_rank = 1; curr_rank < mpi_size; curr_rank++)
+ displacements_ptr[curr_rank] = displacements_ptr[curr_rank - 1] + counts_ptr[curr_rank - 1];
}
}
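
The counts/displacements construction above is the standard prologue for a varying-count gather. A minimal standalone sketch of how such arrays feed MPI_Gatherv, using a plain int payload instead of the derived datatypes built here:

    #include <mpi.h>
    #include <stdlib.h>

    /*
     * Gather a variable number of ints from every rank onto rank 0.
     * counts[r] is rank r's contribution; displs[r] is the prefix sum.
     */
    static int *
    gather_varying(const int *local, int local_count, int mpi_rank, int mpi_size,
                   MPI_Comm comm)
    {
        int *counts = NULL, *displs = NULL, *recvbuf = NULL;

        if (mpi_rank == 0) {
            counts = malloc((size_t)mpi_size * sizeof(int));
            displs = malloc((size_t)mpi_size * sizeof(int));
        }

        /* Rank 0 learns how much each rank will send */
        MPI_Gather(&local_count, 1, MPI_INT, counts, 1, MPI_INT, 0, comm);

        if (mpi_rank == 0) {
            int total = 0;

            for (int r = 0; r < mpi_size; r++) {
                displs[r] = total; /* displacement = running prefix sum */
                total += counts[r];
            }
            recvbuf = malloc((size_t)total * sizeof(int));
        }

        MPI_Gatherv(local, local_count, MPI_INT, recvbuf, counts, displs, MPI_INT,
                    0, comm);

        free(counts);
        free(displs);
        return recvbuf; /* non-NULL only on rank 0 */
    }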
@@ -3363,9 +3761,11 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
* necessary for MPI communication
*/
if (H5D__mpio_get_chunk_redistribute_info_types(&packed_type, &packed_type_derived, &struct_type,
- &struct_type_derived) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL,
+ &struct_type_derived) < 0) {
+ /* Push an error, but still participate in collective gather operation */
+ HDONE_ERROR(H5E_DATASET, H5E_CANTGET, FAIL,
"can't create derived datatypes for chunk redistribution info");
+ }
/* Perform gather operation */
if (H5_mpio_gatherv_alloc(chunk_list->chunk_infos, num_chunks_int, struct_type, counts_ptr,
@@ -3389,15 +3789,14 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
if (all_ranks_involved || (mpi_rank == 0)) {
H5D_chunk_redistribute_info_t *chunk_entry;
- hsize_t curr_chunk_idx;
- size_t set_begin_index;
- int num_writers;
- int new_chunk_owner;
/* Clear the mapping from rank value -> number of assigned chunks */
memset(num_chunks_assigned_map, 0, (size_t)mpi_size * sizeof(*num_chunks_assigned_map));
- /* Sort collective chunk list according to chunk index */
+ /*
+ * Sort collective chunk list according to:
+ * dataset object header address -> chunk index value -> original owning MPI rank for chunk
+ */
qsort(coll_chunk_list, coll_chunk_list_num_entries, sizeof(H5D_chunk_redistribute_info_t),
H5D__cmp_chunk_redistribute_info);
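
The three-level ordering described above (dataset object header address, then chunk index, then original owner) is what turns all duplicate entries for a shared chunk into one contiguous run that the scan below can consume. A sketch of such a comparator, with hypothetical plain-C field types standing in for haddr_t and hsize_t:

    #include <stdint.h>

    typedef struct redistribute_info {
        uint64_t dset_oloc_addr; /* dataset object header address           */
        uint64_t chunk_idx;      /* chunk index within that dataset         */
        int      orig_owner;     /* rank that originally selected the chunk */
    } redistribute_info;

    /*
     * Sort so that all entries for the same (dataset, chunk) pair are
     * adjacent, with ties broken by original owner so each rank's entries
     * stay deterministically ordered within a run.
     */
    static int
    cmp_redistribute_info(const void *a, const void *b)
    {
        const redistribute_info *ra = a, *rb = b;

        if (ra->dset_oloc_addr != rb->dset_oloc_addr)
            return (ra->dset_oloc_addr > rb->dset_oloc_addr) ? 1 : -1;
        if (ra->chunk_idx != rb->chunk_idx)
            return (ra->chunk_idx > rb->chunk_idx) ? 1 : -1;
        return (ra->orig_owner > rb->orig_owner) - (ra->orig_owner < rb->orig_owner);
    }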
@@ -3410,21 +3809,30 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
* chunks).
*/
chunk_entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[0];
- for (i = 0; i < coll_chunk_list_num_entries;) {
+ for (size_t entry_idx = 0; entry_idx < coll_chunk_list_num_entries;) {
+ haddr_t curr_oloc_addr;
+ hsize_t curr_chunk_idx;
+ size_t set_begin_index;
+ bool keep_processing;
+ int num_writers;
+ int new_chunk_owner;
+
/* Set chunk's initial new owner to its original owner */
new_chunk_owner = chunk_entry->orig_owner;
/*
- * Set the current chunk index so we know when we've processed
- * all duplicate entries for a particular shared chunk
+ * Set the current dataset object header address and chunk
+ * index value so we know when we've processed all duplicate
+ * entries for a particular shared chunk
*/
+ curr_oloc_addr = chunk_entry->dset_oloc_addr;
curr_chunk_idx = chunk_entry->chunk_idx;
/* Reset the initial number of writers to this chunk */
num_writers = 0;
/* Set index for the beginning of this section of duplicate chunk entries */
- set_begin_index = i;
+ set_begin_index = entry_idx;
/*
* Process each chunk entry in the set for the current
@@ -3445,13 +3853,21 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
num_writers++;
chunk_entry++;
- } while (++i < coll_chunk_list_num_entries && chunk_entry->chunk_idx == curr_chunk_idx);
+
+ keep_processing =
+ /* Make sure we haven't run out of chunks in the chunk list */
+ (++entry_idx < coll_chunk_list_num_entries) &&
+ /* Make sure the chunk we're looking at is in the same dataset */
+ (H5_addr_eq(chunk_entry->dset_oloc_addr, curr_oloc_addr)) &&
+ /* Make sure the chunk we're looking at is the same chunk */
+ (chunk_entry->chunk_idx == curr_chunk_idx);
+ } while (keep_processing);
/* We should never have more writers to a chunk than the number of MPI ranks */
assert(num_writers <= mpi_size);
/* Set all processed chunk entries' "new_owner" and "num_writers" fields */
- for (; set_begin_index < i; set_begin_index++) {
+ for (; set_begin_index < entry_idx; set_begin_index++) {
H5D_chunk_redistribute_info_t *entry;
entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[set_begin_index];
@@ -3485,29 +3901,32 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
}
if (all_ranks_involved) {
+ size_t entry_idx;
+
/*
* If redistribution occurred on all ranks, search for the section
* in the collective chunk list corresponding to this rank's locally
* selected chunks and update the local list after redistribution.
*/
- for (i = 0; i < coll_chunk_list_num_entries; i++)
- if (mpi_rank == ((H5D_chunk_redistribute_info_t *)coll_chunk_list)[i].orig_owner)
+ for (entry_idx = 0; entry_idx < coll_chunk_list_num_entries; entry_idx++)
+ if (mpi_rank == ((H5D_chunk_redistribute_info_t *)coll_chunk_list)[entry_idx].orig_owner)
break;
- for (size_t j = 0; j < (size_t)num_chunks_int; j++) {
+ for (size_t info_idx = 0; info_idx < (size_t)num_chunks_int; info_idx++) {
H5D_chunk_redistribute_info_t *coll_entry;
- coll_entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[i++];
+ coll_entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[entry_idx++];
- chunk_list->chunk_infos[j].new_owner = coll_entry->new_owner;
- chunk_list->chunk_infos[j].num_writers = coll_entry->num_writers;
+ chunk_list->chunk_infos[info_idx].new_owner = coll_entry->new_owner;
+ chunk_list->chunk_infos[info_idx].num_writers = coll_entry->num_writers;
/*
* Check if the chunk list struct's `num_chunks_to_read` field
* needs to be updated
*/
- if (chunk_list->chunk_infos[j].need_read && (chunk_list->chunk_infos[j].new_owner != mpi_rank)) {
- chunk_list->chunk_infos[j].need_read = false;
+ if (chunk_list->chunk_infos[info_idx].need_read &&
+ (chunk_list->chunk_infos[info_idx].new_owner != mpi_rank)) {
+ chunk_list->chunk_infos[info_idx].need_read = false;
assert(chunk_list->num_chunks_to_read > 0);
chunk_list->num_chunks_to_read--;
@@ -3530,9 +3949,10 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
* their chunk list struct's `num_chunks_to_read` field since it
* may now be out of date.
*/
- for (i = 0; i < chunk_list->num_chunk_infos; i++) {
- if ((chunk_list->chunk_infos[i].new_owner != mpi_rank) && chunk_list->chunk_infos[i].need_read) {
- chunk_list->chunk_infos[i].need_read = false;
+ for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) {
+ if ((chunk_list->chunk_infos[info_idx].new_owner != mpi_rank) &&
+ chunk_list->chunk_infos[info_idx].need_read) {
+ chunk_list->chunk_infos[info_idx].need_read = false;
assert(chunk_list->num_chunks_to_read > 0);
chunk_list->num_chunks_to_read--;
@@ -3597,9 +4017,10 @@ done:
* owned by that rank, the rank sends the data it wishes to
* update the chunk with to the MPI rank that now has
* ownership of that chunk. To do this, it encodes the
- * chunk's index, its selection in the chunk and its
- * modification data into a buffer and then posts a
- * non-blocking MPI_Issend to the owning rank.
+ * chunk's index value, the dataset's object header address
+ * (only for the multi-dataset I/O case), its selection in
+ * the chunk and its modification data into a buffer and
+ * then posts a non-blocking MPI_Issend to the owning rank.
*
* Once this step is complete, all MPI ranks allocate arrays
* to hold chunk message receive buffers and MPI request
@@ -3641,9 +4062,8 @@ done:
*/
static herr_t
H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list, H5D_io_info_t *io_info,
- H5D_dset_io_info_t *dset_info, int mpi_rank,
- int H5_ATTR_NDEBUG_UNUSED mpi_size, unsigned char ***chunk_msg_bufs,
- int *chunk_msg_bufs_len)
+ int mpi_rank, int H5_ATTR_NDEBUG_UNUSED mpi_size,
+ unsigned char ***chunk_msg_bufs, int *chunk_msg_bufs_len)
{
H5D_filtered_collective_chunk_info_t *chunk_table = NULL;
H5S_sel_iter_t *mem_iter = NULL;
@@ -3658,8 +4078,8 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
size_t num_send_requests = 0;
size_t num_recv_requests = 0;
size_t num_msgs_incoming = 0;
+ size_t hash_keylen = 0;
size_t last_assigned_idx;
- size_t i;
int mpi_code;
herr_t ret_value = SUCCEED;
@@ -3667,7 +4087,6 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
assert(chunk_list);
assert(io_info);
- assert(dset_info);
assert(mpi_size > 1);
assert(chunk_msg_bufs);
assert(chunk_msg_bufs_len);
@@ -3681,6 +4100,9 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
H5CX_set_libver_bounds(NULL);
if (chunk_list->num_chunk_infos > 0) {
+ hash_keylen = chunk_list->chunk_hash_table_keylen;
+ assert(hash_keylen > 0);
+
/* Allocate a selection iterator for iterating over chunk dataspaces */
if (NULL == (mem_iter = H5FL_MALLOC(H5S_sel_iter_t)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate dataspace selection iterator");
@@ -3712,8 +4134,9 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
* synchronous sends to send the data this rank is writing to
* the rank that does own the chunk.
*/
- for (i = 0, last_assigned_idx = 0; i < chunk_list->num_chunk_infos; i++) {
- H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i];
+ last_assigned_idx = 0;
+ for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) {
+ H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[info_idx];
if (mpi_rank == chunk_entry->new_owner) {
num_msgs_incoming += (size_t)(chunk_entry->num_writers - 1);
@@ -3723,19 +4146,24 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
* does own, since it has sent the necessary data and is no longer
* interested in the chunks it doesn't own.
*/
- chunk_list->chunk_infos[last_assigned_idx] = chunk_list->chunk_infos[i];
+ chunk_list->chunk_infos[last_assigned_idx] = chunk_list->chunk_infos[info_idx];
/*
* Since, at large scale, a chunk's index value may be larger than
* the maximum value that can be stored in an int, we cannot rely
* on using a chunk's index value as the tag for the MPI messages
- * sent/received for a chunk. Therefore, add this chunk to a hash
- * table with the chunk's index as a key so that we can quickly find
- * the chunk when processing chunk messages that were received. The
- * message itself will contain the chunk's index so we can update
- * the correct chunk with the received data.
+ * sent/received for a chunk. Further, to support the multi-dataset
+ * I/O case, we can't rely on being able to distinguish between
+ * chunks by their chunk index value alone since two chunks from
+ * different datasets could have the same chunk index value.
+ * Therefore, add this chunk to a hash table with the dataset's
+ * object header address + the chunk's index value as a key so that
+ * we can quickly find the chunk when processing chunk messages that
+ * were received. The message itself will contain the dataset's
+ * object header address and the chunk's index value so we can
+ * update the correct chunk with the received data.
*/
- HASH_ADD(hh, chunk_table, index_info.chunk_idx, sizeof(hsize_t),
+ HASH_ADD(hh, chunk_table, index_info.chunk_idx, hash_keylen,
&chunk_list->chunk_infos[last_assigned_idx]);
last_assigned_idx++;
@@ -3747,8 +4175,8 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
size_t mod_data_size = 0;
size_t space_size = 0;
- /* Add the size of the chunk index to the encoded size */
- mod_data_size += sizeof(hsize_t);
+ /* Add the size of the chunk hash table key to the encoded size */
+ mod_data_size += hash_keylen;
/* Determine size of serialized chunk file dataspace */
if (H5S_encode(chunk_info->fspace, &mod_data_p, &space_size) < 0)
@@ -3759,7 +4187,7 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace);
H5_CHECK_OVERFLOW(iter_nelmts, hsize_t, size_t);
- mod_data_size += (size_t)iter_nelmts * dset_info->type_info.src_type_size;
+ mod_data_size += (size_t)iter_nelmts * chunk_info->dset_info->type_info.src_type_size;
if (NULL == (msg_send_bufs[num_send_requests] = H5MM_malloc(mod_data_size)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL,
@@ -3767,23 +4195,28 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
mod_data_p = msg_send_bufs[num_send_requests];
- /* Store the chunk's index into the buffer */
- H5MM_memcpy(mod_data_p, &chunk_entry->index_info.chunk_idx, sizeof(hsize_t));
- mod_data_p += sizeof(hsize_t);
+ /*
+ * Add the chunk hash table key (chunk index value + possibly
+ * dataset object header address) into the buffer
+ */
+ H5MM_memcpy(mod_data_p, &chunk_entry->index_info.chunk_idx, hash_keylen);
+ mod_data_p += hash_keylen;
/* Serialize the chunk's file dataspace into the buffer */
if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace");
/* Initialize iterator for memory selection */
- if (H5S_select_iter_init(mem_iter, chunk_info->mspace, dset_info->type_info.src_type_size,
+ if (H5S_select_iter_init(mem_iter, chunk_info->mspace,
+ chunk_info->dset_info->type_info.src_type_size,
H5S_SEL_ITER_SHARE_WITH_DATASPACE) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL,
"unable to initialize memory selection information");
mem_iter_init = true;
/* Collect the modification data into the buffer */
- if (0 == H5D__gather_mem(dset_info->buf.cvp, mem_iter, (size_t)iter_nelmts, mod_data_p))
+ if (0 ==
+ H5D__gather_mem(chunk_info->dset_info->buf.cvp, mem_iter, (size_t)iter_nelmts, mod_data_p))
HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer");
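
Taken together, the encode steps above produce one flat message buffer per chunk. A sketch of the layout (field widths vary per chunk; this mirrors the code above rather than a documented wire format):

    /*
     * Layout of one chunk modification message:
     *
     *   +---------------------------+---------------------------+----------------+
     *   | hash table key            | serialized file dataspace | element data   |
     *   | chunk_idx [+ oloc addr]   | (H5S_encode output)       | (gathered from |
     *   | hash_keylen bytes         | space_size bytes          | the write buf) |
     *   +---------------------------+---------------------------+----------------+
     *
     * The receiver reads the key first to locate the matching chunk entry in
     * its hash table, then decodes the dataspace to learn where in the chunk
     * the element data should be applied.
     */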
/*
@@ -3925,7 +4358,7 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
* send buffers used in the non-blocking operations
*/
if (msg_send_bufs) {
- for (i = 0; i < num_send_requests; i++) {
+ for (size_t i = 0; i < num_send_requests; i++) {
if (msg_send_bufs[i])
H5MM_free(msg_send_bufs[i]);
}
@@ -3960,7 +4393,7 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
/* Set the new number of locally-selected chunks */
chunk_list->num_chunk_infos = last_assigned_idx;
- /* Set chunk hash table pointer for future use */
+ /* Set chunk hash table information for future use */
chunk_list->chunk_hash_table = chunk_table;
/* Return chunk message buffers if any were received */
@@ -3976,19 +4409,19 @@ done:
}
if (num_send_requests) {
- for (i = 0; i < num_send_requests; i++) {
+ for (size_t i = 0; i < num_send_requests; i++) {
MPI_Cancel(&send_requests[i]);
}
}
if (recv_requests) {
- for (i = 0; i < num_recv_requests; i++) {
+ for (size_t i = 0; i < num_recv_requests; i++) {
MPI_Cancel(&recv_requests[i]);
}
}
if (msg_recv_bufs) {
- for (i = 0; i < num_recv_requests; i++) {
+ for (size_t i = 0; i < num_recv_requests; i++) {
H5MM_free(msg_recv_bufs[i]);
}
@@ -4004,7 +4437,7 @@ done:
H5MM_free(send_requests);
if (msg_send_bufs) {
- for (i = 0; i < num_send_requests; i++) {
+ for (size_t i = 0; i < num_send_requests; i++) {
if (msg_send_bufs[i])
H5MM_free(msg_send_bufs[i]);
}
@@ -4040,26 +4473,16 @@ done:
*/
static herr_t
H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chunk_list,
- const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di,
- int mpi_rank)
+ const H5D_io_info_t *io_info, size_t num_dset_infos, int mpi_rank)
{
- H5D_fill_buf_info_t fb_info;
- H5Z_EDC_t err_detect; /* Error detection info */
- H5Z_cb_t filter_cb; /* I/O filter callback function */
- hsize_t file_chunk_size = 0;
- hsize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
- bool should_fill = false;
- bool fb_info_init = false;
- bool index_empty = false;
- H5S_t *fill_space = NULL;
- void *base_read_buf = NULL;
- herr_t ret_value = SUCCEED;
+ H5Z_EDC_t err_detect; /* Error detection info */
+ H5Z_cb_t filter_cb; /* I/O filter callback function */
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE
assert(chunk_list);
assert(io_info);
- assert(di);
#ifdef H5Dmpio_DEBUG
H5D_MPIO_TRACE_ENTER(mpi_rank);
@@ -4068,22 +4491,6 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun
(void)mpi_rank;
#endif
- if (chunk_list->num_chunk_infos) {
- /* Retrieve filter settings from API context */
- if (H5CX_get_err_detect(&err_detect) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info");
- if (H5CX_get_filter_cb(&filter_cb) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function");
-
- /* Set size of full chunks in dataset */
- file_chunk_size = di->dset->shared->layout.u.chunk.size;
-
- /* Determine if fill values should be "read" for unallocated chunks */
- should_fill = (di->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_ALLOC) ||
- ((di->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_IFSET) &&
- di->dset->shared->dcpl_cache.fill.fill_defined);
- }
-
/*
* Allocate memory buffers for all chunks being read. Chunk data buffers are of
* the largest size between the chunk's current filtered size and the chunk's true
@@ -4097,29 +4504,61 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun
* size; reading into a (smaller) buffer of size equal to the unfiltered
* chunk size would of course be bad.
*/
- for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) {
- H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i];
+ for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) {
+ H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[info_idx];
+ H5D_mpio_filtered_dset_info_t *cached_dset_info;
+ hsize_t file_chunk_size;
assert(chunk_entry->need_read);
+ /* Find the cached dataset info for the dataset this chunk is in */
+ if (num_dset_infos > 1) {
+ HASH_FIND(hh, chunk_list->dset_info.dset_info_hash_table, &chunk_entry->index_info.dset_oloc_addr,
+ sizeof(haddr_t), cached_dset_info);
+ if (cached_dset_info == NULL) {
+ if (chunk_list->all_dset_indices_empty)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry");
+ else {
+ /* Push an error, but participate in collective read */
+ HDONE_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry");
+ break;
+ }
+ }
+ }
+ else
+ cached_dset_info = chunk_list->dset_info.single_dset_info;
+ assert(cached_dset_info);
+
+ file_chunk_size = cached_dset_info->file_chunk_size;
+
chunk_entry->chunk_buf_size = MAX(chunk_entry->chunk_current.length, file_chunk_size);
if (NULL == (chunk_entry->buf = H5MM_malloc(chunk_entry->chunk_buf_size))) {
- /* Push an error, but participate in collective read */
- HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer");
- break;
+ if (chunk_list->all_dset_indices_empty)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer");
+ else {
+ /* Push an error, but participate in collective read */
+ HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer");
+ break;
+ }
}
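
The "push an error, but participate" pattern above exists because every rank must still reach the collective vector read farther down; a rank that jumped straight to its error path would leave the others blocked inside MPI. A generic sketch of the idea, modeled here with MPI_Allreduce rather than the HDF5-internal vector I/O call:

    #include <mpi.h>
    #include <stdbool.h>
    #include <stdlib.h>

    /*
     * Even a rank that fails locally must make the matching collective call;
     * it participates with a degenerate contribution and the error is then
     * surfaced consistently on all ranks.
     */
    static bool
    do_local_work_then_sync(MPI_Comm comm)
    {
        void *buf       = malloc(1024); /* stand-in for the chunk buffer */
        int   local_ok  = (buf != NULL);
        int   global_ok = 0;

        /* Every rank calls this, successful or not */
        MPI_Allreduce(&local_ok, &global_ok, 1, MPI_INT, MPI_LAND, comm);

        free(buf);
        return global_ok != 0; /* all ranks agree on the outcome */
    }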
/*
- * Check if chunk is currently allocated. If not, don't try to
- * read it from the file. Instead, just fill the chunk buffer
- * with the fill value if necessary.
+ * Check whether the chunk needs to be read from the file, based
+ * on whether the dataset's chunk index is empty or the chunk has
+ * a defined address in the file. If the chunk doesn't need to be
+ * read from the file, just fill the chunk buffer with the fill
+ * value if necessary.
*/
- if (H5_addr_defined(chunk_entry->chunk_current.offset)) {
- /* Set first read buffer */
- if (!base_read_buf)
- base_read_buf = chunk_entry->buf;
+ if (cached_dset_info->index_empty || !H5_addr_defined(chunk_entry->chunk_current.offset)) {
+ chunk_entry->need_read = false;
+ /* Update field keeping track of number of chunks to read */
+ assert(chunk_list->num_chunks_to_read > 0);
+ chunk_list->num_chunks_to_read--;
+ }
+
+ if (chunk_entry->need_read) {
/* Set chunk's new length for eventual filter pipeline calls */
if (chunk_entry->skip_filter_pline)
chunk_entry->chunk_new.length = file_chunk_size;
@@ -4127,77 +4566,58 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun
chunk_entry->chunk_new.length = chunk_entry->chunk_current.length;
}
else {
- chunk_entry->need_read = false;
-
- /* Update field keeping track of number of chunks to read */
- assert(chunk_list->num_chunks_to_read > 0);
- chunk_list->num_chunks_to_read--;
-
/* Set chunk's new length for eventual filter pipeline calls */
chunk_entry->chunk_new.length = file_chunk_size;
- if (should_fill) {
- /* Initialize fill value buffer if not already initialized */
- if (!fb_info_init) {
- hsize_t chunk_dims[H5S_MAX_RANK];
-
- assert(di->dset->shared->ndims == di->dset->shared->layout.u.chunk.ndims - 1);
- for (size_t j = 0; j < di->dset->shared->layout.u.chunk.ndims - 1; j++)
- chunk_dims[j] = (hsize_t)di->dset->shared->layout.u.chunk.dim[j];
-
- /* Get a dataspace for filling chunk memory buffers */
- if (NULL == (fill_space = H5S_create_simple(di->dset->shared->layout.u.chunk.ndims - 1,
- chunk_dims, NULL)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to create chunk fill dataspace");
-
- /* Initialize fill value buffer */
- if (H5D__fill_init(
- &fb_info, NULL, (H5MM_allocate_t)H5D__chunk_mem_alloc,
- (void *)&di->dset->shared->dcpl_cache.pline, (H5MM_free_t)H5D__chunk_mem_free,
- (void *)&di->dset->shared->dcpl_cache.pline, &di->dset->shared->dcpl_cache.fill,
- di->dset->shared->type, di->dset->shared->type_id, 0, file_chunk_size) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize fill value buffer");
-
- fb_info_init = true;
- }
+ /* Determine if fill values should be "read" for this unallocated chunk */
+ if (cached_dset_info->should_fill) {
+ assert(cached_dset_info->fb_info_init);
+ assert(cached_dset_info->fb_info.fill_buf);
/* Write fill value to memory buffer */
- assert(fb_info.fill_buf);
- if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_entry->buf,
- di->type_info.mem_type, fill_space) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL,
- "couldn't fill chunk buffer with fill value");
+ if (H5D__fill(cached_dset_info->fb_info.fill_buf,
+ cached_dset_info->dset_io_info->type_info.dset_type, chunk_entry->buf,
+ cached_dset_info->dset_io_info->type_info.mem_type,
+ cached_dset_info->fill_space) < 0) {
+ if (chunk_list->all_dset_indices_empty)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL,
+ "couldn't fill chunk buffer with fill value");
+ else {
+ /* Push an error, but participate in collective read */
+ HDONE_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL,
+ "couldn't fill chunk buffer with fill value");
+ break;
+ }
+ }
}
}
}
- /*
- * If dataset is incrementally allocated and hasn't been written to
- * yet, the chunk index should be empty. In this case, a collective
- * read of chunks is essentially a no-op, so avoid it here.
- */
- index_empty = false;
- if (di->dset->shared->dcpl_cache.fill.alloc_time == H5D_ALLOC_TIME_INCR)
- if (H5D__chunk_index_empty(di->dset, &index_empty) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty");
-
- if (!index_empty) {
- /* Perform collective vector read */
+ /* Perform collective vector read if necessary */
+ if (!chunk_list->all_dset_indices_empty)
if (H5D__mpio_collective_filtered_vec_io(chunk_list, io_info->f_sh, H5D_IO_OP_READ) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't perform vector I/O on filtered chunks");
+
+ if (chunk_list->num_chunk_infos) {
+ /* Retrieve filter settings from API context */
+ if (H5CX_get_err_detect(&err_detect) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info");
+ if (H5CX_get_filter_cb(&filter_cb) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function");
}
/*
* Iterate through all the read chunks, unfiltering them and scattering their
* data out to the application's read buffer.
*/
- for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) {
- H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i];
+ for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) {
+ H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[info_idx];
H5D_piece_info_t *chunk_info = chunk_entry->chunk_info;
+ hsize_t iter_nelmts;
/* Unfilter the chunk, unless we didn't read it from the file */
if (chunk_entry->need_read && !chunk_entry->skip_filter_pline) {
- if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE,
+ if (H5Z_pipeline(&chunk_info->dset_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE,
&(chunk_entry->index_info.filter_mask), err_detect, filter_cb,
(size_t *)&chunk_entry->chunk_new.length, &chunk_entry->chunk_buf_size,
&chunk_entry->buf) < 0)
@@ -4207,26 +4627,21 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun
/* Scatter the chunk data to the read buffer */
iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace);
- if (H5D_select_io_mem(di->buf.vp, chunk_info->mspace, chunk_entry->buf, chunk_info->fspace,
- di->type_info.src_type_size, (size_t)iter_nelmts) < 0)
+ if (H5D_select_io_mem(chunk_info->dset_info->buf.vp, chunk_info->mspace, chunk_entry->buf,
+ chunk_info->fspace, chunk_info->dset_info->type_info.src_type_size,
+ (size_t)iter_nelmts) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't copy chunk data to read buffer");
}
done:
/* Free all resources used by entries in the chunk list */
- for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) {
- if (chunk_list->chunk_infos[i].buf) {
- H5MM_free(chunk_list->chunk_infos[i].buf);
- chunk_list->chunk_infos[i].buf = NULL;
+ for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) {
+ if (chunk_list->chunk_infos[info_idx].buf) {
+ H5MM_free(chunk_list->chunk_infos[info_idx].buf);
+ chunk_list->chunk_infos[info_idx].buf = NULL;
}
}
- /* Release the fill buffer info, if it's been initialized */
- if (fb_info_init && H5D__fill_term(&fb_info) < 0)
- HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "Can't release fill buffer info");
- if (fill_space && (H5S_close(fill_space) < 0))
- HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space");
-
#ifdef H5Dmpio_DEBUG
H5D_MPIO_TIME_STOP(mpi_rank);
H5D_MPIO_TRACE_EXIT(mpi_rank);
@@ -4250,58 +4665,27 @@ done:
static herr_t
H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *chunk_list,
unsigned char **chunk_msg_bufs, int chunk_msg_bufs_len,
- const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di,
- int H5_ATTR_NDEBUG_UNUSED mpi_rank)
+ const H5D_io_info_t *io_info, size_t num_dset_infos, int mpi_rank)
{
- const H5D_type_info_t *type_info = NULL;
- H5D_fill_buf_info_t fb_info;
- H5S_sel_iter_t *sel_iter = NULL; /* Dataspace selection iterator for H5D__scatter_mem */
- H5Z_EDC_t err_detect; /* Error detection info */
- H5Z_cb_t filter_cb; /* I/O filter callback function */
- hsize_t file_chunk_size = 0;
- hsize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
- bool should_fill = false;
- bool fb_info_init = false;
- bool sel_iter_init = false;
- bool index_empty = false;
- size_t i;
- H5S_t *dataspace = NULL;
- H5S_t *fill_space = NULL;
- void *base_read_buf = NULL;
- herr_t ret_value = SUCCEED;
+ H5S_sel_iter_t *sel_iter = NULL; /* Dataspace selection iterator for H5D__scatter_mem */
+ H5Z_EDC_t err_detect; /* Error detection info */
+ H5Z_cb_t filter_cb; /* I/O filter callback function */
+ uint8_t *key_buf = NULL;
+ H5S_t *dataspace = NULL;
+ bool sel_iter_init = false;
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE
assert(chunk_list);
assert((chunk_msg_bufs && chunk_list->chunk_hash_table) || 0 == chunk_msg_bufs_len);
assert(io_info);
- assert(di);
#ifdef H5Dmpio_DEBUG
H5D_MPIO_TRACE_ENTER(mpi_rank);
H5D_MPIO_TIME_START(mpi_rank, "Filtered collective chunk update");
#endif
- /* Set convenience pointers */
- type_info = &(di->type_info);
- assert(type_info);
-
- if (chunk_list->num_chunk_infos > 0) {
- /* Retrieve filter settings from API context */
- if (H5CX_get_err_detect(&err_detect) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info");
- if (H5CX_get_filter_cb(&filter_cb) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function");
-
- /* Set size of full chunks in dataset */
- file_chunk_size = di->dset->shared->layout.u.chunk.size;
-
- /* Determine if fill values should be written to chunks */
- should_fill = (di->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_ALLOC) ||
- ((di->dset->shared->dcpl_cache.fill.fill_time == H5D_FILL_TIME_IFSET) &&
- di->dset->shared->dcpl_cache.fill.fill_defined);
- }
-
/*
* Allocate memory buffers for all owned chunks. Chunk data buffers are of the
* largest size between the chunk's current filtered size and the chunk's true
@@ -4321,11 +4705,33 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
* size; reading into a (smaller) buffer of size equal to the unfiltered
* chunk size would of course be bad.
*/
- for (i = 0; i < chunk_list->num_chunk_infos; i++) {
- H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i];
+ for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) {
+ H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[info_idx];
+ H5D_mpio_filtered_dset_info_t *cached_dset_info;
+ hsize_t file_chunk_size;
assert(mpi_rank == chunk_entry->new_owner);
+ /* Find the cached dataset info for the dataset this chunk is in */
+ if (num_dset_infos > 1) {
+ HASH_FIND(hh, chunk_list->dset_info.dset_info_hash_table, &chunk_entry->index_info.dset_oloc_addr,
+ sizeof(haddr_t), cached_dset_info);
+ if (cached_dset_info == NULL) {
+ if (chunk_list->all_dset_indices_empty)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry");
+ else {
+ /* Push an error, but participate in collective read */
+ HDONE_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry");
+ break;
+ }
+ }
+ }
+ else
+ cached_dset_info = chunk_list->dset_info.single_dset_info;
+ assert(cached_dset_info);
+
+ file_chunk_size = cached_dset_info->file_chunk_size;
+
chunk_entry->chunk_buf_size = MAX(chunk_entry->chunk_current.length, file_chunk_size);
/*
@@ -4333,29 +4739,41 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
* out fill values to it, make sure to 0-fill its memory buffer
* so we don't use uninitialized memory.
*/
- if (!H5_addr_defined(chunk_entry->chunk_current.offset) && !should_fill)
+ if (!H5_addr_defined(chunk_entry->chunk_current.offset) && !cached_dset_info->should_fill)
chunk_entry->buf = H5MM_calloc(chunk_entry->chunk_buf_size);
else
chunk_entry->buf = H5MM_malloc(chunk_entry->chunk_buf_size);
if (NULL == chunk_entry->buf) {
- /* Push an error, but participate in collective read */
- HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer");
- break;
+ if (chunk_list->all_dset_indices_empty)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer");
+ else {
+ /* Push an error, but participate in collective read */
+ HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer");
+ break;
+ }
}
- /* Set chunk's new length for eventual filter pipeline calls */
- if (chunk_entry->need_read) {
+ if (!chunk_entry->need_read)
+ /* Set chunk's new length for eventual filter pipeline calls */
+ chunk_entry->chunk_new.length = file_chunk_size;
+ else {
/*
- * Check if chunk is currently allocated. If not, don't try to
- * read it from the file. Instead, just fill the chunk buffer
- * with the fill value if fill values are to be written.
+ * Check whether the chunk needs to be read from the file, based
+ * on whether the dataset's chunk index is empty or the chunk has
+ * a defined address in the file. If the chunk doesn't need to be
+ * read from the file, just fill the chunk buffer with the fill
+ * value if necessary.
*/
- if (H5_addr_defined(chunk_entry->chunk_current.offset)) {
- /* Set first read buffer */
- if (!base_read_buf)
- base_read_buf = chunk_entry->buf;
+ if (cached_dset_info->index_empty || !H5_addr_defined(chunk_entry->chunk_current.offset)) {
+ chunk_entry->need_read = false;
+ /* Update field keeping track of number of chunks to read */
+ assert(chunk_list->num_chunks_to_read > 0);
+ chunk_list->num_chunks_to_read--;
+ }
+
+ if (chunk_entry->need_read) {
/* Set chunk's new length for eventual filter pipeline calls */
if (chunk_entry->skip_filter_pline)
chunk_entry->chunk_new.length = file_chunk_size;
@@ -4363,81 +4781,57 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
chunk_entry->chunk_new.length = chunk_entry->chunk_current.length;
}
else {
- chunk_entry->need_read = false;
-
- /* Update field keeping track of number of chunks to read */
- assert(chunk_list->num_chunks_to_read > 0);
- chunk_list->num_chunks_to_read--;
-
/* Set chunk's new length for eventual filter pipeline calls */
chunk_entry->chunk_new.length = file_chunk_size;
- if (should_fill) {
- /* Initialize fill value buffer if not already initialized */
- if (!fb_info_init) {
- hsize_t chunk_dims[H5S_MAX_RANK];
-
- assert(di->dset->shared->ndims == di->dset->shared->layout.u.chunk.ndims - 1);
- for (size_t j = 0; j < di->dset->shared->layout.u.chunk.ndims - 1; j++)
- chunk_dims[j] = (hsize_t)di->dset->shared->layout.u.chunk.dim[j];
+ /* Determine if fill values should be "read" for this unallocated chunk */
+ if (cached_dset_info->should_fill) {
+ assert(cached_dset_info->fb_info_init);
+ assert(cached_dset_info->fb_info.fill_buf);
- /* Get a dataspace for filling chunk memory buffers */
- if (NULL == (fill_space = H5S_create_simple(
- di->dset->shared->layout.u.chunk.ndims - 1, chunk_dims, NULL)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL,
- "unable to create chunk fill dataspace");
-
- /* Initialize fill value buffer */
- if (H5D__fill_init(&fb_info, NULL, (H5MM_allocate_t)H5D__chunk_mem_alloc,
- (void *)&di->dset->shared->dcpl_cache.pline,
- (H5MM_free_t)H5D__chunk_mem_free,
- (void *)&di->dset->shared->dcpl_cache.pline,
- &di->dset->shared->dcpl_cache.fill, di->dset->shared->type,
- di->dset->shared->type_id, 0, file_chunk_size) < 0)
+ /* Write fill value to memory buffer */
+ if (H5D__fill(cached_dset_info->fb_info.fill_buf,
+ cached_dset_info->dset_io_info->type_info.dset_type, chunk_entry->buf,
+ cached_dset_info->dset_io_info->type_info.mem_type,
+ cached_dset_info->fill_space) < 0) {
+ if (chunk_list->all_dset_indices_empty)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL,
- "can't initialize fill value buffer");
-
- fb_info_init = true;
+ "couldn't fill chunk buffer with fill value");
+ else {
+ /* Push an error, but participate in collective read */
+ HDONE_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL,
+ "couldn't fill chunk buffer with fill value");
+ break;
+ }
}
-
- /* Write fill value to memory buffer */
- assert(fb_info.fill_buf);
- if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_entry->buf,
- type_info->mem_type, fill_space) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL,
- "couldn't fill chunk buffer with fill value");
}
}
}
- else
- chunk_entry->chunk_new.length = file_chunk_size;
}
- /*
- * If dataset is incrementally allocated and hasn't been written to
- * yet, the chunk index should be empty. In this case, a collective
- * read of chunks is essentially a no-op, so avoid it here.
- */
- index_empty = false;
- if (di->dset->shared->dcpl_cache.fill.alloc_time == H5D_ALLOC_TIME_INCR)
- if (H5D__chunk_index_empty(di->dset, &index_empty) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty");
-
- if (!index_empty) {
- /* Perform collective vector read */
+ /* Perform collective vector read if necessary */
+ if (!chunk_list->all_dset_indices_empty)
if (H5D__mpio_collective_filtered_vec_io(chunk_list, io_info->f_sh, H5D_IO_OP_READ) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't perform vector I/O on filtered chunks");
- }
/*
* Now that all owned chunks have been read, update the chunks
* with modification data from the owning rank and other ranks.
*/
+ if (chunk_list->num_chunk_infos > 0) {
+ /* Retrieve filter settings from API context */
+ if (H5CX_get_err_detect(&err_detect) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info");
+ if (H5CX_get_filter_cb(&filter_cb) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get I/O filter callback function");
+ }
+
/* Process all chunks with data from the owning rank first */
- for (i = 0; i < chunk_list->num_chunk_infos; i++) {
- H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i];
+ for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) {
+ H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[info_idx];
H5D_piece_info_t *chunk_info = chunk_entry->chunk_info;
+ hsize_t iter_nelmts;
assert(mpi_rank == chunk_entry->new_owner);
@@ -4446,7 +4840,7 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
* the file, so we need to unfilter it
*/
if (chunk_entry->need_read && !chunk_entry->skip_filter_pline) {
- if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE,
+ if (H5Z_pipeline(&chunk_info->dset_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE,
&(chunk_entry->index_info.filter_mask), err_detect, filter_cb,
(size_t *)&chunk_entry->chunk_new.length, &chunk_entry->chunk_buf_size,
&chunk_entry->buf) < 0)
@@ -4455,28 +4849,35 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace);
- if (H5D_select_io_mem(chunk_entry->buf, chunk_info->fspace, di->buf.cvp, chunk_info->mspace,
- type_info->dst_type_size, (size_t)iter_nelmts) < 0)
+ if (H5D_select_io_mem(chunk_entry->buf, chunk_info->fspace, chunk_info->dset_info->buf.cvp,
+ chunk_info->mspace, chunk_info->dset_info->type_info.dst_type_size,
+ (size_t)iter_nelmts) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't copy chunk data to write buffer");
}
/* Allocate iterator for memory selection */
- if (NULL == (sel_iter = H5FL_MALLOC(H5S_sel_iter_t)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator");
+ if (chunk_msg_bufs_len > 0) {
+ assert(chunk_list->chunk_hash_table_keylen > 0);
+ if (NULL == (key_buf = H5MM_malloc(chunk_list->chunk_hash_table_keylen)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate hash table key buffer");
+
+ if (NULL == (sel_iter = H5FL_MALLOC(H5S_sel_iter_t)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator");
+ }
/* Now process all received chunk message buffers */
- for (i = 0; i < (size_t)chunk_msg_bufs_len; i++) {
+ for (size_t buf_idx = 0; buf_idx < (size_t)chunk_msg_bufs_len; buf_idx++) {
H5D_filtered_collective_chunk_info_t *chunk_entry = NULL;
- const unsigned char *msg_ptr = chunk_msg_bufs[i];
- hsize_t chunk_idx;
+ const unsigned char *msg_ptr = chunk_msg_bufs[buf_idx];
if (msg_ptr) {
- /* Retrieve the chunk's index value */
- H5MM_memcpy(&chunk_idx, msg_ptr, sizeof(hsize_t));
- msg_ptr += sizeof(hsize_t);
+ /* Retrieve the chunk hash table key from the chunk message buffer */
+ H5MM_memcpy(key_buf, msg_ptr, chunk_list->chunk_hash_table_keylen);
+ msg_ptr += chunk_list->chunk_hash_table_keylen;
- /* Find the chunk entry according to its chunk index */
- HASH_FIND(hh, chunk_list->chunk_hash_table, &chunk_idx, sizeof(hsize_t), chunk_entry);
+ /* Find the chunk entry according to its chunk hash table key */
+ HASH_FIND(hh, chunk_list->chunk_hash_table, key_buf, chunk_list->chunk_hash_table_keylen,
+ chunk_entry);
if (chunk_entry == NULL)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find chunk entry");
if (mpi_rank != chunk_entry->new_owner)
@@ -4491,11 +4892,14 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
if (!chunk_entry->buf)
continue;
else {
+ hsize_t iter_nelmts;
+
/* Decode the chunk file dataspace from the message */
if (NULL == (dataspace = H5S_decode(&msg_ptr)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace");
- if (H5S_select_iter_init(sel_iter, dataspace, type_info->dst_type_size,
+ if (H5S_select_iter_init(sel_iter, dataspace,
+ chunk_entry->chunk_info->dset_info->type_info.dst_type_size,
H5S_SEL_ITER_SHARE_WITH_DATASPACE) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL,
"unable to initialize memory selection information");
@@ -4517,50 +4921,49 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
dataspace = NULL;
}
- H5MM_free(chunk_msg_bufs[i]);
- chunk_msg_bufs[i] = NULL;
+ H5MM_free(chunk_msg_bufs[buf_idx]);
+ chunk_msg_bufs[buf_idx] = NULL;
}
}
}
/* Finally, filter all the chunks */
- for (i = 0; i < chunk_list->num_chunk_infos; i++) {
- if (!chunk_list->chunk_infos[i].skip_filter_pline) {
- if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, 0,
- &(chunk_list->chunk_infos[i].index_info.filter_mask), err_detect, filter_cb,
- (size_t *)&chunk_list->chunk_infos[i].chunk_new.length,
- &chunk_list->chunk_infos[i].chunk_buf_size, &chunk_list->chunk_infos[i].buf) < 0)
+ for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) {
+ if (!chunk_list->chunk_infos[info_idx].skip_filter_pline) {
+ if (H5Z_pipeline(
+ &chunk_list->chunk_infos[info_idx].chunk_info->dset_info->dset->shared->dcpl_cache.pline,
+ 0, &(chunk_list->chunk_infos[info_idx].index_info.filter_mask), err_detect, filter_cb,
+ (size_t *)&chunk_list->chunk_infos[info_idx].chunk_new.length,
+ &chunk_list->chunk_infos[info_idx].chunk_buf_size,
+ &chunk_list->chunk_infos[info_idx].buf) < 0)
HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "output pipeline failed");
}
#if H5_SIZEOF_SIZE_T > 4
/* Check for the chunk expanding too much to encode in a 32-bit value */
- if (chunk_list->chunk_infos[i].chunk_new.length > ((size_t)0xffffffff))
+ if (chunk_list->chunk_infos[info_idx].chunk_new.length > ((size_t)0xffffffff))
HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length");
#endif
}
done:
+ if (dataspace && (H5S_close(dataspace) < 0))
+ HDONE_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace");
+
if (sel_iter) {
if (sel_iter_init && H5S_SELECT_ITER_RELEASE(sel_iter) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator");
sel_iter = H5FL_FREE(H5S_sel_iter_t, sel_iter);
}
- if (dataspace && (H5S_close(dataspace) < 0))
- HDONE_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace");
- if (fill_space && (H5S_close(fill_space) < 0))
- HDONE_ERROR(H5E_DATASET, H5E_CLOSEERROR, FAIL, "can't close fill space");
- /* Release the fill buffer info, if it's been initialized */
- if (fb_info_init && H5D__fill_term(&fb_info) < 0)
- HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "Can't release fill buffer info");
+ H5MM_free(key_buf);
/* On failure, try to free all resources used by entries in the chunk list */
if (ret_value < 0) {
- for (i = 0; i < chunk_list->num_chunk_infos; i++) {
- if (chunk_list->chunk_infos[i].buf) {
- H5MM_free(chunk_list->chunk_infos[i].buf);
- chunk_list->chunk_infos[i].buf = NULL;
+ for (size_t info_idx = 0; info_idx < chunk_list->num_chunk_infos; info_idx++) {
+ if (chunk_list->chunk_infos[info_idx].buf) {
+ H5MM_free(chunk_list->chunk_infos[info_idx].buf);
+ chunk_list->chunk_infos[info_idx].buf = NULL;
}
}
}
@@ -4589,7 +4992,7 @@ done:
static herr_t
H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t *chunk_list,
size_t *num_chunks_assigned_map, H5D_io_info_t *io_info,
- H5D_chk_idx_info_t *idx_info, int mpi_rank, int mpi_size)
+ size_t num_dset_infos, int mpi_rank, int mpi_size)
{
H5D_chunk_alloc_info_t *collective_list = NULL;
MPI_Datatype send_type;
@@ -4599,11 +5002,10 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
bool need_sort = false;
size_t collective_num_entries = 0;
size_t num_local_chunks_processed = 0;
- size_t i;
- void *gathered_array = NULL;
- int *counts_disps_array = NULL;
- int *counts_ptr = NULL;
- int *displacements_ptr = NULL;
+ void *gathered_array = NULL;
+ int *counts_disps_array = NULL;
+ int *counts_ptr = NULL;
+ int *displacements_ptr = NULL;
int mpi_code;
herr_t ret_value = SUCCEED;
@@ -4611,8 +5013,6 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
assert(chunk_list);
assert(io_info);
- assert(idx_info);
- assert(idx_info->storage->idx_type != H5D_CHUNK_IDX_NONE);
#ifdef H5Dmpio_DEBUG
H5D_MPIO_TRACE_ENTER(mpi_rank);
@@ -4651,15 +5051,15 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
/* Set the receive counts from the assigned chunks map */
counts_ptr = counts_disps_array;
- for (i = 0; i < (size_t)mpi_size; i++)
- H5_CHECKED_ASSIGN(counts_ptr[i], int, num_chunks_assigned_map[i], size_t);
+ for (int curr_rank = 0; curr_rank < mpi_size; curr_rank++)
+ H5_CHECKED_ASSIGN(counts_ptr[curr_rank], int, num_chunks_assigned_map[curr_rank], size_t);
/* Set the displacements into the receive buffer for the gather operation */
displacements_ptr = &counts_disps_array[mpi_size];
*displacements_ptr = 0;
- for (i = 1; i < (size_t)mpi_size; i++)
- displacements_ptr[i] = displacements_ptr[i - 1] + counts_ptr[i - 1];
+ for (int curr_rank = 1; curr_rank < mpi_size; curr_rank++)
+ displacements_ptr[curr_rank] = displacements_ptr[curr_rank - 1] + counts_ptr[curr_rank - 1];
}
/* Perform gather operation */
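
A minimal sketch of the counts/displacements construction that feeds the gather, assuming, as above, that every rank already knows each rank's entry count; the helper name and parameters are illustrative:

    #include <stdlib.h>
    #include <mpi.h>

    /* Gather variable-length per-rank entry arrays onto every rank */
    static int
    gather_entries(const void *local, int local_count, MPI_Datatype type,
                   const size_t *count_map, int mpi_size, MPI_Comm comm,
                   void **gathered_out)
    {
        MPI_Aint lb, extent;
        size_t   total  = 0;
        int     *counts = malloc(2 * (size_t)mpi_size * sizeof(int));
        int     *displs;

        if (!counts)
            return -1;
        displs = counts + mpi_size;

        /* Receive counts come straight from the per-rank map */
        for (int r = 0; r < mpi_size; r++) {
            counts[r] = (int)count_map[r];
            total    += count_map[r];
        }

        /* Displacements are the exclusive prefix sum of the counts */
        displs[0] = 0;
        for (int r = 1; r < mpi_size; r++)
            displs[r] = displs[r - 1] + counts[r - 1];

        MPI_Type_get_extent(type, &lb, &extent);
        if (NULL == (*gathered_out = malloc(total * (size_t)extent))) {
            free(counts);
            return -1;
        }

        MPI_Allgatherv(local, local_count, type, *gathered_out, counts,
                       displs, type, comm);

        free(counts);
        return 0;
    }
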
@@ -4685,14 +5085,27 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
}
/* Collectively re-allocate the modified chunks (from each rank) in the file */
- collective_list = (H5D_chunk_alloc_info_t *)gathered_array;
- for (i = 0, num_local_chunks_processed = 0; i < collective_num_entries; i++) {
- H5D_chunk_alloc_info_t *coll_entry = &collective_list[i];
- bool need_insert;
- bool update_local_chunk;
-
- if (H5D__chunk_file_alloc(idx_info, &coll_entry->chunk_current, &coll_entry->chunk_new, &need_insert,
- NULL) < 0)
+ collective_list = (H5D_chunk_alloc_info_t *)gathered_array;
+ num_local_chunks_processed = 0;
+ for (size_t entry_idx = 0; entry_idx < collective_num_entries; entry_idx++) {
+ H5D_mpio_filtered_dset_info_t *cached_dset_info;
+ H5D_chunk_alloc_info_t *coll_entry = &collective_list[entry_idx];
+ bool need_insert;
+ bool update_local_chunk;
+
+ /* Find the cached dataset info for the dataset this chunk is in */
+ if (num_dset_infos > 1) {
+ HASH_FIND(hh, chunk_list->dset_info.dset_info_hash_table, &coll_entry->dset_oloc_addr,
+ sizeof(haddr_t), cached_dset_info);
+ if (cached_dset_info == NULL)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry");
+ }
+ else
+ cached_dset_info = chunk_list->dset_info.single_dset_info;
+ assert(cached_dset_info);
+
+ if (H5D__chunk_file_alloc(&cached_dset_info->chunk_idx_info, &coll_entry->chunk_current,
+ &coll_entry->chunk_new, &need_insert, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk");
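
The num_dset_infos branch above is a small fast path: with a single dataset there is exactly one cached info structure, so the hash lookup is skipped entirely. A sketch with illustrative types (the real table is keyed on haddr_t object header addresses, as the HASH_FIND call shows):

    #include "uthash.h"

    typedef struct dset_info_t {
        unsigned long  oloc_addr; /* hash key: dataset object header address */
        /* ... cached per-dataset state ... */
        UT_hash_handle hh;
    } dset_info_t;

    /* Resolve cached dataset info, avoiding the hash lookup in the
     * common single-dataset case */
    static dset_info_t *
    resolve_dset_info(dset_info_t *table, dset_info_t *single_info,
                      size_t num_dsets, unsigned long oloc_addr)
    {
        dset_info_t *info = NULL;

        if (num_dsets > 1)
            HASH_FIND(hh, table, &oloc_addr, sizeof(oloc_addr), info);
        else
            info = single_info;

        return info; /* NULL signals a failed lookup to the caller */
    }
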
/*
@@ -4700,9 +5113,12 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
* rank, make sure to update the chunk entry in the local
* chunk list
*/
- update_local_chunk = (num_local_chunks_processed < chunk_list->num_chunk_infos) &&
- (coll_entry->chunk_idx ==
- chunk_list->chunk_infos[num_local_chunks_processed].index_info.chunk_idx);
+ update_local_chunk =
+ (num_local_chunks_processed < chunk_list->num_chunk_infos) &&
+ (coll_entry->dset_oloc_addr ==
+ chunk_list->chunk_infos[num_local_chunks_processed].index_info.dset_oloc_addr) &&
+ (coll_entry->chunk_idx ==
+ chunk_list->chunk_infos[num_local_chunks_processed].index_info.chunk_idx);
if (update_local_chunk) {
H5D_filtered_collective_chunk_info_t *local_chunk;
@@ -4782,38 +5198,35 @@ done:
static herr_t
H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *chunk_list,
size_t *num_chunks_assigned_map, H5D_io_info_t *io_info,
- H5D_dset_io_info_t *di, H5D_chk_idx_info_t *idx_info,
- int mpi_rank, int mpi_size)
+ size_t num_dset_infos, int mpi_rank, int mpi_size)
{
- H5D_chunk_ud_t chunk_ud;
- MPI_Datatype send_type;
- MPI_Datatype recv_type;
- bool send_type_derived = false;
- bool recv_type_derived = false;
- hsize_t scaled_coords[H5O_LAYOUT_NDIMS];
- size_t collective_num_entries = 0;
- size_t i;
- void *gathered_array = NULL;
- int *counts_disps_array = NULL;
- int *counts_ptr = NULL;
- int *displacements_ptr = NULL;
- int mpi_code;
- herr_t ret_value = SUCCEED;
+ MPI_Datatype send_type;
+ MPI_Datatype recv_type;
+ size_t collective_num_entries = 0;
+ bool send_type_derived = false;
+ bool recv_type_derived = false;
+ void *gathered_array = NULL;
+ int *counts_disps_array = NULL;
+ int *counts_ptr = NULL;
+ int *displacements_ptr = NULL;
+ int mpi_code;
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE
assert(chunk_list);
assert(io_info);
- assert(di);
- assert(idx_info);
#ifdef H5Dmpio_DEBUG
H5D_MPIO_TRACE_ENTER(mpi_rank);
H5D_MPIO_TIME_START(mpi_rank, "Reinsertion of modified chunks into chunk index");
#endif
- /* Only re-insert chunks if index has an insert method */
- if (!idx_info->storage->ops->insert)
+ /*
+ * If no datasets involved have a chunk index 'insert'
+ * operation, this function is a no-op
+ */
+ if (chunk_list->no_dset_index_insert_methods)
HGOTO_DONE(SUCCEED);
/*
@@ -4848,15 +5261,15 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *
/* Set the receive counts from the assigned chunks map */
counts_ptr = counts_disps_array;
- for (i = 0; i < (size_t)mpi_size; i++)
- H5_CHECKED_ASSIGN(counts_ptr[i], int, num_chunks_assigned_map[i], size_t);
+ for (int curr_rank = 0; curr_rank < mpi_size; curr_rank++)
+ H5_CHECKED_ASSIGN(counts_ptr[curr_rank], int, num_chunks_assigned_map[curr_rank], size_t);
/* Set the displacements into the receive buffer for the gather operation */
displacements_ptr = &counts_disps_array[mpi_size];
*displacements_ptr = 0;
- for (i = 1; i < (size_t)mpi_size; i++)
- displacements_ptr[i] = displacements_ptr[i - 1] + counts_ptr[i - 1];
+ for (int curr_rank = 1; curr_rank < mpi_size; curr_rank++)
+ displacements_ptr[curr_rank] = displacements_ptr[curr_rank - 1] + counts_ptr[curr_rank - 1];
}
/* Perform gather operation */
@@ -4881,11 +5294,12 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *
"can't gather chunk index re-insertion info to/from ranks");
}
- /* Initialize static chunk udata fields from chunk index info */
- H5D_MPIO_INIT_CHUNK_UD_INFO(chunk_ud, idx_info);
-
- for (i = 0; i < collective_num_entries; i++) {
- H5D_chunk_insert_info_t *coll_entry = &((H5D_chunk_insert_info_t *)gathered_array)[i];
+ for (size_t entry_idx = 0; entry_idx < collective_num_entries; entry_idx++) {
+ H5D_mpio_filtered_dset_info_t *cached_dset_info;
+ H5D_chunk_insert_info_t *coll_entry = &((H5D_chunk_insert_info_t *)gathered_array)[entry_idx];
+ H5D_chunk_ud_t chunk_ud;
+ haddr_t prev_tag = HADDR_UNDEF;
+ hsize_t scaled_coords[H5O_LAYOUT_NDIMS];
/*
* We only need to reinsert this chunk if we had to actually
@@ -4894,13 +5308,28 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *
if (!coll_entry->index_info.need_insert)
continue;
- chunk_ud.chunk_block = coll_entry->chunk_block;
- chunk_ud.chunk_idx = coll_entry->index_info.chunk_idx;
- chunk_ud.filter_mask = coll_entry->index_info.filter_mask;
- chunk_ud.common.scaled = scaled_coords;
+ /* Find the cached dataset info for the dataset this chunk is in */
+ if (num_dset_infos > 1) {
+ HASH_FIND(hh, chunk_list->dset_info.dset_info_hash_table, &coll_entry->index_info.dset_oloc_addr,
+ sizeof(haddr_t), cached_dset_info);
+ if (cached_dset_info == NULL)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find cached dataset info entry");
+ }
+ else
+ cached_dset_info = chunk_list->dset_info.single_dset_info;
+ assert(cached_dset_info);
+
+ chunk_ud.common.layout = cached_dset_info->chunk_idx_info.layout;
+ chunk_ud.common.storage = cached_dset_info->chunk_idx_info.storage;
+ chunk_ud.common.scaled = scaled_coords;
+
+ chunk_ud.chunk_block = coll_entry->chunk_block;
+ chunk_ud.chunk_idx = coll_entry->index_info.chunk_idx;
+ chunk_ud.filter_mask = coll_entry->index_info.filter_mask;
/* Calculate scaled coordinates for the chunk */
- if (idx_info->layout->idx_type == H5D_CHUNK_IDX_EARRAY && idx_info->layout->u.earray.unlim_dim > 0) {
+ if (cached_dset_info->chunk_idx_info.layout->idx_type == H5D_CHUNK_IDX_EARRAY &&
+ cached_dset_info->chunk_idx_info.layout->u.earray.unlim_dim > 0) {
/*
* Extensible arrays where the unlimited dimension is not
* the slowest-changing dimension "swizzle" the coordinates
@@ -4914,17 +5343,20 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *
* callback that accepts a chunk index and provides the
* caller with the scaled coordinates for that chunk.
*/
- H5VM_array_calc_pre(chunk_ud.chunk_idx, di->dset->shared->ndims,
- idx_info->layout->u.earray.swizzled_down_chunks, scaled_coords);
+ H5VM_array_calc_pre(chunk_ud.chunk_idx, cached_dset_info->dset_io_info->dset->shared->ndims,
+ cached_dset_info->chunk_idx_info.layout->u.earray.swizzled_down_chunks,
+ scaled_coords);
- H5VM_unswizzle_coords(hsize_t, scaled_coords, idx_info->layout->u.earray.unlim_dim);
+ H5VM_unswizzle_coords(hsize_t, scaled_coords,
+ cached_dset_info->chunk_idx_info.layout->u.earray.unlim_dim);
}
else {
- H5VM_array_calc_pre(chunk_ud.chunk_idx, di->dset->shared->ndims,
- di->dset->shared->layout.u.chunk.down_chunks, scaled_coords);
+ H5VM_array_calc_pre(chunk_ud.chunk_idx, cached_dset_info->dset_io_info->dset->shared->ndims,
+ cached_dset_info->dset_io_info->dset->shared->layout.u.chunk.down_chunks,
+ scaled_coords);
}
- scaled_coords[di->dset->shared->ndims] = 0;
+ scaled_coords[cached_dset_info->dset_io_info->dset->shared->ndims] = 0;
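
The H5VM_array_calc_pre calls above decompose a linear chunk index into per-dimension scaled coordinates using precomputed "down" products. A simplified sketch of that decomposition (not HDF5's implementation):

    /* down[u] holds the product of chunk counts across all dimensions
     * faster-varying than u, with down[ndims - 1] == 1 */
    static void
    index_to_scaled(unsigned long idx, unsigned ndims,
                    const unsigned long *down, unsigned long *scaled)
    {
        for (unsigned u = 0; u < ndims; u++) {
            scaled[u] = idx / down[u];
            idx      %= down[u];
        }
    }

For example, with a 4 x 2 x 3 grid of chunks, down is {6, 3, 1}, so linear index 17 maps to scaled coordinates {2, 1, 2}.
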
#ifndef NDEBUG
/*
@@ -4936,10 +5368,18 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *
* they match.
*/
for (size_t dbg_idx = 0; dbg_idx < chunk_list->num_chunk_infos; dbg_idx++) {
- if (coll_entry->index_info.chunk_idx == chunk_list->chunk_infos[dbg_idx].index_info.chunk_idx) {
+ bool same_chunk;
+
+ /* Chunks must have the same index and reside in the same dataset */
+ same_chunk = (0 == H5_addr_cmp(coll_entry->index_info.dset_oloc_addr,
+ chunk_list->chunk_infos[dbg_idx].index_info.dset_oloc_addr));
+ same_chunk = same_chunk && (coll_entry->index_info.chunk_idx ==
+ chunk_list->chunk_infos[dbg_idx].index_info.chunk_idx);
+
+ if (same_chunk) {
bool coords_match =
!memcmp(scaled_coords, chunk_list->chunk_infos[dbg_idx].chunk_info->scaled,
- di->dset->shared->ndims * sizeof(hsize_t));
+ cached_dset_info->dset_io_info->dset->shared->ndims * sizeof(hsize_t));
assert(coords_match && "Calculated scaled coordinates for chunk didn't match "
"chunk's actual scaled coordinates!");
@@ -4948,8 +5388,15 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *
}
#endif
- if ((idx_info->storage->ops->insert)(idx_info, &chunk_ud, di->dset) < 0)
+ /* Tag metadata cache entries with the dataset's object header address */
+ H5AC_tag(cached_dset_info->dset_io_info->dset->oloc.addr, &prev_tag);
+
+ if ((cached_dset_info->chunk_idx_info.storage->ops->insert)(
+ &cached_dset_info->chunk_idx_info, &chunk_ud, cached_dset_info->dset_io_info->dset) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index");
+
+ /* Restore the previous metadata tag */
+ H5AC_tag(prev_tag, NULL);
}
done:
@@ -5005,9 +5452,9 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, bool *con
bool struct_type_derived = false;
MPI_Datatype chunk_block_type = MPI_DATATYPE_NULL;
bool chunk_block_type_derived = false;
- MPI_Datatype types[5];
- MPI_Aint displacements[5];
- int block_lengths[5];
+ MPI_Datatype types[6];
+ MPI_Aint displacements[6];
+ int block_lengths[6];
int field_count;
int mpi_code;
herr_t ret_value = SUCCEED;
@@ -5026,29 +5473,32 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, bool *con
if (H5F_mpi_get_file_block_type(false, &chunk_block_type, &chunk_block_type_derived) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't create derived type for chunk file description");
- field_count = 5;
+ field_count = 6;
assert(field_count == (sizeof(types) / sizeof(MPI_Datatype)));
/*
* Create structure type to pack chunk H5F_block_t structure
- * next to chunk_idx, orig_owner, new_owner and num_writers
- * fields
+ * next to chunk_idx, dset_oloc_addr, orig_owner, new_owner
+ * and num_writers fields
*/
block_lengths[0] = 1;
block_lengths[1] = 1;
block_lengths[2] = 1;
block_lengths[3] = 1;
block_lengths[4] = 1;
+ block_lengths[5] = 1;
displacements[0] = offsetof(H5D_chunk_redistribute_info_t, chunk_block);
displacements[1] = offsetof(H5D_chunk_redistribute_info_t, chunk_idx);
- displacements[2] = offsetof(H5D_chunk_redistribute_info_t, orig_owner);
- displacements[3] = offsetof(H5D_chunk_redistribute_info_t, new_owner);
- displacements[4] = offsetof(H5D_chunk_redistribute_info_t, num_writers);
+ displacements[2] = offsetof(H5D_chunk_redistribute_info_t, dset_oloc_addr);
+ displacements[3] = offsetof(H5D_chunk_redistribute_info_t, orig_owner);
+ displacements[4] = offsetof(H5D_chunk_redistribute_info_t, new_owner);
+ displacements[5] = offsetof(H5D_chunk_redistribute_info_t, num_writers);
types[0] = chunk_block_type;
types[1] = HSIZE_AS_MPI_TYPE;
- types[2] = MPI_INT;
+ types[2] = HADDR_AS_MPI_TYPE;
types[3] = MPI_INT;
types[4] = MPI_INT;
+ types[5] = MPI_INT;
if (MPI_SUCCESS !=
(mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, contig_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code)
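
For readers unfamiliar with offsetof-based MPI struct types, a self-contained sketch of the construction used above; the field set here is illustrative:

    #include <stddef.h>
    #include <stdint.h>
    #include <mpi.h>

    typedef struct {
        uint64_t chunk_idx;
        uint64_t dset_addr;
        int      owner;
    } packed_info_t;

    static int
    make_packed_type(MPI_Datatype *out)
    {
        int          lens[3]  = {1, 1, 1};
        MPI_Aint     disps[3] = {offsetof(packed_info_t, chunk_idx),
                                 offsetof(packed_info_t, dset_addr),
                                 offsetof(packed_info_t, owner)};
        MPI_Datatype types[3] = {MPI_UINT64_T, MPI_UINT64_T, MPI_INT};

        if (MPI_SUCCESS != MPI_Type_create_struct(3, lens, disps, types, out))
            return -1;

        return (MPI_SUCCESS == MPI_Type_commit(out)) ? 0 : -1;
    }
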
@@ -5057,25 +5507,28 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, bool *con
if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(contig_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
- /* Create struct type to extract the chunk_current, chunk_idx, orig_owner,
- * new_owner and num_writers fields from a H5D_filtered_collective_chunk_info_t
- * structure
+ /* Create struct type to extract the chunk_current, chunk_idx,
+ * dset_oloc_addr, orig_owner, new_owner and num_writers fields
+ * from a H5D_filtered_collective_chunk_info_t structure
*/
block_lengths[0] = 1;
block_lengths[1] = 1;
block_lengths[2] = 1;
block_lengths[3] = 1;
block_lengths[4] = 1;
+ block_lengths[5] = 1;
displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_current);
displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx);
- displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, orig_owner);
- displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, new_owner);
- displacements[4] = offsetof(H5D_filtered_collective_chunk_info_t, num_writers);
+ displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.dset_oloc_addr);
+ displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, orig_owner);
+ displacements[4] = offsetof(H5D_filtered_collective_chunk_info_t, new_owner);
+ displacements[5] = offsetof(H5D_filtered_collective_chunk_info_t, num_writers);
types[0] = chunk_block_type;
types[1] = HSIZE_AS_MPI_TYPE;
- types[2] = MPI_INT;
+ types[2] = HADDR_AS_MPI_TYPE;
types[3] = MPI_INT;
types[4] = MPI_INT;
+ types[5] = MPI_INT;
if (MPI_SUCCESS !=
(mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code)
@@ -5146,9 +5599,9 @@ H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, bool *contig_typ
bool struct_type_derived = false;
MPI_Datatype chunk_block_type = MPI_DATATYPE_NULL;
bool chunk_block_type_derived = false;
- MPI_Datatype types[3];
- MPI_Aint displacements[3];
- int block_lengths[3];
+ MPI_Datatype types[4];
+ MPI_Aint displacements[4];
+ int block_lengths[4];
int field_count;
int mpi_code;
herr_t ret_value = SUCCEED;
@@ -5167,22 +5620,25 @@ H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, bool *contig_typ
if (H5F_mpi_get_file_block_type(false, &chunk_block_type, &chunk_block_type_derived) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't create derived type for chunk file description");
- field_count = 3;
+ field_count = 4;
assert(field_count == (sizeof(types) / sizeof(MPI_Datatype)));
/*
* Create structure type to pack both chunk H5F_block_t structures
- * next to chunk_idx field
+ * next to chunk_idx and dset_oloc_addr fields
*/
block_lengths[0] = 1;
block_lengths[1] = 1;
block_lengths[2] = 1;
+ block_lengths[3] = 1;
displacements[0] = offsetof(H5D_chunk_alloc_info_t, chunk_current);
displacements[1] = offsetof(H5D_chunk_alloc_info_t, chunk_new);
displacements[2] = offsetof(H5D_chunk_alloc_info_t, chunk_idx);
+ displacements[3] = offsetof(H5D_chunk_alloc_info_t, dset_oloc_addr);
types[0] = chunk_block_type;
types[1] = chunk_block_type;
types[2] = HSIZE_AS_MPI_TYPE;
+ types[3] = HADDR_AS_MPI_TYPE;
if (MPI_SUCCESS !=
(mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, contig_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code)
@@ -5192,18 +5648,22 @@ H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, bool *contig_typ
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
/*
- * Create struct type to extract the chunk_current, chunk_new and chunk_idx
- * fields from a H5D_filtered_collective_chunk_info_t structure
+ * Create struct type to extract the chunk_current, chunk_new, chunk_idx
+ * and dset_oloc_addr fields from a H5D_filtered_collective_chunk_info_t
+ * structure
*/
block_lengths[0] = 1;
block_lengths[1] = 1;
block_lengths[2] = 1;
+ block_lengths[3] = 1;
displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_current);
displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_new);
displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx);
+ displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.dset_oloc_addr);
types[0] = chunk_block_type;
types[1] = chunk_block_type;
types[2] = HSIZE_AS_MPI_TYPE;
+ types[3] = HADDR_AS_MPI_TYPE;
if (MPI_SUCCESS !=
(mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code)
@@ -5277,9 +5737,9 @@ H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, bool *contig_ty
MPI_Datatype chunk_block_type = MPI_DATATYPE_NULL;
bool chunk_block_type_derived = false;
MPI_Aint contig_type_extent;
- MPI_Datatype types[4];
- MPI_Aint displacements[4];
- int block_lengths[4];
+ MPI_Datatype types[5];
+ MPI_Aint displacements[5];
+ int block_lengths[5];
int field_count;
int mpi_code;
herr_t ret_value = SUCCEED;
@@ -5298,7 +5758,7 @@ H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, bool *contig_ty
if (H5F_mpi_get_file_block_type(false, &chunk_block_type, &chunk_block_type_derived) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't create derived type for chunk file description");
- field_count = 4;
+ field_count = 5;
assert(field_count == (sizeof(types) / sizeof(MPI_Datatype)));
/*
@@ -5311,14 +5771,17 @@ H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, bool *contig_ty
block_lengths[1] = 1;
block_lengths[2] = 1;
block_lengths[3] = 1;
+ block_lengths[4] = 1;
displacements[0] = offsetof(H5D_chunk_insert_info_t, chunk_block);
displacements[1] = offsetof(H5D_chunk_insert_info_t, index_info.chunk_idx);
- displacements[2] = offsetof(H5D_chunk_insert_info_t, index_info.filter_mask);
- displacements[3] = offsetof(H5D_chunk_insert_info_t, index_info.need_insert);
+ displacements[2] = offsetof(H5D_chunk_insert_info_t, index_info.dset_oloc_addr);
+ displacements[3] = offsetof(H5D_chunk_insert_info_t, index_info.filter_mask);
+ displacements[4] = offsetof(H5D_chunk_insert_info_t, index_info.need_insert);
types[0] = chunk_block_type;
types[1] = HSIZE_AS_MPI_TYPE;
- types[2] = MPI_UNSIGNED;
- types[3] = MPI_C_BOOL;
+ types[2] = HADDR_AS_MPI_TYPE;
+ types[3] = MPI_UNSIGNED;
+ types[4] = MPI_C_BOOL;
if (MPI_SUCCESS !=
(mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code)
@@ -5344,8 +5807,9 @@ H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, bool *contig_ty
*/
displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_new);
displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx);
- displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.filter_mask);
- displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.need_insert);
+ displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.dset_oloc_addr);
+ displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.filter_mask);
+ displacements[4] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.need_insert);
if (MPI_SUCCESS !=
(mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code)
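
The second struct type serves a different purpose from the packed one: its displacements point at fields inside the larger chunk-info structure, letting MPI extract just those members directly from an array of the big structs. A sketch of the idea, with the extent resized so consecutive array elements stride by the full struct size (types illustrative):

    #include <stddef.h>
    #include <stdint.h>
    #include <mpi.h>

    typedef struct {
        double   other_state[8]; /* members MPI should step over */
        uint64_t chunk_idx;
        int      need_insert;
    } big_entry_t;

    static int
    make_extract_type(MPI_Datatype *out)
    {
        int          lens[2]  = {1, 1};
        MPI_Aint     disps[2] = {offsetof(big_entry_t, chunk_idx),
                                 offsetof(big_entry_t, need_insert)};
        MPI_Datatype types[2] = {MPI_UINT64_T, MPI_INT};
        MPI_Datatype tmp      = MPI_DATATYPE_NULL;

        if (MPI_SUCCESS != MPI_Type_create_struct(2, lens, disps, types, &tmp))
            return -1;

        /* Resize so element i of a big_entry_t array begins at
         * i * sizeof(big_entry_t) when MPI walks the buffer */
        if (MPI_SUCCESS !=
            MPI_Type_create_resized(tmp, 0, (MPI_Aint)sizeof(big_entry_t), out)) {
            MPI_Type_free(&tmp);
            return -1;
        }

        MPI_Type_free(&tmp);
        return (MPI_SUCCESS == MPI_Type_commit(out)) ? 0 : -1;
    }
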
@@ -5552,6 +6016,8 @@ H5D__mpio_dump_collective_filtered_chunk_list(H5D_filtered_collective_io_info_t
chunk_rank < 3 ? 0 : chunk_entry->chunk_info->scaled[2],
chunk_rank < 4 ? 0 : chunk_entry->chunk_info->scaled[3]);
H5D_MPIO_DEBUG_VA(mpi_rank, " Chunk Index: %" PRIuHSIZE, chunk_entry->index_info.chunk_idx);
+ H5D_MPIO_DEBUG_VA(mpi_rank, " Dataset Object Header Address: %" PRIuHADDR,
+ chunk_entry->index_info.dset_oloc_addr);
H5D_MPIO_DEBUG_VA(mpi_rank, " Filter Mask: %u", chunk_entry->index_info.filter_mask);
H5D_MPIO_DEBUG_VA(mpi_rank, " Need Insert: %s",
chunk_entry->index_info.need_insert ? "YES" : "NO");