diff options
Diffstat (limited to 'src/H5mpi.c')
-rw-r--r-- | src/H5mpi.c | 233 |
1 files changed, 233 insertions, 0 deletions
diff --git a/src/H5mpi.c b/src/H5mpi.c index aea0104..15fb785 100644 --- a/src/H5mpi.c +++ b/src/H5mpi.c @@ -549,4 +549,237 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5_mpio_create_large_type() */ +/*------------------------------------------------------------------------- + * Function: H5_mpio_gatherv_alloc + * + * Purpose: A wrapper around MPI_(All)gatherv that performs allocation + * of the receive buffer on the caller's behalf. This + * routine's parameters are as follows: + * + * `send_buf` - The buffer that data will be sent from for + * the calling MPI rank. Analogous to + * MPI_(All)gatherv's `sendbuf` parameter. + * + * `send_count` - The number of `send_type` elements in the + * send buffer. Analogous to MPI_(All)gatherv's + * `sendcount` parameter. + * + * `send_type` - The MPI Datatype of the elements in the send + * buffer. Analogous to MPI_(All)gatherv's + * `sendtype` parameter. + * + * `recv_counts` - An array containing the number of elements + * to be received from each MPI rank. + * Analogous to MPI_(All)gatherv's `recvcount` + * parameter. + * + * `displacements` - An array containing the displacements + * in the receive buffer where data from + * each MPI rank should be placed. Analogous + * to MPI_(All)gatherv's `displs` parameter. + * + * `recv_type` - The MPI Datatype of the elements in the + * receive buffer. Analogous to + * MPI_(All)gatherv's `recvtype` parameter. + * + * `allgather` - Specifies whether the gather operation to be + * performed should be MPI_Allgatherv (TRUE) or + * MPI_Gatherv (FALSE). + * + * `root` - For MPI_Gatherv operations, specifies the rank + * that will receive the data sent by other ranks. + * Analogous to MPI_Gatherv's `root` parameter. For + * MPI_Allgatherv operations, this parameter is + * ignored. + * + * `comm` - Specifies the MPI Communicator for the operation. + * Analogous to MPI_(All)gatherv's `comm` parameter. + * + * `mpi_rank` - Specifies the calling rank's rank value, as + * obtained by calling MPI_Comm_rank on the + * MPI Communicator `comm`. + * + * `mpi_size` - Specifies the MPI Communicator size, as + * obtained by calling MPI_Comm_size on the + * MPI Communicator `comm`. + * + * `out_buf` - Resulting buffer that is allocated and + * returned to the caller after data has been + * gathered into it. Returned only to the rank + * specified by `root` for MPI_Gatherv + * operations, or to all ranks for + * MPI_Allgatherv operations. + * + * `out_buf_num_entries` - The number of elements in the + * resulting buffer, in terms of + * the MPI Datatype provided for + * `recv_type`. + * + * Notes: This routine is collective across `comm`. + * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +herr_t +H5_mpio_gatherv_alloc(void *send_buf, int send_count, MPI_Datatype send_type, const int recv_counts[], + const int displacements[], MPI_Datatype recv_type, hbool_t allgather, int root, + MPI_Comm comm, int mpi_rank, int mpi_size, void **out_buf, size_t *out_buf_num_entries) +{ + size_t recv_buf_num_entries = 0; + void * recv_buf = NULL; +#if MPI_VERSION >= 3 + MPI_Count type_lb; + MPI_Count type_extent; +#else + MPI_Aint type_lb; + MPI_Aint type_extent; +#endif + int mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_NOAPI(FAIL) + + HDassert(send_buf || send_count == 0); + if (allgather || (mpi_rank == root)) + HDassert(out_buf && out_buf_num_entries); + + /* Retrieve the extent of the MPI Datatype being used */ +#if MPI_VERSION >= 3 + if (MPI_SUCCESS != (mpi_code = MPI_Type_get_extent_x(recv_type, &type_lb, &type_extent))) +#else + if (MPI_SUCCESS != (mpi_code = MPI_Type_get_extent(recv_type, &type_lb, &type_extent))) +#endif + HMPI_GOTO_ERROR(FAIL, "MPI_Type_get_extent(_x) failed", mpi_code) + + if (type_extent < 0) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "MPI recv_type had a negative extent") + + /* + * Calculate the total size of the buffer being + * returned and allocate it + */ + if (allgather || (mpi_rank == root)) { + size_t i; + size_t buf_size; + + for (i = 0, recv_buf_num_entries = 0; i < (size_t)mpi_size; i++) + recv_buf_num_entries += (size_t)recv_counts[i]; + buf_size = recv_buf_num_entries * (size_t)type_extent; + + /* If our buffer size is 0, there's nothing to do */ + if (buf_size == 0) + HGOTO_DONE(SUCCEED) + + if (NULL == (recv_buf = H5MM_malloc(buf_size))) + /* Push an error, but still participate in collective gather operation */ + HDONE_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate receive buffer") + } + + /* Perform gather operation */ + if (allgather) { + if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(send_buf, send_count, send_type, recv_buf, recv_counts, + displacements, recv_type, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code) + } + else { + if (MPI_SUCCESS != (mpi_code = MPI_Gatherv(send_buf, send_count, send_type, recv_buf, recv_counts, + displacements, recv_type, root, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Gatherv failed", mpi_code) + } + + if (allgather || (mpi_rank == root)) { + *out_buf = recv_buf; + *out_buf_num_entries = recv_buf_num_entries; + } + +done: + if (ret_value < 0) { + if (recv_buf) + H5MM_free(recv_buf); + } + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5_mpio_gatherv_alloc() */ + +/*------------------------------------------------------------------------- + * Function: H5_mpio_gatherv_alloc_simple + * + * Purpose: A slightly simplified interface to H5_mpio_gatherv_alloc + * which calculates the receive counts and receive buffer + * displacements for the caller. + * + * Notes: This routine is collective across `comm`. + * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +herr_t +H5_mpio_gatherv_alloc_simple(void *send_buf, int send_count, MPI_Datatype send_type, MPI_Datatype recv_type, + hbool_t allgather, int root, MPI_Comm comm, int mpi_rank, int mpi_size, + void **out_buf, size_t *out_buf_num_entries) +{ + int * recv_counts_disps_array = NULL; + int mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_NOAPI(FAIL) + + HDassert(send_buf || send_count == 0); + if (allgather || (mpi_rank == root)) + HDassert(out_buf && out_buf_num_entries); + + /* + * Allocate array to store the receive counts of each rank, as well as + * the displacements into the final array where each rank will place + * their data. The first half of the array contains the receive counts + * (in rank order), while the latter half contains the displacements + * (also in rank order). + */ + if (allgather || (mpi_rank == root)) { + if (NULL == + (recv_counts_disps_array = H5MM_malloc(2 * (size_t)mpi_size * sizeof(*recv_counts_disps_array)))) + /* Push an error, but still participate in collective gather operation */ + HDONE_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, + "couldn't allocate receive counts and displacements array") + } + + /* Collect each rank's send count to interested ranks */ + if (allgather) { + if (MPI_SUCCESS != + (mpi_code = MPI_Allgather(&send_count, 1, MPI_INT, recv_counts_disps_array, 1, MPI_INT, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + } + else { + if (MPI_SUCCESS != + (mpi_code = MPI_Gather(&send_count, 1, MPI_INT, recv_counts_disps_array, 1, MPI_INT, root, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Gather failed", mpi_code) + } + + /* Set the displacements into the receive buffer for the gather operation */ + if (allgather || (mpi_rank == root)) { + size_t i; + int * displacements_ptr; + + displacements_ptr = &recv_counts_disps_array[mpi_size]; + + *displacements_ptr = 0; + for (i = 1; i < (size_t)mpi_size; i++) + displacements_ptr[i] = displacements_ptr[i - 1] + recv_counts_disps_array[i - 1]; + } + + /* Perform gather operation */ + if (H5_mpio_gatherv_alloc(send_buf, send_count, send_type, recv_counts_disps_array, + &recv_counts_disps_array[mpi_size], recv_type, allgather, root, comm, mpi_rank, + mpi_size, out_buf, out_buf_num_entries) < 0) + HGOTO_ERROR(H5E_LIB, H5E_CANTGATHER, FAIL, "can't gather data") + +done: + if (recv_counts_disps_array) + H5MM_free(recv_counts_disps_array); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5_mpio_gatherv_alloc_simple() */ + #endif /* H5_HAVE_PARALLEL */ |