diff options
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r-- | src/H5Dmpio.c | 99 |
1 files changed, 61 insertions, 38 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index 887a05f..81aa799 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -234,7 +234,7 @@ static herr_t H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries); static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries, size_t array_entry_size, void **gathered_array, size_t *gathered_array_num_entries, - int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *)); + hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *)); static herr_t H5D__mpio_filtered_collective_write_type( H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_derived, @@ -418,19 +418,16 @@ done: * Function: H5D__mpio_array_gatherv * * Purpose: Given an array, specified in local_array, by each processor - * calling this function, gathers each array into a single + * calling this function, collects each array into a single * array which is then either gathered to the processor * specified by root, when allgather is false, or is * distributed back to all processors when allgather is true. * - * The size of each entry and number of entries in the array - * contributed by an individual processor should be specified - * in array_entry_size and local_array_num_entries, + * The number of entries in the array contributed by an + * individual processor and the size of each entry should be + * specified in local_array_num_entries and array_entry_size, * respectively. * - * The number of processors participating in the gather - * operation should be specified for nprocs. - * * The MPI communicator to use should be specified for comm. * * If the sort_func argument is supplied, the array is sorted @@ -448,14 +445,13 @@ done: static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries, size_t array_entry_size, void **_gathered_array, size_t *_gathered_array_num_entries, - int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *)) + hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *)) { size_t gathered_array_num_entries = 0; /* The size of the newly-constructed array */ - size_t i; void *gathered_array = NULL; /* The newly-constructed array returned to the caller */ - int *receive_counts_array = NULL; /* Array containing number of entries each process is contributing */ - int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */ - int mpi_code; + int *receive_counts_array = NULL; /* Array containing number of entries each processor is contributing */ + int *displacements_array = NULL; /* Array of displacements where each processor places its data in the final array */ + int mpi_code, mpi_rank, mpi_size; int sendcount; herr_t ret_value = SUCCEED; @@ -464,34 +460,62 @@ H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries, HDassert(_gathered_array); HDassert(_gathered_array_num_entries); - /* Determine the size of the end result array */ + MPI_Comm_size(comm, &mpi_size); + MPI_Comm_rank(comm, &mpi_rank); + + /* + * Determine the size of the end result array by collecting the number + * of entries contributed by each processor into a single total. + */ if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, comm))) HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) - /* If 0 entries resulted from the collective operation, no one is writing anything */ + /* If 0 entries resulted from the collective operation, no processor is contributing anything and there is nothing to do */ if (gathered_array_num_entries > 0) { - if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array") + /* + * If gathering to all processors, all processors need to allocate space for the resulting array, as well as + * the receive counts and displacements arrays for the collective MPI_Allgatherv call. Otherwise, only the + * root processor needs to allocate the space for an MPI_Gatherv call. + */ + if (allgather || (mpi_rank == root)) { + if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array") - if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(int)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array") + if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(int)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array") - if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(int)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array") + if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(int)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array") + } /* end if */ - /* Inform each process of how many entries each other process is contributing to the resulting array */ - if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, comm))) - HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + /* + * If gathering to all processors, inform each processor of how many entries each other processor is + * contributing to the resulting array by collecting the counts into each processor's "receive counts" + * array. Otherwise, inform only the root processor of how many entries each other processor is contributing. + */ + if (allgather) { + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + } /* end if */ + else { + if (MPI_SUCCESS != (mpi_code = MPI_Gather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, root, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Gather failed", mpi_code) + } /* end else */ + + if (allgather || (mpi_rank == root)) { + size_t i; - /* Multiply each receive count by the size of the array entry, since the data is sent as bytes */ - for (i = 0; i < (size_t) nprocs; i++) - H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t); + /* Multiply each receive count by the size of the array entry, since the data is sent as bytes. */ + for (i = 0; i < (size_t) mpi_size; i++) + H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t); - /* Set receive buffer offsets for MPI_Allgatherv */ - displacements_array[0] = 0; - for (i = 1; i < (size_t) nprocs; i++) - displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1]; + /* Set receive buffer offsets for the collective MPI_Allgatherv/MPI_Gatherv call. */ + displacements_array[0] = 0; + for (i = 1; i < (size_t) mpi_size; i++) + displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1]; + } /* end if */ + /* As the data is sent as bytes, calculate the true sendcount for the data. */ H5_CHECKED_ASSIGN(sendcount, int, local_array_num_entries * array_entry_size, size_t); if (allgather) { @@ -505,7 +529,8 @@ H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries, HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code) } /* end else */ - if (sort_func) HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func); + if (sort_func && (allgather || (mpi_rank == root))) + HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func); } /* end if */ *_gathered_array = gathered_array; @@ -1295,8 +1320,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in * of the chunks in the file. */ if (H5D__mpio_array_gatherv(chunk_list, chunk_list_num_entries, sizeof(H5D_filtered_collective_io_info_t), - (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size, - true, 0, io_info->comm, NULL) < 0) + (void **) &collective_chunk_list, &collective_chunk_list_num_entries, true, 0, io_info->comm, NULL) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes") /* Collectively re-allocate the modified chunks (from each process) in the file */ @@ -1768,8 +1792,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i * of the chunks in the file */ if (H5D__mpio_array_gatherv(&chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(H5D_filtered_collective_io_info_t), - (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size, - true, 0, io_info->comm, NULL) < 0) + (void **) &collective_chunk_list, &collective_chunk_list_num_entries, true, 0, io_info->comm, NULL) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes") /* Participate in the collective re-allocation of all chunks modified @@ -2655,8 +2678,8 @@ H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, const H5D_ty * call, the gathered list will initially be sorted in increasing order of chunk offset in the file. */ if (H5D__mpio_array_gatherv(local_chunk_array, *local_chunk_array_num_entries, sizeof(H5D_filtered_collective_io_info_t), - (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size, - false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0) + (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, false, 0, + io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array") /* Rank 0 redistributes any shared chunks to new owners as necessary */ |