Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r--  src/H5Dmpio.c  149
1 file changed, 98 insertions(+), 51 deletions(-)
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index bc37840..d721ae6 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -234,7 +234,7 @@ static herr_t H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info
H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries);
static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
size_t array_entry_size, void **gathered_array, size_t *gathered_array_num_entries,
- int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *));
+ hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *));
static herr_t H5D__mpio_filtered_collective_write_type(
H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries,
MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
@@ -418,19 +418,16 @@ done:
* Function: H5D__mpio_array_gatherv
*
* Purpose: Given an array, specified in local_array, by each processor
- * calling this function, gathers each array into a single
+ * calling this function, collects each array into a single
* array which is then either gathered to the processor
* specified by root, when allgather is false, or is
* distributed back to all processors when allgather is true.
*
- * The size of each entry and number of entries in the array
- * contributed by an individual processor should be specified
- * in array_entry_size and local_array_num_entries,
+ * The number of entries in the array contributed by an
+ * individual processor and the size of each entry should be
+ * specified in local_array_num_entries and array_entry_size,
* respectively.
*
- * The number of processors participating in the gather
- * operation should be specified for nprocs.
- *
* The MPI communicator to use should be specified for comm.
*
* If the sort_func argument is supplied, the array is sorted
@@ -448,14 +445,13 @@ done:
static herr_t
H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
size_t array_entry_size, void **_gathered_array, size_t *_gathered_array_num_entries,
- int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *))
+ hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *))
{
size_t gathered_array_num_entries = 0; /* The size of the newly-constructed array */
- size_t i;
void *gathered_array = NULL; /* The newly-constructed array returned to the caller */
- int *receive_counts_array = NULL; /* Array containing number of entries each process is contributing */
- int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */
- int mpi_code;
+ int *receive_counts_array = NULL; /* Array containing number of entries each processor is contributing */
+ int *displacements_array = NULL; /* Array of displacements where each processor places its data in the final array */
+ int mpi_code, mpi_rank, mpi_size;
int sendcount;
herr_t ret_value = SUCCEED;
@@ -464,34 +460,62 @@ H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
HDassert(_gathered_array);
HDassert(_gathered_array_num_entries);
- /* Determine the size of the end result array */
+ MPI_Comm_size(comm, &mpi_size);
+ MPI_Comm_rank(comm, &mpi_rank);
+
+ /*
+ * Determine the size of the end result array by collecting the number
+ * of entries contributed by each processor into a single total.
+ */
if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
- /* If 0 entries resulted from the collective operation, no one is writing anything */
+ /* If 0 entries resulted from the collective operation, no processor is contributing anything and there is nothing to do */
if (gathered_array_num_entries > 0) {
- if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array")
+ /*
+ * If gathering to all processors, all processors need to allocate space for the resulting array, as well as
+ * the receive counts and displacements arrays for the collective MPI_Allgatherv call. Otherwise, only the
+ * root processor needs to allocate the space for an MPI_Gatherv call.
+ */
+ if (allgather || (mpi_rank == root)) {
+ if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array")
- if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(int))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array")
+ if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(int))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array")
- if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(int))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array")
+ if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(int))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array")
+ } /* end if */
- /* Inform each process of how many entries each other process is contributing to the resulting array */
- if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, comm)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
+ /*
+ * If gathering to all processors, inform each processor of how many entries each other processor is
+ * contributing to the resulting array by collecting the counts into each processor's "receive counts"
+ * array. Otherwise, inform only the root processor of how many entries each other processor is contributing.
+ */
+ if (allgather) {
+ if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
+ } /* end if */
+ else {
+ if (MPI_SUCCESS != (mpi_code = MPI_Gather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, root, comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Gather failed", mpi_code)
+ } /* end else */
- /* Multiply each receive count by the size of the array entry, since the data is sent as bytes */
- for (i = 0; i < (size_t) nprocs; i++)
- H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t);
+ if (allgather || (mpi_rank == root)) {
+ size_t i;
- /* Set receive buffer offsets for MPI_Allgatherv */
- displacements_array[0] = 0;
- for (i = 1; i < (size_t) nprocs; i++)
- displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1];
+ /* Multiply each receive count by the size of the array entry, since the data is sent as bytes. */
+ for (i = 0; i < (size_t) mpi_size; i++)
+ H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t);
+ /* Set receive buffer offsets for the collective MPI_Allgatherv/MPI_Gatherv call. */
+ displacements_array[0] = 0;
+ for (i = 1; i < (size_t) mpi_size; i++)
+ displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1];
+ } /* end if */
+
+ /* As the data is sent as bytes, calculate the true sendcount for the data. */
H5_CHECKED_ASSIGN(sendcount, int, local_array_num_entries * array_entry_size, size_t);
if (allgather) {
@@ -502,10 +526,11 @@ H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
else {
if (MPI_SUCCESS != (mpi_code = MPI_Gatherv(local_array, sendcount, MPI_BYTE,
gathered_array, receive_counts_array, displacements_array, MPI_BYTE, root, comm)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
+ HMPI_GOTO_ERROR(FAIL, "MPI_Gatherv failed", mpi_code)
} /* end else */
- if (sort_func) HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func);
+ if (sort_func && (allgather || (mpi_rank == root)))
+ HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func);
} /* end if */
*_gathered_array = gathered_array;
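For reference, the gather pattern that the reworked H5D__mpio_array_gatherv follows (derive the communicator size internally, allocate result/counts/displacements arrays only on receiving ranks, exchange entry counts, then gather the payload as bytes) can be sketched as a small standalone MPI helper. This is a simplified sketch, not part of the patch: the function name gather_bytes and the minimal error handling are illustrative only, and HDF5's own allocation and error macros are replaced with plain malloc and return codes.

/* Minimal sketch of the gather pattern used above: every rank contributes
 * local_n fixed-size entries, entry counts are collected with MPI_Gather
 * (or MPI_Allgather), converted to byte counts and displacements, and the
 * payload is gathered with MPI_Gatherv (or MPI_Allgatherv). */
#include <mpi.h>
#include <stdlib.h>

static int
gather_bytes(void *local, int local_n, size_t entry_size,
             void **out, int *out_n, int allgather, int root, MPI_Comm comm)
{
    int   rank, size, total = 0;
    int  *counts = NULL, *displs = NULL;
    void *gathered = NULL;

    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);

    /* Total number of entries contributed across all ranks */
    MPI_Allreduce(&local_n, &total, 1, MPI_INT, MPI_SUM, comm);
    if (total == 0) { *out = NULL; *out_n = 0; return 0; }

    /* Only ranks that receive data need the result and bookkeeping arrays */
    if (allgather || rank == root) {
        gathered = malloc((size_t)total * entry_size);
        counts   = malloc((size_t)size * sizeof(int));
        displs   = malloc((size_t)size * sizeof(int));
        if (!gathered || !counts || !displs) return -1;
    }

    /* Collect each rank's entry count on the receiving rank(s) */
    if (allgather)
        MPI_Allgather(&local_n, 1, MPI_INT, counts, 1, MPI_INT, comm);
    else
        MPI_Gather(&local_n, 1, MPI_INT, counts, 1, MPI_INT, root, comm);

    if (allgather || rank == root) {
        int i;

        /* Convert entry counts to byte counts, since the data is sent as bytes */
        for (i = 0; i < size; i++)
            counts[i] *= (int)entry_size;

        /* Receive buffer offsets for MPI_Allgatherv/MPI_Gatherv */
        displs[0] = 0;
        for (i = 1; i < size; i++)
            displs[i] = displs[i - 1] + counts[i - 1];
    }

    /* Gather the payload itself as raw bytes */
    if (allgather)
        MPI_Allgatherv(local, local_n * (int)entry_size, MPI_BYTE,
                       gathered, counts, displs, MPI_BYTE, comm);
    else
        MPI_Gatherv(local, local_n * (int)entry_size, MPI_BYTE,
                    gathered, counts, displs, MPI_BYTE, root, comm);

    free(counts);
    free(displs);
    *out   = gathered;
    *out_n = total;
    return 0;
}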
@@ -1295,8 +1320,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
* of the chunks in the file.
*/
if (H5D__mpio_array_gatherv(chunk_list, chunk_list_num_entries, sizeof(H5D_filtered_collective_io_info_t),
- (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size,
- true, 0, io_info->comm, NULL) < 0)
+ (void **) &collective_chunk_list, &collective_chunk_list_num_entries, true, 0, io_info->comm, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")
/* Collectively re-allocate the modified chunks (from each process) in the file */
@@ -1768,8 +1792,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
* of the chunks in the file
*/
if (H5D__mpio_array_gatherv(&chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(H5D_filtered_collective_io_info_t),
- (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size,
- true, 0, io_info->comm, NULL) < 0)
+ (void **) &collective_chunk_list, &collective_chunk_list_num_entries, true, 0, io_info->comm, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")
/* Participate in the collective re-allocation of all chunks modified
@@ -2655,8 +2678,8 @@ H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, const H5D_ty
* call, the gathered list will initially be sorted in increasing order of chunk offset in the file.
*/
if (H5D__mpio_array_gatherv(local_chunk_array, *local_chunk_array_num_entries, sizeof(H5D_filtered_collective_io_info_t),
- (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size,
- false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
+ (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, false, 0,
+ io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")
/* Rank 0 redistributes any shared chunks to new owners as necessary */
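The sort_func argument passed here (H5D__cmp_filtered_collective_io_info_entry) is what lets the gathered list come back ordered by increasing chunk offset in the file. A hedged sketch of such a comparator is shown below; the struct layout and field name file_offset are hypothetical stand-ins, not the actual H5D_filtered_collective_io_info_t definition.

/* Illustrative qsort comparator in the style used by the gather call above:
 * orders gathered entries by increasing chunk offset in the file. The struct
 * shown here is hypothetical. */
struct chunk_entry {
    long long file_offset;   /* hypothetical: offset of the chunk in the file */
    /* ... other bookkeeping fields ... */
};

static int
cmp_chunk_entry(const void *a, const void *b)
{
    const struct chunk_entry *ea = (const struct chunk_entry *)a;
    const struct chunk_entry *eb = (const struct chunk_entry *)b;

    /* Return <0, 0, >0 for ascending order by file offset */
    return (ea->file_offset > eb->file_offset) - (ea->file_offset < eb->file_offset);
}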
@@ -2981,7 +3004,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
{
H5D_chunk_info_t *chunk_info = NULL;
H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__scatter_mem/H5D__gather_mem */
- unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
+ H5S_sel_iter_t *file_iter = NULL;
H5Z_EDC_t err_detect; /* Error detection info */
H5Z_cb_t filter_cb; /* I/O filter callback function */
unsigned filter_mask = 0;
@@ -2989,11 +3012,13 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
hssize_t extent_npoints;
hsize_t true_chunk_size;
hbool_t mem_iter_init = FALSE;
+ hbool_t file_iter_init = FALSE;
size_t buf_size;
size_t i;
H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */
- void *tmp_gath_buf = NULL; /* Temporary gather buffer for owner of the chunk to gather into from
- application write buffer before scattering out to the chunk data buffer */
+ void *tmp_gath_buf = NULL; /* Temporary gather buffer to gather into from application buffer
+ before scattering out to the chunk data buffer (when writing data),
+ or vice versa (when reading data) */
int mpi_code;
herr_t ret_value = SUCCEED;
@@ -3073,9 +3098,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
mem_iter_init = TRUE;
- if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
-
/* If this is a read operation, scatter the read chunk data to the user's buffer.
*
* If this is a write operation, update the chunk data buffer with the modifications
@@ -3084,16 +3106,39 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
*/
switch (io_info->op_type) {
case H5D_IO_OP_READ:
- if (H5D__scatter_mem(chunk_entry->buf, chunk_info->mspace, mem_iter, (size_t)iter_nelmts, io_info->u.rbuf) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to read buffer")
+ if (NULL == (file_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file iterator")
+
+ if (H5S_select_iter_init(file_iter, chunk_info->fspace, type_info->src_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+ file_iter_init = TRUE;
+
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ if (NULL == (tmp_gath_buf = H5MM_malloc((hsize_t) iter_nelmts * type_info->src_type_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate temporary gather buffer")
+
+ if (!H5D__gather_mem(chunk_entry->buf, chunk_info->fspace, file_iter, (size_t) iter_nelmts, tmp_gath_buf))
+ HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't gather from chunk buffer")
+
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ if (H5D__scatter_mem(tmp_gath_buf, chunk_info->mspace, mem_iter, (size_t) iter_nelmts, io_info->u.rbuf) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to read buffer")
+
break;
case H5D_IO_OP_WRITE:
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
if (NULL == (tmp_gath_buf = H5MM_malloc((hsize_t) iter_nelmts * type_info->src_type_size)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate temporary gather buffer")
/* Gather modification data from the application write buffer into a temporary buffer */
- if(!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter, (size_t)iter_nelmts, tmp_gath_buf))
+ if(!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter, (size_t) iter_nelmts, tmp_gath_buf))
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
@@ -3111,7 +3156,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
/* Scatter the owner's modification data into the chunk data buffer according to
* the file space.
*/
- if(H5D__scatter_mem(tmp_gath_buf, chunk_info->fspace, mem_iter, (size_t)iter_nelmts, chunk_entry->buf) < 0)
+ if(H5D__scatter_mem(tmp_gath_buf, chunk_info->fspace, mem_iter, (size_t) iter_nelmts, chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to chunk data buffer")
if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
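The read path added above follows a two-step pattern: elements selected in the chunk (file space) are first gathered into a packed temporary buffer, then scattered out to the positions selected in the application's memory space. The conceptual sketch below shows that pattern with plain element-index arrays instead of HDF5 selections and iterators; read_selected, file_sel, and mem_sel are hypothetical names, not HDF5 API.

/* Conceptual sketch (not HDF5 API) of the read path: pack the selected
 * chunk elements into a contiguous temporary buffer, then place them at
 * the selected offsets in the application buffer. */
#include <stdlib.h>
#include <string.h>

static void
read_selected(const unsigned char *chunk_buf, const size_t *file_sel,
              unsigned char *app_buf, const size_t *mem_sel,
              size_t nelmts, size_t elmt_size)
{
    unsigned char *tmp = malloc(nelmts * elmt_size);
    size_t i;

    if (!tmp)
        return;

    /* "Gather": pack selected chunk elements into the temporary buffer */
    for (i = 0; i < nelmts; i++)
        memcpy(tmp + i * elmt_size,
               chunk_buf + file_sel[i] * elmt_size, elmt_size);

    /* "Scatter": copy packed elements to the selected memory offsets */
    for (i = 0; i < nelmts; i++)
        memcpy(app_buf + mem_sel[i] * elmt_size,
               tmp + i * elmt_size, elmt_size);

    free(tmp);
}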
@@ -3177,10 +3222,12 @@ done:
H5MM_free(chunk_entry->async_info.receive_buffer_array);
if (chunk_entry->async_info.receive_requests_array)
H5MM_free(chunk_entry->async_info.receive_requests_array);
- if (mod_data)
- H5MM_free(mod_data);
if (tmp_gath_buf)
H5MM_free(tmp_gath_buf);
+ if (file_iter_init && H5S_SELECT_ITER_RELEASE(file_iter) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+ if (file_iter)
+ H5MM_free(file_iter);
if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
if (mem_iter)