| author | vchoi-hdfgroup <55293060+vchoi-hdfgroup@users.noreply.github.com> | 2023-11-16 16:12:45 (GMT) |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-11-16 16:12:45 (GMT) |
| commit | ed31aaca798353f205bdd05290d4fe8c5e978778 (patch) | |
| tree | 0d5183c225350de80ad72b65d578527d0089960a /src | |
| parent | ef39882fa1e13740d2530c7a0637bd1f1a822b68 (diff) | |
Implement selection vector I/O with collective chunk filling (#3826)
* Changes for ECP-344: Implement selection vector I/O with collective chunk filling.
Also fix a bug in H5FD__mpio_write_vector() to account for the fixed-size optimization
when computing the max address.
* Fixes based on PR review comments:
For H5Dchunk.c: fix H5MM_xfree()
For H5FDmpio.c:
1) Revert the fix to H5FD__mpio_write_vector()
2) Apply the patch from Neil so that H5FD__mpio_vector_build_types() reports the proper length of s_sizes
* Put back the logic of dividing up the work among all the MPI ranks, similar to the
original H5D__chunk_collective_fill() routine (a standalone sketch of this distribution follows the commit message).
* Add a test to verify the fix for the illegal reference problem in H5FD__mpio_write_vector().
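The round-robin distribution restored in the third bullet works as in the original routine: each rank writes an equal, contiguous share of the chunks, and the remaining chunks are handed out one per rank to ranks 0 through leftover - 1. A minimal standalone sketch of that assignment (plain C, not HDF5 source; the helper name print_assignment and the 10-chunk/4-rank example in main() are made up for illustration, while num_chunks, blocks, leftover, mpi_rank and mpi_size mirror the variables in H5D__chunk_collective_fill()):

```c
#include <stdio.h>
#include <stddef.h>

static void
print_assignment(size_t num_chunks, int mpi_size, int mpi_rank)
{
    size_t blocks   = num_chunks / (size_t)mpi_size;  /* even share per rank      */
    size_t leftover = num_chunks % (size_t)mpi_size;  /* chunks left to round-robin */
    size_t first    = (size_t)mpi_rank * blocks;      /* start of this rank's share */

    printf("rank %d: chunks [%zu, %zu)", mpi_rank, first, first + blocks);

    /* Ranks 0 .. leftover-1 each take one extra chunk from the tail of the list */
    if ((size_t)mpi_rank < leftover)
        printf(" + leftover chunk %zu", blocks * (size_t)mpi_size + (size_t)mpi_rank);

    printf("\n");
}

int
main(void)
{
    /* 10 chunks across 4 ranks: 2 each, ranks 0 and 1 pick up chunks 8 and 9 */
    for (int rank = 0; rank < 4; rank++)
        print_assignment(10, 4, rank);
    return 0;
}
```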
Diffstat (limited to 'src')
-rw-r--r-- | src/H5Dchunk.c | 264
-rw-r--r-- | src/H5FDmpio.c | 28
2 files changed, 122 insertions, 170 deletions
diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c index 41d774d..c8bad76 100644 --- a/src/H5Dchunk.c +++ b/src/H5Dchunk.c @@ -5536,11 +5536,9 @@ done: /*------------------------------------------------------------------------- * Function: H5D__chunk_collective_fill * - * Purpose: Use MPIO collective write to fill the chunks (if number of - * chunks to fill is greater than the number of MPI procs; - * otherwise use independent I/O). + * Purpose: Use MPIO selection vector I/O for writing fill chunks * - * Return: Non-negative on success/Negative on failure + * Return: Non-negative on success/Negative on failure * *------------------------------------------------------------------------- */ @@ -5554,19 +5552,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_ int mpi_code; /* MPI return code */ size_t num_blocks; /* Number of blocks between processes. */ size_t leftover_blocks; /* Number of leftover blocks to handle */ - int blocks, leftover; /* converted to int for MPI */ - MPI_Aint *chunk_disp_array = NULL; - MPI_Aint *block_disps = NULL; - int *block_lens = NULL; - MPI_Datatype mem_type = MPI_BYTE, file_type = MPI_BYTE; - H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */ - bool have_xfer_mode = false; /* Whether the previous xffer mode has been retrieved */ - bool need_sort = false; - size_t i; /* Local index variable */ + int blocks; /* converted to int for MPI */ + int leftover; /* converted to int for MPI */ + H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */ + bool have_xfer_mode = false; /* Whether the previous xffer mode has been retrieved */ + size_t i; /* Local index variable */ + haddr_t *io_addrs = NULL; + size_t *io_sizes = NULL; + const void **io_wbufs = NULL; + H5FD_mem_t io_types[2]; + bool all_same_block_len = true; + bool need_sort = false; + size_t io_2sizes[2]; herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_PACKAGE + assert(chunk_fill_info->num_chunks != 0); + /* * If a separate fill buffer is provided for partial chunks, ensure * that the "don't filter partial edge chunks" flag is set. @@ -5589,6 +5592,7 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_ /* Distribute evenly the number of blocks between processes. 
*/ if (mpi_size == 0) HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "Resulted in division by zero"); + num_blocks = (size_t)(chunk_fill_info->num_chunks / (size_t)mpi_size); /* value should be the same on all procs */ @@ -5602,157 +5606,97 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_ H5_CHECKED_ASSIGN(leftover, int, leftover_blocks, size_t); /* Check if we have any chunks to write on this rank */ - if (num_blocks > 0 || (leftover && leftover > mpi_rank)) { - MPI_Aint partial_fill_buf_disp = 0; - bool all_same_block_len = true; - - /* Allocate buffers */ - if (NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc((size_t)(blocks + 1) * sizeof(MPI_Aint)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer"); - - if (partial_chunk_fill_buf) { - MPI_Aint fill_buf_addr; - MPI_Aint partial_fill_buf_addr; - - /* Calculate the displacement between the fill buffer and partial chunk fill buffer */ - if (MPI_SUCCESS != (mpi_code = MPI_Get_address(fill_buf, &fill_buf_addr))) - HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code) - if (MPI_SUCCESS != (mpi_code = MPI_Get_address(partial_chunk_fill_buf, &partial_fill_buf_addr))) - HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code) - -#if H5_CHECK_MPI_VERSION(3, 1) - partial_fill_buf_disp = MPI_Aint_diff(partial_fill_buf_addr, fill_buf_addr); -#else - partial_fill_buf_disp = partial_fill_buf_addr - fill_buf_addr; -#endif + if (num_blocks > 0 || leftover > mpi_rank) { - /* - * Allocate all-zero block displacements array. If a block's displacement - * is left as zero, that block will be written to from the regular fill - * buffer. If a block represents an unfiltered partial edge chunk, its - * displacement will be set so that the block is written to from the - * unfiltered fill buffer. 
- */ - if (NULL == (block_disps = (MPI_Aint *)H5MM_calloc((size_t)(blocks + 1) * sizeof(MPI_Aint)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate block displacements buffer"); - } + if (NULL == (io_addrs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_addrs)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, + "couldn't allocate space for I/O addresses vector"); - /* - * Perform initial scan of chunk info list to: - * - make sure that chunk addresses are monotonically non-decreasing - * - check if all blocks have the same length - */ - for (i = 1; i < chunk_fill_info->num_chunks; i++) { - if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr) - need_sort = true; - - if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size) - all_same_block_len = false; - } + if (NULL == (io_wbufs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_wbufs)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O buffers vector"); + } - if (need_sort) - qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks, - sizeof(struct chunk_coll_fill_info), H5D__chunk_cmp_coll_fill_info); + /* + * Perform initial scan of chunk info list to: + * - make sure that chunk addresses are monotonically non-decreasing + * - check if all blocks have the same length + */ + for (i = 1; i < chunk_fill_info->num_chunks; i++) { + if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr) + need_sort = true; - /* Allocate buffer for block lengths if necessary */ - if (!all_same_block_len) - if (NULL == (block_lens = (int *)H5MM_malloc((size_t)(blocks + 1) * sizeof(int)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk lengths buffer"); + if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size) + all_same_block_len = false; + } - for (i = 0; i < (size_t)blocks; i++) { - size_t idx = i + (size_t)(mpi_rank * blocks); + /* + * Note that we sort all of the chunks here, and not just a subset + * corresponding to this rank. We do this since we have found MPI I/O to work + * better when each rank writes blocks that are contiguous in the file, + * and by sorting the full list we maximize the chance of that happening. + */ + if (need_sort) + qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks, sizeof(struct chunk_coll_fill_info), + H5D__chunk_cmp_coll_fill_info); - /* store the chunk address as an MPI_Aint */ - chunk_disp_array[i] = (MPI_Aint)(chunk_fill_info->chunk_info[idx].addr); + /* + * If all the chunks have the same length, use the compressed feature + * to store the size. + * Otherwise, allocate the array of sizes for storing chunk sizes. + */ + if (all_same_block_len) { + io_2sizes[0] = chunk_fill_info->chunk_info[0].chunk_size; + io_2sizes[1] = 0; + } + else { + if (NULL == (io_sizes = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_sizes)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O sizes vector"); + } - if (!all_same_block_len) - H5_CHECKED_ASSIGN(block_lens[i], int, chunk_fill_info->chunk_info[idx].chunk_size, size_t); + /* + * Since the type of all chunks is raw data, use the compressed feature + * to store the chunk type. 
+ */ + io_types[0] = H5FD_MEM_DRAW; + io_types[1] = H5FD_MEM_NOLIST; - if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk) { - assert(partial_chunk_fill_buf); - block_disps[i] = partial_fill_buf_disp; - } - } /* end for */ + /* + * For the chunks corresponding to this rank, fill in the + * address, size and buf pointer for each chunk. + */ + for (i = 0; i < (size_t)blocks; i++) { + size_t idx = i + (size_t)(mpi_rank * blocks); - /* Calculate if there are any leftover blocks after evenly - * distributing. If there are, then round-robin the distribution - * to processes 0 -> leftover. - */ - if (leftover && leftover > mpi_rank) { - chunk_disp_array[blocks] = - (MPI_Aint)chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr; - - if (!all_same_block_len) - H5_CHECKED_ASSIGN(block_lens[blocks], int, - chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size, - size_t); - - if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) { - assert(partial_chunk_fill_buf); - block_disps[blocks] = partial_fill_buf_disp; - } + io_addrs[i] = chunk_fill_info->chunk_info[idx].addr; - blocks++; - } + if (!all_same_block_len) + io_sizes[i] = chunk_fill_info->chunk_info[idx].chunk_size; - /* Create file and memory types for the write operation */ - if (all_same_block_len) { - int block_len; + if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk) + io_wbufs[i] = partial_chunk_fill_buf; + else + io_wbufs[i] = fill_buf; + } - H5_CHECKED_ASSIGN(block_len, int, chunk_fill_info->chunk_info[0].chunk_size, size_t); + /* + * For the leftover chunk corresponding to this rank, fill in the + * address, size and buf pointer for the chunk. + */ + if (leftover > mpi_rank) { + io_addrs[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr; - mpi_code = - MPI_Type_create_hindexed_block(blocks, block_len, chunk_disp_array, MPI_BYTE, &file_type); - if (mpi_code != MPI_SUCCESS) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code) + if (!all_same_block_len) + io_sizes[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size; - if (partial_chunk_fill_buf) { - /* - * If filters are disabled for partial edge chunks, those chunks could - * potentially have the same block length as the other chunks, but still - * need to be written to using the unfiltered fill buffer. Use an hindexed - * block type rather than an hvector. - */ - mpi_code = - MPI_Type_create_hindexed_block(blocks, block_len, block_disps, MPI_BYTE, &mem_type); - if (mpi_code != MPI_SUCCESS) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code) - } - else { - mpi_code = MPI_Type_create_hvector(blocks, block_len, 0, MPI_BYTE, &mem_type); - if (mpi_code != MPI_SUCCESS) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hvector failed", mpi_code) - } - } - else { - /* - * Currently, different block lengths implies that there are partial - * edge chunks and the "don't filter partial edge chunks" flag is set. 
- */ + if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) { assert(partial_chunk_fill_buf); - assert(block_lens); - assert(block_disps); - - mpi_code = MPI_Type_create_hindexed(blocks, block_lens, chunk_disp_array, MPI_BYTE, &file_type); - if (mpi_code != MPI_SUCCESS) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) - - mpi_code = MPI_Type_create_hindexed(blocks, block_lens, block_disps, MPI_BYTE, &mem_type); - if (mpi_code != MPI_SUCCESS) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + io_wbufs[blocks] = partial_chunk_fill_buf; } + else + io_wbufs[blocks] = fill_buf; - if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type))) - HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - } /* end if */ - - /* Set MPI-IO VFD properties */ - - /* Set MPI datatypes for operation */ - if (H5CX_set_mpi_coll_datatypes(mem_type, file_type) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O properties"); + blocks++; + } /* Get current transfer mode */ if (H5CX_get_io_xfer_mode(&prev_xfer_mode) < 0) @@ -5763,31 +5707,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_ if (H5CX_set_io_xfer_mode(H5FD_MPIO_COLLECTIVE) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode"); - /* Low-level write (collective) */ - if (H5F_shared_block_write(H5F_SHARED(dset->oloc.file), H5FD_MEM_DRAW, (haddr_t)0, - (blocks) ? (size_t)1 : (size_t)0, fill_buf) < 0) - HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file"); - /* Barrier so processes don't race ahead */ if (MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm))) HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code) + /* Perform the selection vector I/O for the chunks */ + if (H5F_shared_vector_write(H5F_SHARED(dset->oloc.file), (uint32_t)blocks, io_types, io_addrs, + all_same_block_len ? 
io_2sizes : io_sizes, io_wbufs) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "vector write call failed"); + done: if (have_xfer_mode) - /* Set transfer mode */ + /* Restore transfer mode */ if (H5CX_set_io_xfer_mode(prev_xfer_mode) < 0) HDONE_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode"); - /* free things */ - if (MPI_BYTE != file_type) - if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - if (MPI_BYTE != mem_type) - if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - H5MM_xfree(chunk_disp_array); - H5MM_xfree(block_disps); - H5MM_xfree(block_lens); + H5MM_xfree(io_addrs); + H5MM_xfree(io_wbufs); + H5MM_xfree(io_sizes); FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__chunk_collective_fill() */ @@ -5805,6 +5742,7 @@ H5D__chunk_cmp_coll_fill_info(const void *_entry1, const void *_entry2) FUNC_LEAVE_NOAPI(H5_addr_cmp(entry1->addr, entry2->addr)) } /* end H5D__chunk_cmp_coll_fill_info() */ + #endif /* H5_HAVE_PARALLEL */ /*------------------------------------------------------------------------- diff --git a/src/H5FDmpio.c b/src/H5FDmpio.c index d5dd126..8aae79e 100644 --- a/src/H5FDmpio.c +++ b/src/H5FDmpio.c @@ -106,7 +106,7 @@ static herr_t H5FD__mpio_ctl(H5FD_t *_file, uint64_t op_code, uint64_t flags, co /* Other functions */ static herr_t H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[], size_t sizes[], H5_flexible_const_ptr_t bufs[], - haddr_t *s_addrs[], size_t *s_sizes[], + haddr_t *s_addrs[], size_t *s_sizes[], uint32_t *s_sizes_len, H5_flexible_const_ptr_t *s_bufs[], bool *vector_was_sorted, MPI_Offset *mpi_off, H5_flexible_const_ptr_t *mpi_bufs_base, int *size_i, MPI_Datatype *buf_type, bool *buf_type_created, @@ -1675,7 +1675,8 @@ done: static herr_t H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[], size_t sizes[], H5_flexible_const_ptr_t bufs[], haddr_t *s_addrs[], size_t *s_sizes[], - H5_flexible_const_ptr_t *s_bufs[], bool *vector_was_sorted, MPI_Offset *mpi_off, + uint32_t *s_sizes_len, H5_flexible_const_ptr_t *s_bufs[], + bool *vector_was_sorted, MPI_Offset *mpi_off, H5_flexible_const_ptr_t *mpi_bufs_base, int *size_i, MPI_Datatype *buf_type, bool *buf_type_created, MPI_Datatype *file_type, bool *file_type_created, char *unused) @@ -1716,6 +1717,10 @@ H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[ /* Get bio I/O transition point (may be lower than 2G for testing) */ bigio_count = H5_mpi_get_bigio_count(); + /* Start with s_sizes_len at count */ + if (s_sizes_len) + *s_sizes_len = count; + if (count == 1) { /* Single block. Just use a series of MPI_BYTEs for the file view. */ @@ -1808,8 +1813,13 @@ H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[ if (!fixed_size) { if ((*s_sizes)[i] == 0) { assert(vector_was_sorted); + assert(i > 0); fixed_size = true; size = sizes[i - 1]; + + /* Return the used length of the s_sizes buffer */ + if (s_sizes_len) + *s_sizes_len = (uint32_t)i; } else { size = (*s_sizes)[i]; @@ -2098,7 +2108,7 @@ H5FD__mpio_read_vector(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, uint32_t cou if (xfer_mode == H5FD_MPIO_COLLECTIVE) { /* Build MPI types, etc. 
*/ if (H5FD__mpio_vector_build_types(count, types, addrs, sizes, (H5_flexible_const_ptr_t *)bufs, - &s_addrs, &s_sizes, (H5_flexible_const_ptr_t **)&s_bufs, + &s_addrs, &s_sizes, NULL, (H5_flexible_const_ptr_t **)&s_bufs, &vector_was_sorted, &mpi_off, (H5_flexible_const_ptr_t *)&mpi_bufs_base, &size_i, &buf_type, &buf_type_created, &file_type, &file_type_created, &unused) < 0) @@ -2464,17 +2474,21 @@ H5FD__mpio_write_vector(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, uint32_t co HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "can't get MPI-I/O transfer mode"); if (xfer_mode == H5FD_MPIO_COLLECTIVE) { + uint32_t s_sizes_len; + /* Build MPI types, etc. */ if (H5FD__mpio_vector_build_types(count, types, addrs, sizes, (H5_flexible_const_ptr_t *)bufs, - &s_addrs, &s_sizes, (H5_flexible_const_ptr_t **)&s_bufs, - &vector_was_sorted, &mpi_off, + &s_addrs, &s_sizes, &s_sizes_len, + (H5_flexible_const_ptr_t **)&s_bufs, &vector_was_sorted, &mpi_off, (H5_flexible_const_ptr_t *)&mpi_bufs_base, &size_i, &buf_type, &buf_type_created, &file_type, &file_type_created, &unused) < 0) HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "can't build MPI datatypes for I/O"); - /* Compute max address written to */ + /* Compute max address written to. Note s_sizes is indexed according to the length of that array as + * reported by H5FD__mpio_vector_build_types(), which may be shorter if using the compressed arrays + * feature. */ if (count > 0) - max_addr = s_addrs[count - 1] + (haddr_t)(s_sizes[count - 1]); + max_addr = s_addrs[count - 1] + (haddr_t)(s_sizes[s_sizes_len - 1]); /* free sorted vectors if they exist */ if (!vector_was_sorted) { |
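The rewritten H5D__chunk_collective_fill() above leans on the early-termination convention of HDF5's vector I/O interface: when every chunk shares one size and one memory type, the two-element io_2sizes[] = {chunk_size, 0} and io_types[] = {H5FD_MEM_DRAW, H5FD_MEM_NOLIST} stand in for full-length arrays, with the 0 / H5FD_MEM_NOLIST entry meaning "repeat the previous value for all remaining entries". A minimal sketch of how such a compressed sizes vector is read back (standalone C, not HDF5 source; used_length, effective_size, the 4096-byte chunk size, and the count of 5 are illustrative assumptions):

```c
#include <stdio.h>
#include <stddef.h>

/* Number of entries actually stored in a possibly compressed sizes vector:
 * a 0 entry terminates the list (it never appears at index 0). */
static size_t
used_length(const size_t sizes[], size_t capacity)
{
    for (size_t i = 0; i < capacity; i++)
        if (sizes[i] == 0)
            return i;           /* indices 0 .. i-1 hold real sizes */
    return capacity;
}

/* Effective size of entry i: entries past the stored portion all share
 * the last stored size. */
static size_t
effective_size(const size_t sizes[], size_t used_len, size_t i)
{
    return (i < used_len) ? sizes[i] : sizes[used_len - 1];
}

int
main(void)
{
    /* The "all chunks the same length" form from the diff: io_2sizes = {len, 0},
     * paired with io_types[] = {H5FD_MEM_DRAW, H5FD_MEM_NOLIST}. */
    size_t io_2sizes[2] = {4096, 0};   /* 4096-byte chunk size is made up */
    size_t count        = 5;           /* hypothetical number of chunks   */
    size_t len          = used_length(io_2sizes, 2);

    for (size_t i = 0; i < count; i++)
        printf("chunk %zu -> %zu bytes\n", i, effective_size(io_2sizes, len, i));

    return 0;
}
```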
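The H5FDmpio.c half of the diff adds the s_sizes_len output so that H5FD__mpio_write_vector() computes the maximum address written from the last valid entry of s_sizes instead of s_sizes[count - 1], which lies past the end of a compressed sizes vector. A sketch of the corrected computation (standalone C under those assumptions; max_addr_written and the example addresses/sizes are hypothetical, while s_addrs, s_sizes, count and s_sizes_len mirror the parameters in the diff):

```c
#include <stdint.h>
#include <stdio.h>
#include <stddef.h>

/* Highest address written: the last (sorted) address plus the size that
 * applies to it.  With a compressed sizes vector, the last valid size sits
 * at index s_sizes_len - 1, which may be smaller than count - 1. */
static uint64_t
max_addr_written(const uint64_t s_addrs[], const size_t s_sizes[],
                 uint32_t count, uint32_t s_sizes_len)
{
    if (count == 0)
        return 0;
    return s_addrs[count - 1] + (uint64_t)s_sizes[s_sizes_len - 1];
}

int
main(void)
{
    /* Three sorted 512-byte blocks whose sizes vector was compressed to {512, 0}:
     * indexing s_sizes[count - 1] would read one element past the array, the
     * illegal reference the commit's new test exercises. */
    uint64_t addrs[3] = {0, 512, 1024};
    size_t   sizes[2] = {512, 0};

    printf("max addr = %llu\n",
           (unsigned long long)max_addr_written(addrs, sizes, 3, 1));
    return 0;
}
```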