author    vchoi-hdfgroup <55293060+vchoi-hdfgroup@users.noreply.github.com>  2023-11-16 16:12:45 (GMT)
committer GitHub <noreply@github.com>  2023-11-16 16:12:45 (GMT)
commit    ed31aaca798353f205bdd05290d4fe8c5e978778 (patch)
tree      0d5183c225350de80ad72b65d578527d0089960a /src
parent    ef39882fa1e13740d2530c7a0637bd1f1a822b68 (diff)
Implement selection vector I/O with collective chunk filling (#3826)
* Changes for ECP-344: Implement selection vector I/O with collective chunk filling. Also fix a bug in H5FD__mpio_write_vector() to account for the fixed size optimization when computing the max address.
* Fixes based on PR review comments:
  For H5Dchunk.c: fix H5MM_xfree()
  For H5FDmpio.c:
  1) Revert the fix to H5FD__mpio_write_vector()
  2) Apply the patch from Neil on the proper length of s_sizes reported by H5FD__mpio_vector_build_types()
* Put back the logic of dividing up the work among all the MPI ranks, similar to the original H5D__chunk_collective_fill() routine.
* Add a test to verify the fix for the illegal reference problem in H5FD__mpio_write_vector().
Diffstat (limited to 'src')
-rw-r--r--  src/H5Dchunk.c  264
-rw-r--r--  src/H5FDmpio.c   28
2 files changed, 122 insertions, 170 deletions
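Both halves of this patch lean on the vector I/O convention that the sizes[] and types[] arrays passed to a vector write may be shorter than the element count: a 0 size (or a NOLIST memory type) terminates the array and means "all remaining elements repeat the previous value". The standalone sketch below illustrates how the two-element io_2sizes/io_types vectors built in H5D__chunk_collective_fill encode that for equally sized fill chunks. It uses hypothetical stand-in names (fake_haddr_t, FAKE_MEM_DRAW, build_fill_vectors) rather than the internal HDF5 types, so it compiles on its own and is only a sketch of the idea, not library code.

/* Standalone illustration of the "compressed" sizes/types vectors.
 * All names below are stand-ins, not HDF5 identifiers. */
#include <stddef.h>
#include <stdint.h>

typedef uint64_t fake_haddr_t;                     /* stand-in for haddr_t    */
enum { FAKE_MEM_NOLIST = -1, FAKE_MEM_DRAW = 6 };  /* stand-in for H5FD_mem_t */

/* Fill the I/O vectors for `nchunks` equally sized fill chunks. */
static void
build_fill_vectors(size_t nchunks, size_t chunk_size,
                   const fake_haddr_t chunk_addrs[], const void *fill_buf,
                   fake_haddr_t io_addrs[], const void *io_wbufs[],
                   size_t io_2sizes[2], int io_types[2])
{
    /* Every chunk is raw data, so the types vector can be compressed to
     * a single entry followed by the NOLIST terminator. */
    io_types[0] = FAKE_MEM_DRAW;
    io_types[1] = FAKE_MEM_NOLIST;

    /* Likewise, one size entry followed by a 0 terminator says "all
     * remaining elements have the same size as the previous one". */
    io_2sizes[0] = chunk_size;
    io_2sizes[1] = 0;

    /* Addresses and write buffers are still given per chunk. */
    for (size_t i = 0; i < nchunks; i++) {
        io_addrs[i] = chunk_addrs[i];
        io_wbufs[i] = fill_buf;   /* every chunk written from the fill buffer */
    }
}

In the actual patch the size vector falls back to a fully populated io_sizes array when the chunks differ in length (unfiltered partial edge chunks), which is why the write call selects all_same_block_len ? io_2sizes : io_sizes.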
diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c
index 41d774d..c8bad76 100644
--- a/src/H5Dchunk.c
+++ b/src/H5Dchunk.c
@@ -5536,11 +5536,9 @@ done:
/*-------------------------------------------------------------------------
* Function: H5D__chunk_collective_fill
*
- * Purpose: Use MPIO collective write to fill the chunks (if number of
- * chunks to fill is greater than the number of MPI procs;
- * otherwise use independent I/O).
+ * Purpose: Use MPIO selection vector I/O for writing fill chunks
*
- * Return: Non-negative on success/Negative on failure
+ * Return: Non-negative on success/Negative on failure
*
*-------------------------------------------------------------------------
*/
@@ -5554,19 +5552,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
int mpi_code; /* MPI return code */
size_t num_blocks; /* Number of blocks between processes. */
size_t leftover_blocks; /* Number of leftover blocks to handle */
- int blocks, leftover; /* converted to int for MPI */
- MPI_Aint *chunk_disp_array = NULL;
- MPI_Aint *block_disps = NULL;
- int *block_lens = NULL;
- MPI_Datatype mem_type = MPI_BYTE, file_type = MPI_BYTE;
- H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
- bool have_xfer_mode = false; /* Whether the previous xffer mode has been retrieved */
- bool need_sort = false;
- size_t i; /* Local index variable */
+ int blocks; /* converted to int for MPI */
+ int leftover; /* converted to int for MPI */
+ H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
+    bool             have_xfer_mode = false;       /* Whether the previous xfer mode has been retrieved */
+ size_t i; /* Local index variable */
+ haddr_t *io_addrs = NULL;
+ size_t *io_sizes = NULL;
+ const void **io_wbufs = NULL;
+ H5FD_mem_t io_types[2];
+ bool all_same_block_len = true;
+ bool need_sort = false;
+ size_t io_2sizes[2];
herr_t ret_value = SUCCEED; /* Return value */
FUNC_ENTER_PACKAGE
+ assert(chunk_fill_info->num_chunks != 0);
+
/*
* If a separate fill buffer is provided for partial chunks, ensure
* that the "don't filter partial edge chunks" flag is set.
@@ -5589,6 +5592,7 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
/* Distribute evenly the number of blocks between processes. */
if (mpi_size == 0)
HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "Resulted in division by zero");
+
num_blocks =
(size_t)(chunk_fill_info->num_chunks / (size_t)mpi_size); /* value should be the same on all procs */
@@ -5602,157 +5606,97 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
H5_CHECKED_ASSIGN(leftover, int, leftover_blocks, size_t);
/* Check if we have any chunks to write on this rank */
- if (num_blocks > 0 || (leftover && leftover > mpi_rank)) {
- MPI_Aint partial_fill_buf_disp = 0;
- bool all_same_block_len = true;
-
- /* Allocate buffers */
- if (NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer");
-
- if (partial_chunk_fill_buf) {
- MPI_Aint fill_buf_addr;
- MPI_Aint partial_fill_buf_addr;
-
- /* Calculate the displacement between the fill buffer and partial chunk fill buffer */
- if (MPI_SUCCESS != (mpi_code = MPI_Get_address(fill_buf, &fill_buf_addr)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
- if (MPI_SUCCESS != (mpi_code = MPI_Get_address(partial_chunk_fill_buf, &partial_fill_buf_addr)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
-
-#if H5_CHECK_MPI_VERSION(3, 1)
- partial_fill_buf_disp = MPI_Aint_diff(partial_fill_buf_addr, fill_buf_addr);
-#else
- partial_fill_buf_disp = partial_fill_buf_addr - fill_buf_addr;
-#endif
+ if (num_blocks > 0 || leftover > mpi_rank) {
- /*
- * Allocate all-zero block displacements array. If a block's displacement
- * is left as zero, that block will be written to from the regular fill
- * buffer. If a block represents an unfiltered partial edge chunk, its
- * displacement will be set so that the block is written to from the
- * unfiltered fill buffer.
- */
- if (NULL == (block_disps = (MPI_Aint *)H5MM_calloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate block displacements buffer");
- }
+ if (NULL == (io_addrs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_addrs))))
+ HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
+ "couldn't allocate space for I/O addresses vector");
- /*
- * Perform initial scan of chunk info list to:
- * - make sure that chunk addresses are monotonically non-decreasing
- * - check if all blocks have the same length
- */
- for (i = 1; i < chunk_fill_info->num_chunks; i++) {
- if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
- need_sort = true;
-
- if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
- all_same_block_len = false;
- }
+ if (NULL == (io_wbufs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_wbufs))))
+ HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O buffers vector");
+ }
- if (need_sort)
- qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks,
- sizeof(struct chunk_coll_fill_info), H5D__chunk_cmp_coll_fill_info);
+ /*
+ * Perform initial scan of chunk info list to:
+ * - make sure that chunk addresses are monotonically non-decreasing
+ * - check if all blocks have the same length
+ */
+ for (i = 1; i < chunk_fill_info->num_chunks; i++) {
+ if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
+ need_sort = true;
- /* Allocate buffer for block lengths if necessary */
- if (!all_same_block_len)
- if (NULL == (block_lens = (int *)H5MM_malloc((size_t)(blocks + 1) * sizeof(int))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk lengths buffer");
+ if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
+ all_same_block_len = false;
+ }
- for (i = 0; i < (size_t)blocks; i++) {
- size_t idx = i + (size_t)(mpi_rank * blocks);
+ /*
+ * Note that we sort all of the chunks here, and not just a subset
+ * corresponding to this rank. We do this since we have found MPI I/O to work
+ * better when each rank writes blocks that are contiguous in the file,
+ * and by sorting the full list we maximize the chance of that happening.
+ */
+ if (need_sort)
+ qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks, sizeof(struct chunk_coll_fill_info),
+ H5D__chunk_cmp_coll_fill_info);
- /* store the chunk address as an MPI_Aint */
- chunk_disp_array[i] = (MPI_Aint)(chunk_fill_info->chunk_info[idx].addr);
+ /*
+ * If all the chunks have the same length, use the compressed feature
+ * to store the size.
+ * Otherwise, allocate the array of sizes for storing chunk sizes.
+ */
+ if (all_same_block_len) {
+ io_2sizes[0] = chunk_fill_info->chunk_info[0].chunk_size;
+ io_2sizes[1] = 0;
+ }
+ else {
+ if (NULL == (io_sizes = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_sizes))))
+ HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O sizes vector");
+ }
- if (!all_same_block_len)
- H5_CHECKED_ASSIGN(block_lens[i], int, chunk_fill_info->chunk_info[idx].chunk_size, size_t);
+ /*
+ * Since the type of all chunks is raw data, use the compressed feature
+ * to store the chunk type.
+ */
+ io_types[0] = H5FD_MEM_DRAW;
+ io_types[1] = H5FD_MEM_NOLIST;
- if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk) {
- assert(partial_chunk_fill_buf);
- block_disps[i] = partial_fill_buf_disp;
- }
- } /* end for */
+ /*
+ * For the chunks corresponding to this rank, fill in the
+ * address, size and buf pointer for each chunk.
+ */
+ for (i = 0; i < (size_t)blocks; i++) {
+ size_t idx = i + (size_t)(mpi_rank * blocks);
- /* Calculate if there are any leftover blocks after evenly
- * distributing. If there are, then round-robin the distribution
- * to processes 0 -> leftover.
- */
- if (leftover && leftover > mpi_rank) {
- chunk_disp_array[blocks] =
- (MPI_Aint)chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;
-
- if (!all_same_block_len)
- H5_CHECKED_ASSIGN(block_lens[blocks], int,
- chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size,
- size_t);
-
- if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
- assert(partial_chunk_fill_buf);
- block_disps[blocks] = partial_fill_buf_disp;
- }
+ io_addrs[i] = chunk_fill_info->chunk_info[idx].addr;
- blocks++;
- }
+ if (!all_same_block_len)
+ io_sizes[i] = chunk_fill_info->chunk_info[idx].chunk_size;
- /* Create file and memory types for the write operation */
- if (all_same_block_len) {
- int block_len;
+ if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk)
+ io_wbufs[i] = partial_chunk_fill_buf;
+ else
+ io_wbufs[i] = fill_buf;
+ }
- H5_CHECKED_ASSIGN(block_len, int, chunk_fill_info->chunk_info[0].chunk_size, size_t);
+ /*
+ * For the leftover chunk corresponding to this rank, fill in the
+ * address, size and buf pointer for the chunk.
+ */
+ if (leftover > mpi_rank) {
+ io_addrs[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;
- mpi_code =
- MPI_Type_create_hindexed_block(blocks, block_len, chunk_disp_array, MPI_BYTE, &file_type);
- if (mpi_code != MPI_SUCCESS)
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
+ if (!all_same_block_len)
+ io_sizes[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size;
- if (partial_chunk_fill_buf) {
- /*
- * If filters are disabled for partial edge chunks, those chunks could
- * potentially have the same block length as the other chunks, but still
- * need to be written to using the unfiltered fill buffer. Use an hindexed
- * block type rather than an hvector.
- */
- mpi_code =
- MPI_Type_create_hindexed_block(blocks, block_len, block_disps, MPI_BYTE, &mem_type);
- if (mpi_code != MPI_SUCCESS)
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
- }
- else {
- mpi_code = MPI_Type_create_hvector(blocks, block_len, 0, MPI_BYTE, &mem_type);
- if (mpi_code != MPI_SUCCESS)
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hvector failed", mpi_code)
- }
- }
- else {
- /*
- * Currently, different block lengths implies that there are partial
- * edge chunks and the "don't filter partial edge chunks" flag is set.
- */
+ if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
- assert(block_lens);
- assert(block_disps);
-
- mpi_code = MPI_Type_create_hindexed(blocks, block_lens, chunk_disp_array, MPI_BYTE, &file_type);
- if (mpi_code != MPI_SUCCESS)
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
-
- mpi_code = MPI_Type_create_hindexed(blocks, block_lens, block_disps, MPI_BYTE, &mem_type);
- if (mpi_code != MPI_SUCCESS)
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
+ io_wbufs[blocks] = partial_chunk_fill_buf;
}
+ else
+ io_wbufs[blocks] = fill_buf;
- if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
- if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
- } /* end if */
-
- /* Set MPI-IO VFD properties */
-
- /* Set MPI datatypes for operation */
- if (H5CX_set_mpi_coll_datatypes(mem_type, file_type) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O properties");
+ blocks++;
+ }
/* Get current transfer mode */
if (H5CX_get_io_xfer_mode(&prev_xfer_mode) < 0)
@@ -5763,31 +5707,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
if (H5CX_set_io_xfer_mode(H5FD_MPIO_COLLECTIVE) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");
- /* Low-level write (collective) */
- if (H5F_shared_block_write(H5F_SHARED(dset->oloc.file), H5FD_MEM_DRAW, (haddr_t)0,
- (blocks) ? (size_t)1 : (size_t)0, fill_buf) < 0)
- HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file");
-
/* Barrier so processes don't race ahead */
if (MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code)
+ /* Perform the selection vector I/O for the chunks */
+ if (H5F_shared_vector_write(H5F_SHARED(dset->oloc.file), (uint32_t)blocks, io_types, io_addrs,
+ all_same_block_len ? io_2sizes : io_sizes, io_wbufs) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "vector write call failed");
+
done:
if (have_xfer_mode)
- /* Set transfer mode */
+ /* Restore transfer mode */
if (H5CX_set_io_xfer_mode(prev_xfer_mode) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");
- /* free things */
- if (MPI_BYTE != file_type)
- if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
- if (MPI_BYTE != mem_type)
- if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
- H5MM_xfree(chunk_disp_array);
- H5MM_xfree(block_disps);
- H5MM_xfree(block_lens);
+ H5MM_xfree(io_addrs);
+ H5MM_xfree(io_wbufs);
+ H5MM_xfree(io_sizes);
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__chunk_collective_fill() */
@@ -5805,6 +5742,7 @@ H5D__chunk_cmp_coll_fill_info(const void *_entry1, const void *_entry2)
FUNC_LEAVE_NOAPI(H5_addr_cmp(entry1->addr, entry2->addr))
} /* end H5D__chunk_cmp_coll_fill_info() */
+
#endif /* H5_HAVE_PARALLEL */
/*-------------------------------------------------------------------------
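The rewritten H5D__chunk_collective_fill() above keeps the original division of work: each rank takes an even share of num_chunks / mpi_size consecutive entries from the sorted chunk list, and ranks below the remainder each pick up one extra chunk from the tail at index blocks * mpi_size + mpi_rank. A minimal standalone sketch of that arithmetic follows; it uses plain C with no MPI or HDF5 calls, and the 10-chunk/4-rank numbers are illustrative only.

#include <stddef.h>
#include <stdio.h>

/* Print the chunk indices a given rank handles under the
 * even-split-plus-round-robin-leftover scheme used above. */
static void
print_rank_chunks(size_t num_chunks, int mpi_size, int mpi_rank)
{
    size_t blocks   = num_chunks / (size_t)mpi_size; /* even share per rank */
    size_t leftover = num_chunks % (size_t)mpi_size; /* chunks left over    */

    /* Contiguous run of `blocks` chunks for this rank */
    for (size_t i = 0; i < blocks; i++)
        printf("rank %d -> chunk %zu\n", mpi_rank, i + (size_t)mpi_rank * blocks);

    /* Ranks 0 .. leftover-1 each take one extra chunk from the tail */
    if ((size_t)mpi_rank < leftover)
        printf("rank %d -> chunk %zu (leftover)\n", mpi_rank,
               blocks * (size_t)mpi_size + (size_t)mpi_rank);
}

int
main(void)
{
    /* e.g. 10 chunks over 4 ranks: ranks 0 and 1 handle 3 chunks each,
     * ranks 2 and 3 handle 2 chunks each. */
    for (int r = 0; r < 4; r++)
        print_rank_chunks(10, 4, r);
    return 0;
}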
diff --git a/src/H5FDmpio.c b/src/H5FDmpio.c
index d5dd126..8aae79e 100644
--- a/src/H5FDmpio.c
+++ b/src/H5FDmpio.c
@@ -106,7 +106,7 @@ static herr_t H5FD__mpio_ctl(H5FD_t *_file, uint64_t op_code, uint64_t flags, co
/* Other functions */
static herr_t H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[],
size_t sizes[], H5_flexible_const_ptr_t bufs[],
- haddr_t *s_addrs[], size_t *s_sizes[],
+ haddr_t *s_addrs[], size_t *s_sizes[], uint32_t *s_sizes_len,
H5_flexible_const_ptr_t *s_bufs[], bool *vector_was_sorted,
MPI_Offset *mpi_off, H5_flexible_const_ptr_t *mpi_bufs_base,
int *size_i, MPI_Datatype *buf_type, bool *buf_type_created,
@@ -1675,7 +1675,8 @@ done:
static herr_t
H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[], size_t sizes[],
H5_flexible_const_ptr_t bufs[], haddr_t *s_addrs[], size_t *s_sizes[],
- H5_flexible_const_ptr_t *s_bufs[], bool *vector_was_sorted, MPI_Offset *mpi_off,
+ uint32_t *s_sizes_len, H5_flexible_const_ptr_t *s_bufs[],
+ bool *vector_was_sorted, MPI_Offset *mpi_off,
H5_flexible_const_ptr_t *mpi_bufs_base, int *size_i, MPI_Datatype *buf_type,
bool *buf_type_created, MPI_Datatype *file_type, bool *file_type_created,
char *unused)
@@ -1716,6 +1717,10 @@ H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[
/* Get bio I/O transition point (may be lower than 2G for testing) */
bigio_count = H5_mpi_get_bigio_count();
+ /* Start with s_sizes_len at count */
+ if (s_sizes_len)
+ *s_sizes_len = count;
+
if (count == 1) {
/* Single block. Just use a series of MPI_BYTEs for the file view.
*/
@@ -1808,8 +1813,13 @@ H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[
if (!fixed_size) {
if ((*s_sizes)[i] == 0) {
assert(vector_was_sorted);
+ assert(i > 0);
fixed_size = true;
size = sizes[i - 1];
+
+ /* Return the used length of the s_sizes buffer */
+ if (s_sizes_len)
+ *s_sizes_len = (uint32_t)i;
}
else {
size = (*s_sizes)[i];
@@ -2098,7 +2108,7 @@ H5FD__mpio_read_vector(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, uint32_t cou
if (xfer_mode == H5FD_MPIO_COLLECTIVE) {
/* Build MPI types, etc. */
if (H5FD__mpio_vector_build_types(count, types, addrs, sizes, (H5_flexible_const_ptr_t *)bufs,
- &s_addrs, &s_sizes, (H5_flexible_const_ptr_t **)&s_bufs,
+ &s_addrs, &s_sizes, NULL, (H5_flexible_const_ptr_t **)&s_bufs,
&vector_was_sorted, &mpi_off,
(H5_flexible_const_ptr_t *)&mpi_bufs_base, &size_i, &buf_type,
&buf_type_created, &file_type, &file_type_created, &unused) < 0)
@@ -2464,17 +2474,21 @@ H5FD__mpio_write_vector(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, uint32_t co
HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "can't get MPI-I/O transfer mode");
if (xfer_mode == H5FD_MPIO_COLLECTIVE) {
+ uint32_t s_sizes_len;
+
/* Build MPI types, etc. */
if (H5FD__mpio_vector_build_types(count, types, addrs, sizes, (H5_flexible_const_ptr_t *)bufs,
- &s_addrs, &s_sizes, (H5_flexible_const_ptr_t **)&s_bufs,
- &vector_was_sorted, &mpi_off,
+ &s_addrs, &s_sizes, &s_sizes_len,
+ (H5_flexible_const_ptr_t **)&s_bufs, &vector_was_sorted, &mpi_off,
(H5_flexible_const_ptr_t *)&mpi_bufs_base, &size_i, &buf_type,
&buf_type_created, &file_type, &file_type_created, &unused) < 0)
HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "can't build MPI datatypes for I/O");
- /* Compute max address written to */
+ /* Compute max address written to. Note s_sizes is indexed according to the length of that array as
+ * reported by H5FD__mpio_vector_build_types(), which may be shorter if using the compressed arrays
+ * feature. */
if (count > 0)
- max_addr = s_addrs[count - 1] + (haddr_t)(s_sizes[count - 1]);
+ max_addr = s_addrs[count - 1] + (haddr_t)(s_sizes[s_sizes_len - 1]);
/* free sorted vectors if they exist */
if (!vector_was_sorted) {
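The H5FDmpio.c portion of the patch addresses the max-address computation in H5FD__mpio_write_vector(): when the caller passes a compressed sizes array, only the first s_sizes_len entries of s_sizes are meaningful, so indexing s_sizes[count - 1] could read past the end of the caller's array (the "illegal reference" exercised by the new test). A hedged standalone sketch of the corrected computation is below; fake_haddr_t and max_addr_written are stand-in names, not HDF5 identifiers.

#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t fake_haddr_t;   /* stand-in for haddr_t */

/* Highest address touched by a sorted write vector whose sizes array may
 * be compressed: only the first s_sizes_len entries are populated, and
 * every element past that reuses the size at index s_sizes_len - 1. */
static fake_haddr_t
max_addr_written(uint32_t count, const fake_haddr_t s_addrs[],
                 const size_t s_sizes[], uint32_t s_sizes_len)
{
    if (count == 0)
        return 0;

    assert(s_sizes_len >= 1 && s_sizes_len <= count);

    /* The old code read s_sizes[count - 1]; when s_sizes_len < count that
     * index was never populated.  Use the last populated entry instead. */
    return s_addrs[count - 1] + (fake_haddr_t)s_sizes[s_sizes_len - 1];
}

This mirrors the diff: H5FD__mpio_vector_build_types() now reports the used length of s_sizes through the new s_sizes_len output parameter, the read path passes NULL because it never needs the length, and the write path uses it to index the last valid size.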