From 57cd77c478e5828bb89708d0efe109629f88ac3e Mon Sep 17 00:00:00 2001
From: Neil Fortner
Date: Fri, 24 Sep 2021 12:24:20 -0500
Subject: Implement parallel collective support for selection I/O.

---
 src/H5Dchunk.c         | 10 +++---
 src/H5Dcontig.c        |  8 ++---
 src/H5Dio.c            | 22 +++++++------
 src/H5FDint.c          |  2 --
 src/H5FDmpio.c         | 89 ++++++++++++++++++++++++++++++++++++++++++--------
 testpar/t_coll_chunk.c |  5 ++-
 testpar/t_dset.c       | 46 ++++++++++++++------------
 7 files changed, 126 insertions(+), 56 deletions(-)

diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c
index dcc3baa..f11e717 100644
--- a/src/H5Dchunk.c
+++ b/src/H5Dchunk.c
@@ -2650,8 +2650,7 @@ H5D__chunk_read(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, hsize_
         /* Issue selection I/O call (we can skip the page buffer because we've
          * already verified it won't be used, and the metadata accumulator
          * because this is raw data) */
-        if (num_chunks > 0 &&
-            H5F_shared_select_read(H5F_SHARED(io_info->dset->oloc.file), H5FD_MEM_DRAW, (uint32_t)num_chunks,
+        if (H5F_shared_select_read(H5F_SHARED(io_info->dset->oloc.file), H5FD_MEM_DRAW, (uint32_t)num_chunks,
                                    (const H5S_t * const *)chunk_mem_spaces, (const H5S_t * const *)chunk_file_spaces,
                                    chunk_addrs, element_sizes, bufs) < 0)
             HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "chunk selection read failed")
@@ -2985,10 +2984,9 @@ H5D__chunk_write(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, hsize
         /* Issue selection I/O call (we can skip the page buffer because we've
          * already verified it won't be used, and the metadata accumulator
          * because this is raw data) */
-        if (num_chunks > 0 && H5F_shared_select_write(
-                                  H5F_SHARED(io_info->dset->oloc.file), H5FD_MEM_DRAW, (uint32_t)num_chunks,
-                                  (const H5S_t * const *)chunk_mem_spaces, (const H5S_t * const *)chunk_file_spaces,
-                                  chunk_addrs, element_sizes, bufs) < 0)
+        if (H5F_shared_select_write(H5F_SHARED(io_info->dset->oloc.file), H5FD_MEM_DRAW, (uint32_t)num_chunks,
+                                    (const H5S_t * const *)chunk_mem_spaces, (const H5S_t * const *)chunk_file_spaces,
+                                    chunk_addrs, element_sizes, bufs) < 0)
             HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "chunk selection read failed")
 
         /* Clean up memory */
diff --git a/src/H5Dcontig.c b/src/H5Dcontig.c
index d2a84da..cf6f543 100644
--- a/src/H5Dcontig.c
+++ b/src/H5Dcontig.c
@@ -658,7 +658,7 @@ H5D__contig_read(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, hsize
         /* Issue selection I/O call (we can skip the page buffer because we've
          * already verified it won't be used, and the metadata accumulator
          * because this is raw data) Only call funciton if nelmts > 0. */
-        if (nelmts > 0 && H5F_shared_select_read(H5F_SHARED(io_info->dset->oloc.file), H5FD_MEM_DRAW, 1,
+        if (H5F_shared_select_read(H5F_SHARED(io_info->dset->oloc.file), H5FD_MEM_DRAW, nelmts > 0 ? 1 : 0,
                                    &mem_space, &file_space, &(io_info->store->contig.dset_addr),
                                    &dst_type_size, &(io_info->u.rbuf)) < 0)
             HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "contiguous selection read failed")
@@ -705,9 +705,9 @@ H5D__contig_write(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, hsiz
         /* Issue selection I/O call (we can skip the page buffer because we've
          * already verified it won't be used, and the metadata accumulator
          * because this is raw data). Only call funciton if nelmts > 0. */
-        if (nelmts > 0 && H5F_shared_select_write(H5F_SHARED(io_info->dset->oloc.file), H5FD_MEM_DRAW, 1,
-                                                  &mem_space, &file_space, &(io_info->store->contig.dset_addr),
-                                                  &dst_type_size, &(io_info->u.wbuf)) < 0)
+        if (H5F_shared_select_write(H5F_SHARED(io_info->dset->oloc.file), H5FD_MEM_DRAW, nelmts > 0 ? 1 : 0,
+                                    &mem_space, &file_space, &(io_info->store->contig.dset_addr),
+                                    &dst_type_size, &(io_info->u.wbuf)) < 0)
             HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "contiguous selection write failed")
     } /* end if */
     else
diff --git a/src/H5Dio.c b/src/H5Dio.c
index cc5f5bb..191ee5c 100644
--- a/src/H5Dio.c
+++ b/src/H5Dio.c
@@ -845,11 +845,8 @@ H5D__ioinfo_adjust(H5D_io_info_t *io_info, const H5D_t *dset, const H5S_t *file_
             H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE);
     } /* end if */
 
-    /* Make any parallel I/O adjustments. Do not use collective code path if
-     * we're using selection I/O - in this case the file driver will handle it.
-     */
-    /* Check for selection/vector support in file driver? -NAF */
-    if (io_info->using_mpi_vfd /*&& !H5_use_selection_io_g*/) {
+    /* Make any parallel I/O adjustments */
+    if (io_info->using_mpi_vfd) {
         H5FD_mpio_xfer_t xfer_mode; /* Parallel transfer for this request */
         htri_t           opt;       /* Flag whether a selection is optimizable */
 
@@ -867,11 +864,16 @@ H5D__ioinfo_adjust(H5D_io_info_t *io_info, const H5D_t *dset, const H5S_t *file_
 
         /* Check if we can use the optimized parallel I/O routines */
         if (opt == TRUE) {
-            /* Override the I/O op pointers to the MPI-specific routines */
-            io_info->io_ops.multi_read   = dset->shared->layout.ops->par_read;
-            io_info->io_ops.multi_write  = dset->shared->layout.ops->par_write;
-            io_info->io_ops.single_read  = H5D__mpio_select_read;
-            io_info->io_ops.single_write = H5D__mpio_select_write;
+            /* Override the I/O op pointers to the MPI-specific routines, unless
+             * selection I/O is to be used - in this case the file driver will
+             * handle collective I/O */
+            /* Check for selection/vector support in file driver? -NAF */
+            if(!io_info->use_select_io) {
+                io_info->io_ops.multi_read   = dset->shared->layout.ops->par_read;
+                io_info->io_ops.multi_write  = dset->shared->layout.ops->par_write;
+                io_info->io_ops.single_read  = H5D__mpio_select_read;
+                io_info->io_ops.single_write = H5D__mpio_select_write;
+            } /* end if */
         } /* end if */
         else {
             /* Check if there are any filters in the pipeline.  If there are,
diff --git a/src/H5FDint.c b/src/H5FDint.c
index f558b59..69dbdc4 100644
--- a/src/H5FDint.c
+++ b/src/H5FDint.c
@@ -751,7 +751,6 @@ H5FD__read_selection_translate(H5FD_t *file, H5FD_mem_t type, hid_t dxpl_id, uin
     /* Sanity checks */
     HDassert(file);
     HDassert(file->cls);
-    HDassert(count > 0);
     HDassert(vec_arr_nalloc == sizeof(sizes_static) / sizeof(sizes_static[0]));
     HDassert(vec_arr_nalloc == sizeof(vec_bufs_static) / sizeof(vec_bufs_static[0]));
     HDassert(mem_spaces);
@@ -1370,7 +1369,6 @@ H5FD__write_selection_translate(H5FD_t *file, H5FD_mem_t type, hid_t dxpl_id, ui
     /* Sanity checks */
     HDassert(file);
     HDassert(file->cls);
-    HDassert(count > 0);
     HDassert(vec_arr_nalloc == sizeof(sizes_static) / sizeof(sizes_static[0]));
     HDassert(vec_arr_nalloc == sizeof(vec_bufs_static) / sizeof(vec_bufs_static[0]));
     HDassert(mem_spaces);
diff --git a/src/H5FDmpio.c b/src/H5FDmpio.c
index 9a99b45..6a6dd2b 100644
--- a/src/H5FDmpio.c
+++ b/src/H5FDmpio.c
@@ -1573,6 +1573,7 @@ H5FD__mpio_read_vector(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, uint32_t cou
     H5FD_mem_t *      s_types           = NULL;
     haddr_t *         s_addrs           = NULL;
     size_t *          s_sizes           = NULL;
+    size_t            s_size;
     void **           s_bufs            = NULL;
     int *             mpi_block_lengths = NULL;
     char              unused            = 0; /* Unused, except for non-NULL pointer value */
@@ -1603,6 +1604,7 @@
     int               io_size; /* Actual number of bytes requested */
     int               n;
 #endif
+    hbool_t           rank0_bcast = FALSE; /* If read-with-rank0-and-bcast flag was used */
     herr_t            ret_value = SUCCEED;
 
     FUNC_ENTER_STATIC
@@ -1639,7 +1641,23 @@
 
     if (xfer_mode == H5FD_MPIO_COLLECTIVE) {
 
-        if (count > 0) { /* create MPI derived types describing the vector write */
+        if (count == 1) {
+            /* Single block. Just use a series of MPI_BYTEs for the file view.
+             */
+            size_i        = (int)sizes[0];
+            buf_type      = MPI_BYTE;
+            file_type     = MPI_BYTE;
+            mpi_bufs_base = bufs[0];
+
+            /* Setup s_sizes (needed for incomplete read filling code) */
+            vector_was_sorted = TRUE;
+            s_sizes           = sizes;
+
+            /* some numeric conversions */
+            if (H5FD_mpi_haddr_to_MPIOff(addrs[0], &mpi_off) < 0)
+                HGOTO_ERROR(H5E_INTERNAL, H5E_BADRANGE, FAIL, "can't set MPI offset")
+        }
+        else if (count > 0) { /* create MPI derived types describing the vector write */
 
             /* sort the vector I/O request into increasing address order if required
              *
@@ -1750,6 +1768,10 @@
 
             if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
                 HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit for file_type failed", mpi_code)
+
+            /* some numeric conversions */
+            if (H5FD_mpi_haddr_to_MPIOff((haddr_t)0, &mpi_off) < 0)
+                HGOTO_ERROR(H5E_INTERNAL, H5E_BADRANGE, FAIL, "can't set MPI off to 0")
         }
         else {
 
@@ -1763,14 +1785,14 @@
 
             /* MPI count to read */
             size_i = 0;
+
+            /* some numeric conversions */
+            if (H5FD_mpi_haddr_to_MPIOff((haddr_t)0, &mpi_off) < 0)
+                HGOTO_ERROR(H5E_INTERNAL, H5E_BADRANGE, FAIL, "can't set MPI off to 0")
         }
 
         /* Portably initialize MPI status variable */
-        HDmemset(&mpi_stat, 0, sizeof(MPI_Status));
-
-        /* some numeric conversions */
-        if (H5FD_mpi_haddr_to_MPIOff((haddr_t)0, &mpi_off) < 0)
-            HGOTO_ERROR(H5E_INTERNAL, H5E_BADRANGE, FAIL, "can't set MPI off to 0")
+        HDmemset(&mpi_stat, 0, sizeof(mpi_stat));
 
 #ifdef H5FDmpio_DEBUG
         if (H5FD_mpio_Debug[(int)'r'])
@@ -1782,6 +1804,10 @@
                                                           H5FD_mpi_native_g, file->info)))
             HMPI_GOTO_ERROR(FAIL, "MPI_File_set_view failed", mpi_code)
 
+        /* Reset mpi_off to 0 since the view now starts at the data offset */
+        if (H5FD_mpi_haddr_to_MPIOff((haddr_t)0, &mpi_off) < 0)
+            HGOTO_ERROR(H5E_INTERNAL, H5E_BADRANGE, FAIL, "can't set MPI off to 0")
+
         /* Get the collective_opt property to check whether the application wants to do IO individually.
          */
         if (H5CX_get_mpio_coll_opt(&coll_opt_mode) < 0)
@@ -1793,15 +1819,30 @@
             if (H5FD_mpio_Debug[(int)'r'])
                 HDfprintf(stdout, "%s: using MPIO collective mode\n", FUNC);
 #endif
-
         if (coll_opt_mode == H5FD_MPIO_COLLECTIVE_IO) {
 #ifdef H5FDmpio_DEBUG
             if (H5FD_mpio_Debug[(int)'r'])
                 HDfprintf(stdout, "%s: doing MPI collective IO\n", FUNC);
 #endif
+            /* Check whether we should read from rank 0 and broadcast to other ranks */
+            if (H5CX_get_mpio_rank0_bcast()) {
+#ifdef H5FDmpio_DEBUG
+                if (H5FD_mpio_Debug[(int)'r'])
+                    HDfprintf(stdout, "%s: doing read-rank0-and-MPI_Bcast\n", FUNC);
+#endif
+                /* Indicate path we've taken */
+                rank0_bcast = TRUE;
 
-            if (MPI_SUCCESS != (mpi_code = MPI_File_read_at_all(file->f, mpi_off, mpi_bufs_base, size_i,
-                                                                buf_type, &mpi_stat)))
+                /* Read on rank 0 Bcast to other ranks */
+                if (file->mpi_rank == 0)
+                    if (MPI_SUCCESS != (mpi_code = MPI_File_read_at(file->f, mpi_off, mpi_bufs_base, size_i,
+                                                                    buf_type, &mpi_stat)))
+                        HMPI_GOTO_ERROR(FAIL, "MPI_File_read_at_all failed", mpi_code)
+                if (MPI_SUCCESS != (mpi_code = MPI_Bcast(mpi_bufs_base, size_i, buf_type, 0, file->comm)))
+                    HMPI_GOTO_ERROR(FAIL, "MPI_Bcast failed", mpi_code)
+            } /* end if */
+            else if (MPI_SUCCESS != (mpi_code = MPI_File_read_at_all(file->f, mpi_off, mpi_bufs_base, size_i,
+                                                                     buf_type, &mpi_stat)))
                 HMPI_GOTO_ERROR(FAIL, "MPI_File_read_at_all failed", mpi_code)
         } /* end if */
         else if (size_i > 0) {
@@ -1822,13 +1863,32 @@
                                                           H5FD_mpi_native_g, file->info)))
             HMPI_GOTO_ERROR(FAIL, "MPI_File_set_view failed", mpi_code)
 
-        /* How many bytes were actually read? */
+        /* Only retrieve bytes read if this rank _actually_ participated in I/O */
+        if (!rank0_bcast || (rank0_bcast && file->mpi_rank == 0)) {
+            /* How many bytes were actually read? */
 #if MPI_VERSION >= 3
-        if (MPI_SUCCESS != (mpi_code = MPI_Get_elements_x(&mpi_stat, buf_type, &bytes_read)))
+            if (MPI_SUCCESS != (mpi_code = MPI_Get_elements_x(&mpi_stat, buf_type, &bytes_read)))
 #else
-        if (MPI_SUCCESS != (mpi_code = MPI_Get_elements(&mpi_stat, MPI_BYTE, &bytes_read)))
+            if (MPI_SUCCESS != (mpi_code = MPI_Get_elements(&mpi_stat, MPI_BYTE, &bytes_read)))
 #endif
-            HMPI_GOTO_ERROR(FAIL, "MPI_Get_elements failed", mpi_code)
+                HMPI_GOTO_ERROR(FAIL, "MPI_Get_elements failed", mpi_code)
+        } /* end if */
+
+        /* If the rank0-bcast feature was used, broadcast the # of bytes read to
+         * other ranks, which didn't perform any I/O.
+         */
+        /* NOTE: This could be optimized further to be combined with the broadcast
+         *       of the data. (QAK - 2019/1/2)
+         *       Or have rank 0 clear the unread parts of the buffer prior to
+         *       the bcast. (NAF - 2021/9/15)
+         */
+        if (rank0_bcast)
+#if MPI_VERSION >= 3
+            if (MPI_SUCCESS != MPI_Bcast(&bytes_read, 1, MPI_COUNT, 0, file->comm))
+#else
+            if (MPI_SUCCESS != MPI_Bcast(&bytes_read, 1, MPI_INT, 0, file->comm))
+#endif
+                HMPI_GOTO_ERROR(FAIL, "MPI_Bcast failed", 0)
 
         /* Get the type's size */
 #if MPI_VERSION >= 3
@@ -1915,6 +1975,9 @@
 
             /* Check if we acutally need to do I/O */
             if (addrs[i] < max_addr) {
+                /* Portably initialize MPI status variable */
+                HDmemset(&mpi_stat, 0, sizeof(mpi_stat));
+
                 /* Issue read */
                 if (MPI_SUCCESS != (mpi_code = MPI_File_read_at(file->f, mpi_off, bufs[i], size_i,
                                                                 MPI_BYTE, &mpi_stat)))
diff --git a/testpar/t_coll_chunk.c b/testpar/t_coll_chunk.c
index 651a392..29341d7 100644
--- a/testpar/t_coll_chunk.c
+++ b/testpar/t_coll_chunk.c
@@ -832,7 +832,10 @@ coll_chunktest(const char *filename, int chunk_factor, int select_factor, int ap
     VRFY((status >= 0), "dataset write succeeded");
 
 #ifdef H5_HAVE_INSTRUMENTED_LIBRARY
-    if (facc_type == FACC_MPIO) {
+    /* Only check chunk optimization mode if selection I/O is not being used -
+     * selection I/O bypasses this IO mode decision - it's effectively always
+     * multi chunk currently */
+    if (facc_type == FACC_MPIO && !H5_use_selection_io_g) {
         switch (api_option) {
             case API_LINK_HARD:
                 status = H5Pget(xfer_plist, H5D_XFER_COLL_CHUNK_LINK_HARD_NAME, &prop_value);
diff --git a/testpar/t_dset.c b/testpar/t_dset.c
index 3e4a304..d65a2a9 100644
--- a/testpar/t_dset.c
+++ b/testpar/t_dset.c
@@ -3348,32 +3348,38 @@ actual_io_mode_tests(void)
     MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
     MPI_Comm_size(MPI_COMM_WORLD, &mpi_rank);
 
-    test_actual_io_mode(TEST_ACTUAL_IO_NO_COLLECTIVE);
+    /* Only run these tests if selection I/O is not being used - selection I/O
+     * bypasses this IO mode decision - it's effectively always multi chunk
+     * currently */
+    if (!H5_use_selection_io_g) {
+        test_actual_io_mode(TEST_ACTUAL_IO_NO_COLLECTIVE);
+
+        /*
+         * Test multi-chunk-io via proc_num threshold
+         */
+        test_actual_io_mode(TEST_ACTUAL_IO_MULTI_CHUNK_IND);
+        test_actual_io_mode(TEST_ACTUAL_IO_MULTI_CHUNK_COL);
 
-    /*
-     * Test multi-chunk-io via proc_num threshold
-     */
-    test_actual_io_mode(TEST_ACTUAL_IO_MULTI_CHUNK_IND);
-    test_actual_io_mode(TEST_ACTUAL_IO_MULTI_CHUNK_COL);
+        /* The Multi Chunk Mixed test requires atleast three processes. */
+        if (mpi_size > 2)
+            test_actual_io_mode(TEST_ACTUAL_IO_MULTI_CHUNK_MIX);
+        else
+            HDfprintf(stdout, "Multi Chunk Mixed test requires 3 proceses minimum\n");
 
-    /* The Multi Chunk Mixed test requires atleast three processes. */
-    if (mpi_size > 2)
-        test_actual_io_mode(TEST_ACTUAL_IO_MULTI_CHUNK_MIX);
-    else
-        HDfprintf(stdout, "Multi Chunk Mixed test requires 3 proceses minimum\n");
+        test_actual_io_mode(TEST_ACTUAL_IO_MULTI_CHUNK_MIX_DISAGREE);
 
-    test_actual_io_mode(TEST_ACTUAL_IO_MULTI_CHUNK_MIX_DISAGREE);
+        /*
+         * Test multi-chunk-io via setting direct property
+         */
+        test_actual_io_mode(TEST_ACTUAL_IO_DIRECT_MULTI_CHUNK_IND);
+        test_actual_io_mode(TEST_ACTUAL_IO_DIRECT_MULTI_CHUNK_COL);
 
-    /*
-     * Test multi-chunk-io via setting direct property
-     */
-    test_actual_io_mode(TEST_ACTUAL_IO_DIRECT_MULTI_CHUNK_IND);
-    test_actual_io_mode(TEST_ACTUAL_IO_DIRECT_MULTI_CHUNK_COL);
+        test_actual_io_mode(TEST_ACTUAL_IO_LINK_CHUNK);
+        test_actual_io_mode(TEST_ACTUAL_IO_CONTIGUOUS);
 
-    test_actual_io_mode(TEST_ACTUAL_IO_LINK_CHUNK);
-    test_actual_io_mode(TEST_ACTUAL_IO_CONTIGUOUS);
+        test_actual_io_mode(TEST_ACTUAL_IO_RESET);
+    }
 
-    test_actual_io_mode(TEST_ACTUAL_IO_RESET);
     return;
 }
--
cgit v0.12
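The core of the collective read path in this patch is the read-rank0-and-broadcast branch added to H5FD__mpio_read_vector: when H5CX_get_mpio_rank0_bcast() is set, only rank 0 touches the file, and both the data and the number of bytes actually read are then broadcast to the remaining ranks. The program below is a minimal standalone sketch of that pattern, not HDF5 code: the file name "input.dat", the fixed 1 MiB block size, and the plain MPI_BYTE transfer are placeholders standing in for the offsets, sizes, and derived datatypes the driver builds from the I/O vectors.

/* Sketch of the read-rank0-and-broadcast pattern (illustrative only, not HDF5 code) */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    MPI_File   fh;
    MPI_Status status;
    MPI_Offset offset     = 0;           /* placeholder file offset */
    int        count      = 1024 * 1024; /* placeholder block size in bytes */
    int        bytes_read = 0;
    int        rank;
    char      *buf;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (NULL == (buf = malloc((size_t)count)))
        MPI_Abort(MPI_COMM_WORLD, 1);

    /* Opening the file is collective, as in the MPIO VFD */
    if (MPI_SUCCESS != MPI_File_open(MPI_COMM_WORLD, "input.dat", MPI_MODE_RDONLY, MPI_INFO_NULL, &fh))
        MPI_Abort(MPI_COMM_WORLD, 1);

    if (rank == 0) {
        /* Only rank 0 touches the file (independent read) */
        if (MPI_SUCCESS != MPI_File_read_at(fh, offset, buf, count, MPI_BYTE, &status))
            MPI_Abort(MPI_COMM_WORLD, 1);

        /* How many bytes were actually read (may be short near EOF)? */
        MPI_Get_elements(&status, MPI_BYTE, &bytes_read);
    }

    /* Broadcast the data and the actual byte count to the ranks that did no I/O */
    MPI_Bcast(buf, count, MPI_BYTE, 0, MPI_COMM_WORLD);
    MPI_Bcast(&bytes_read, 1, MPI_INT, 0, MPI_COMM_WORLD);

    printf("rank %d: %d bytes valid\n", rank, bytes_read);

    MPI_File_close(&fh);
    free(buf);
    MPI_Finalize();
    return 0;
}

Broadcasting bytes_read alongside the data mirrors the second MPI_Bcast in the patch: ranks that performed no I/O still learn whether the read came up short, so any handling of incomplete reads behaves identically on every rank.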