path: root/src/H5Dmpio.c
author    Jordan Henderson <jhenderson@hdfgroup.org>    2017-04-09 21:36:19 (GMT)
committer Jordan Henderson <jhenderson@hdfgroup.org>    2017-04-09 21:36:19 (GMT)
commit    bbc9f1be45539241673942d347beced6da9a5fc5 (patch)
tree      2b23a32ecc7fb7e46ccb85bb2bdf2c9775813067 /src/H5Dmpio.c
parent    1488ed344e6c9696a416a0dd23f19c8e70a2a9e8 (diff)
Make array_gather routine more general
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r--  src/H5Dmpio.c | 111
1 file changed, 66 insertions, 45 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index c3a015d..826407e 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -145,10 +145,9 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info,
static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries);
-static herr_t H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
- size_t local_array_num_entries, size_t array_entry_size,
- void **gathered_array, size_t *gathered_array_num_entries,
- int (*sort_func)(const void *, const void *));
+static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
+ size_t array_entry_size, void **gathered_array, size_t *gathered_array_num_entries,
+ int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *));
static herr_t H5D__mpio_filtered_collective_write_type(
H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries,
MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
@@ -329,46 +328,44 @@ done:
/*-------------------------------------------------------------------------
- * Function: H5D__mpio_array_gather
+ * Function: H5D__mpio_array_gatherv
*
* Purpose: Given an array on each MPI rank, gathers them into a single array
- * which is then distributed back to all ranks. If the
- * sort_func argument is specified, the list is sorted before
- * being returned.
+ * which is either gathered to the rank specified by root when
+ * allgather is false, or is distributed back to all ranks
+ * when allgather is true. If the sort_func argument is
+ * specified, the list is sorted before being returned.
+ *
+ * If allgather is specified as true, root is ignored.
*
* Return: Non-negative on success/Negative on failure
*
* Programmer: Jordan Henderson
- * Friday, January 6th, 2016
+ * Sunday, April 9th, 2017
*
*-------------------------------------------------------------------------
*/
static herr_t
-H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
- size_t local_array_num_entries, size_t array_entry_size,
- void **_gathered_array, size_t *_gathered_array_num_entries,
- int (*sort_func)(const void *, const void *))
+H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
+ size_t array_entry_size, void **_gathered_array, size_t *_gathered_array_num_entries,
+ int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *))
{
size_t gathered_array_num_entries = 0; /* The size of the newly-constructed array */
size_t i;
void *gathered_array = NULL; /* The newly-constructed array returned to the caller */
int *receive_counts_array = NULL; /* Array containing number of entries each process is contributing */
int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */
- int mpi_code, mpi_size;
+ int mpi_code;
int sendcount;
herr_t ret_value = SUCCEED;
FUNC_ENTER_STATIC
- HDassert(io_info);
HDassert(_gathered_array);
HDassert(_gathered_array_num_entries);
- if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
- HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
-
/* Determine the size of the end result array */
- if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, io_info->comm)))
+ if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
/* If 0 entries resulted from the collective operation, no one is writing anything */
@@ -376,29 +373,37 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array")
- if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*receive_counts_array))))
+ if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(*receive_counts_array))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array")
- if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*displacements_array))))
+ if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(*displacements_array))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array")
/* Inform each process of how many entries each other process is contributing to the resulting array */
- if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, io_info->comm)))
+ if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
/* Multiply each receive count by the size of the array entry, since the data is sent as bytes */
- for (i = 0; i < (size_t) mpi_size; i++)
+ for (i = 0; i < (size_t) nprocs; i++)
H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t);
/* Set receive buffer offsets for MPI_Allgatherv */
displacements_array[0] = 0;
- for (i = 1; i < (size_t) mpi_size; i++)
+ for (i = 1; i < (size_t) nprocs; i++)
displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1];
H5_CHECKED_ASSIGN(sendcount, int, local_array_num_entries * array_entry_size, size_t);
- if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE,
- gathered_array, receive_counts_array, displacements_array, MPI_BYTE, io_info->comm)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
+
+ if (allgather) {
+ if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE,
+ gathered_array, receive_counts_array, displacements_array, MPI_BYTE, comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
+ }
+ else {
+ if (MPI_SUCCESS != (mpi_code = MPI_Gatherv(local_array, sendcount, MPI_BYTE,
+ gathered_array, receive_counts_array, displacements_array, MPI_BYTE, root, comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
+ }
if (sort_func) HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func);
} /* end if */
@@ -413,7 +418,7 @@ done:
H5MM_free(displacements_array);
FUNC_LEAVE_NOAPI(ret_value)
-} /* end H5D__mpio_array_gather() */
+} /* end H5D__mpio_array_gatherv() */
/*-------------------------------------------------------------------------
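The rewritten routine above follows the standard variable-count gather recipe: exchange per-rank entry counts with MPI_Allgather, scale the counts to bytes, build the displacement array as a prefix sum, then combine the data with MPI_Allgatherv (every rank receives a copy) or MPI_Gatherv (only root receives). A minimal standalone sketch of that recipe, independent of HDF5 (local_n, counts, displs and the use of MPI_COMM_WORLD are illustrative choices, not identifiers from this patch):

/*
 * Gather variable-length per-rank arrays, as bytes, into one array.
 * Compile with mpicc and run under mpirun; each rank contributes a
 * different number of ints so the counts and displacements differ.
 */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    int nprocs, rank, root = 0, allgather = 1;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* Each rank contributes (rank + 1) ints filled with its rank number */
    int local_n = rank + 1;
    int *local = malloc((size_t)local_n * sizeof(int));
    for (int i = 0; i < local_n; i++)
        local[i] = rank;

    /* Step 1: every rank learns how many entries each rank contributes */
    int *counts = malloc((size_t)nprocs * sizeof(int));
    MPI_Allgather(&local_n, 1, MPI_INT, counts, 1, MPI_INT, MPI_COMM_WORLD);

    /* Step 2: scale counts to bytes (the data moves as MPI_BYTE) and
     * compute receive displacements as a prefix sum over the byte counts */
    int *displs = malloc((size_t)nprocs * sizeof(int));
    int total_bytes = 0;
    for (int i = 0; i < nprocs; i++) {
        counts[i] *= (int)sizeof(int);
        displs[i] = total_bytes;
        total_bytes += counts[i];
    }

    char *gathered = malloc((size_t)total_bytes);
    int sendcount = local_n * (int)sizeof(int);

    /* Step 3: gather to every rank, or only to root */
    if (allgather)
        MPI_Allgatherv(local, sendcount, MPI_BYTE, gathered, counts,
                       displs, MPI_BYTE, MPI_COMM_WORLD);
    else
        MPI_Gatherv(local, sendcount, MPI_BYTE, gathered, counts,
                    displs, MPI_BYTE, root, MPI_COMM_WORLD);

    if (rank == root)
        printf("gathered %d bytes in total\n", total_bytes);

    free(local); free(counts); free(displs); free(gathered);
    MPI_Finalize();
    return 0;
}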
@@ -1354,8 +1359,9 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
/* Gather the new chunk sizes to all processes for a collective reallocation
* of the chunks in the file.
*/
- if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
- (void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0)
+ if (H5D__mpio_array_gatherv(chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
+ (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size,
+ true, 0, io_info->comm, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")
/* Collectively re-allocate the modified chunks (from each process) in the file */
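For callers, the new arguments map as in this annotated sketch (a hypothetical caller written against this patch's signature; my_entries, my_count, and entry_size are illustrative names, not identifiers from the patch):

    void  *gathered       = NULL;
    size_t gathered_count = 0;

    if (H5D__mpio_array_gatherv(my_entries,    /* this rank's local array        */
            my_count,                          /* number of local entries        */
            entry_size,                        /* size of one entry in bytes     */
            (void **) &gathered,               /* combined array (allocated)     */
            &gathered_count,                   /* total entry count across ranks */
            mpi_size,                          /* number of ranks in comm        */
            true,                              /* allgather: all ranks receive   */
            0,                                 /* root: ignored when allgather   */
            io_info->comm,                     /* communicator to gather over    */
            NULL) < 0)                         /* no sorting of the result       */
        HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")

Passing allgather as true here matches the surrounding comment: every rank participates in the collective chunk reallocation, so every rank needs the combined size list.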
@@ -1838,8 +1844,9 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
/* Gather the new chunk sizes to all processes for a collective re-allocation
* of the chunks in the file
*/
- if (H5D__mpio_array_gather(io_info, &chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(*chunk_list),
- (void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0)
+ if (H5D__mpio_array_gatherv(&chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(*chunk_list),
+ (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size,
+ true, 0, io_info->comm, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")
/* Participate in the collective re-allocation of all chunks modified
@@ -2628,9 +2635,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
- if (H5D__mpio_array_gather(io_info, local_info_array, num_chunks_selected,
- sizeof(*local_info_array), (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries,
- H5D__cmp_filtered_collective_io_info_entry) < 0)
+ if (H5D__mpio_array_gatherv(local_info_array, num_chunks_selected, sizeof(*local_info_array),
+ (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size,
+ false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")
for (i = 0, num_chunks_selected = 0, num_send_requests = 0; i < shared_chunks_info_array_num_entries;) {
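The optional sort_func hook lets the gathered list be ordered before it is returned; sorting by chunk placement puts all entries for a shared chunk next to each other, which the duplicate-scan loop above depends on. An illustrative qsort comparator under that assumption (the struct layout below is invented for the sketch, not the actual H5D_filtered_collective_io_info_t):

#include <stdlib.h>

struct chunk_entry {
    unsigned long long chunk_offset; /* position of the chunk in the file    */
    int                owner;        /* MPI rank that will process the chunk */
};

/* Order entries by chunk offset so duplicates of a shared chunk are adjacent */
static int cmp_chunk_entry(const void *a, const void *b)
{
    const struct chunk_entry *ea = (const struct chunk_entry *)a;
    const struct chunk_entry *eb = (const struct chunk_entry *)b;

    if (ea->chunk_offset < eb->chunk_offset) return -1;
    if (ea->chunk_offset > eb->chunk_offset) return 1;
    return 0;
}

/* usage: HDqsort(gathered, gathered_count, sizeof(struct chunk_entry), cmp_chunk_entry); */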
@@ -2753,27 +2760,41 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
} /* end else */
} /* end for */
+ /* Rank 0 redistributes any shared chunks to new owners as necessary */
+ if (mpi_rank == 0) {
+
+ }
+
/* Release old list */
if (local_info_array)
H5MM_free(local_info_array);
/* Local info list becomes modified (redistributed) chunk list */
local_info_array = shared_chunks_info_array;
+
+ /* Now that the chunks have been redistributed, each process must send its modification data
+ * to the new owners of any of the chunks it previously possessed
+ */
+ for (i = 0; i < num_chunks_selected; i++) {
+ if (mpi_rank != local_info_array[i].owner) {
+
+ }
+ } /* end for */
+
+ /* Wait for all async send requests to complete before returning */
+ if (num_send_requests) {
+ if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
+
+ H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
+ if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
+ }
} /* end if */
*chunk_list = local_info_array;
*num_entries = num_chunks_selected;
- /* Wait for all async send requests to complete before returning */
- if (num_send_requests) {
- if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
-
- H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
- if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
- }
-
done:
if (send_requests)
H5MM_free(send_requests);
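The relocated wait logic above completes the nonblocking sends posted during redistribution: a rank must not free or reuse a send buffer until MPI_Waitall confirms the corresponding MPI_Isend has finished. A minimal standalone sketch of that pattern, independent of HDF5 (the ring-style send to rank + 1 is an illustrative choice):

/*
 * Post a nonblocking send, do other work (here, a matching receive),
 * then wait on all posted send requests before touching the buffers.
 */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    int nprocs, rank;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int payload = rank, incoming = -1;
    int num_send_requests = 0;
    MPI_Request send_requests[1];
    MPI_Status  send_statuses[1];

    /* Post a nonblocking send to the next rank, if one exists */
    if (rank + 1 < nprocs)
        MPI_Isend(&payload, 1, MPI_INT, rank + 1, 0, MPI_COMM_WORLD,
                  &send_requests[num_send_requests++]);

    /* Matching blocking receive from the previous rank, if one exists */
    if (rank > 0)
        MPI_Recv(&incoming, 1, MPI_INT, rank - 1, 0, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);

    /* Only ranks that actually posted sends wait, mirroring the patch;
     * after MPI_Waitall the payload buffer may safely be reused or freed */
    if (num_send_requests)
        MPI_Waitall(num_send_requests, send_requests, send_statuses);

    printf("rank %d received %d\n", rank, incoming);
    MPI_Finalize();
    return 0;
}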