author    | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-04-09 21:36:19 (GMT)
committer | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-04-09 21:36:19 (GMT)
commit    | bbc9f1be45539241673942d347beced6da9a5fc5 (patch)
tree      | 2b23a32ecc7fb7e46ccb85bb2bdf2c9775813067
parent    | 1488ed344e6c9696a416a0dd23f19c8e70a2a9e8 (diff)
download  | hdf5-bbc9f1be45539241673942d347beced6da9a5fc5.zip, hdf5-bbc9f1be45539241673942d347beced6da9a5fc5.tar.gz, hdf5-bbc9f1be45539241673942d347beced6da9a5fc5.tar.bz2
Make array_gather routine more general
-rw-r--r-- | src/H5Dmpio.c | 111 |
1 file changed, 66 insertions(+), 45 deletions(-)
```diff
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index c3a015d..826407e 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -145,10 +145,9 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info,
 static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info,
     const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
     H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries);
-static herr_t H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
-    size_t local_array_num_entries, size_t array_entry_size,
-    void **gathered_array, size_t *gathered_array_num_entries,
-    int (*sort_func)(const void *, const void *));
+static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
+    size_t array_entry_size, void **gathered_array, size_t *gathered_array_num_entries,
+    int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *));
 static herr_t H5D__mpio_filtered_collective_write_type(
     H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries,
     MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
@@ -329,46 +328,44 @@ done:


 /*-------------------------------------------------------------------------
- * Function:    H5D__mpio_array_gather
+ * Function:    H5D__mpio_array_gatherv
  *
  * Purpose:     Given arrays by MPI ranks, gathers them into a single array
- *              which is then distributed back to all ranks. If the
- *              sort_func argument is specified, the list is sorted before
- *              being returned.
+ *              which is either gathered to the rank specified by root when
+ *              allgather is false, or is distributed back to all ranks
+ *              when allgather is true. If the sort_func argument is
+ *              specified, the list is sorted before being returned.
+ *
+ *              If allgather is specified as true, root is ignored.
  *
  * Return:      Non-negative on success/Negative on failure
  *
  * Programmer:  Jordan Henderson
- *              Friday, January 6th, 2016
+ *              Sunday, April 9th, 2017
  *
  *-------------------------------------------------------------------------
  */
 static herr_t
-H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
-    size_t local_array_num_entries, size_t array_entry_size,
-    void **_gathered_array, size_t *_gathered_array_num_entries,
-    int (*sort_func)(const void *, const void *))
+H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
+    size_t array_entry_size, void **_gathered_array, size_t *_gathered_array_num_entries,
+    int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *))
 {
     size_t  gathered_array_num_entries = 0; /* The size of the newly-constructed array */
     size_t  i;
     void   *gathered_array = NULL;          /* The newly-constructed array returned to the caller */
     int    *receive_counts_array = NULL;    /* Array containing number of entries each process is contributing */
     int    *displacements_array = NULL;     /* Array of displacements where each process places its data in the final array */
-    int     mpi_code, mpi_size;
+    int     mpi_code;
     int     sendcount;
     herr_t  ret_value = SUCCEED;

     FUNC_ENTER_STATIC

-    HDassert(io_info);
     HDassert(_gathered_array);
     HDassert(_gathered_array_num_entries);

-    if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
-        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
-
     /* Determine the size of the end result array */
-    if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, io_info->comm)))
+    if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, comm)))
         HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)

     /* If 0 entries resulted from the collective operation, no one is writing anything */
@@ -376,29 +373,37 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
         if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size)))
             HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array")

-        if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*receive_counts_array))))
+        if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(*receive_counts_array))))
             HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array")

-        if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*displacements_array))))
+        if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(*displacements_array))))
             HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array")

         /* Inform each process of how many entries each other process is contributing to the resulting array */
-        if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, io_info->comm)))
+        if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, comm)))
             HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)

         /* Multiply each receive count by the size of the array entry, since the data is sent as bytes */
-        for (i = 0; i < (size_t) mpi_size; i++)
+        for (i = 0; i < (size_t) nprocs; i++)
             H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t);

         /* Set receive buffer offsets for MPI_Allgatherv */
         displacements_array[0] = 0;
-        for (i = 1; i < (size_t) mpi_size; i++)
+        for (i = 1; i < (size_t) nprocs; i++)
             displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1];

         H5_CHECKED_ASSIGN(sendcount, int, local_array_num_entries * array_entry_size, size_t);
-        if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE,
-                gathered_array, receive_counts_array, displacements_array, MPI_BYTE, io_info->comm)))
-            HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
+
+        if (allgather) {
+            if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE,
+                    gathered_array, receive_counts_array, displacements_array, MPI_BYTE, comm)))
+                HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
+        }
+        else {
+            if (MPI_SUCCESS != (mpi_code = MPI_Gatherv(local_array, sendcount, MPI_BYTE,
+                    gathered_array, receive_counts_array, displacements_array, MPI_BYTE, root, comm)))
+                HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
+        }

         if (sort_func) HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func);
     } /* end if */
@@ -413,7 +418,7 @@ done:
         H5MM_free(displacements_array);

     FUNC_LEAVE_NOAPI(ret_value)
-} /* end H5D__mpio_array_gather() */
+} /* end H5D__mpio_array_gatherv() */


 /*-------------------------------------------------------------------------
@@ -1354,8 +1359,9 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in

         /* Gather the new chunk sizes to all processes for a collective reallocation
          * of the chunks in the file. */
-        if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
-                (void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0)
+        if (H5D__mpio_array_gatherv(chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
+                (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size,
+                true, 0, io_info->comm, NULL) < 0)
             HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")

         /* Collectively re-allocate the modified chunks (from each process) in the file */
@@ -1838,8 +1844,9 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i

             /* Gather the new chunk sizes to all processes for a collective re-allocation
              * of the chunks in the file */
-            if (H5D__mpio_array_gather(io_info, &chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(*chunk_list),
-                    (void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0)
+            if (H5D__mpio_array_gatherv(io_info, &chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(*chunk_list),
+                    (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size,
+                    true, 0, io_info->comm, NULL) < 0)
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")

             /* Participate in the collective re-allocation of all chunks modified
@@ -2628,9 +2635,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
         HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")

-    if (H5D__mpio_array_gather(io_info, local_info_array, num_chunks_selected,
-            sizeof(*local_info_array), (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries,
-            H5D__cmp_filtered_collective_io_info_entry) < 0)
+    if (H5D__mpio_array_gather(local_info_array, num_chunks_selected, sizeof(*local_info_array),
+            (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size,
+            false, 0, H5D__cmp_filtered_collective_io_info_entry) < 0)
         HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")

     for (i = 0, num_chunks_selected = 0, num_send_requests = 0; i < shared_chunks_info_array_num_entries;) {
@@ -2753,27 +2760,41 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
             } /* end else */
         } /* end for */

+        /* Rank 0 redistributes any shared chunks to new owners as necessary */
+        if (mpi_rank == 0) {
+
+        }
+
         /* Release old list */
         if (local_info_array)
             H5MM_free(local_info_array);

         /* Local info list becomes modified (redistributed) chunk list */
         local_info_array = shared_chunks_info_array;
+
+        /* Now that the chunks have been redistributed, each process must send its modification data
+         * to the new owners of any of the chunks it previously possessed
+         */
+        for (i = 0; i < num_chunks_selected; i++) {
+            if (mpi_rank != local_info_array[i].owner) {
+
+            }
+        } /* end for */
+
+        /* Wait for all async send requests to complete before returning */
+        if (num_send_requests) {
+            if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses))))
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
+
+            H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
+            if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
+                HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
+        }
     } /* end if */

     *chunk_list = local_info_array;
     *num_entries = num_chunks_selected;

-    /* Wait for all async send requests to complete before returning */
-    if (num_send_requests) {
-        if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses))))
-            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
-
-        H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
-        if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
-            HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
-    }
-
 done:
     if (send_requests)
         H5MM_free(send_requests);
```
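For reference, the pattern the renamed routine implements is a standard variable-count gather over byte buffers: total the entries with MPI_Allreduce, learn each rank's count with MPI_Allgather, scale counts to bytes, compute displacements, then call MPI_Allgatherv or MPI_Gatherv and optionally sort the result. The sketch below is a minimal, self-contained illustration of that pattern under stated assumptions, not the HDF5 code itself: the HDF5 error macros and H5MM allocators are replaced with plain C, and chunk_entry_t, gather_array(), and the demo main() are hypothetical names invented for this example.

```c
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical fixed-size entry type standing in for the HDF5 chunk-info struct */
typedef struct { long chunk_index; int owner; } chunk_entry_t;

static int cmp_entry(const void *a, const void *b)
{
    long ia = ((const chunk_entry_t *)a)->chunk_index;
    long ib = ((const chunk_entry_t *)b)->chunk_index;
    return (ia > ib) - (ia < ib);
}

/* Gather variable-length arrays of fixed-size entries from every rank.
 * If allgather is nonzero the result is distributed to all ranks
 * (MPI_Allgatherv); otherwise only `root` receives it (MPI_Gatherv). */
static int gather_array(void *local, int local_n, size_t entry_size,
                        void **out, int *out_n, int nprocs, int allgather,
                        int root, MPI_Comm comm,
                        int (*sort_func)(const void *, const void *))
{
    int   rank, total_n = 0;
    int  *counts = NULL, *displs = NULL;
    void *gathered = NULL;

    MPI_Comm_rank(comm, &rank);

    /* Total number of entries contributed by all ranks */
    MPI_Allreduce(&local_n, &total_n, 1, MPI_INT, MPI_SUM, comm);

    if (total_n > 0) {
        counts   = malloc((size_t)nprocs * sizeof(*counts));
        displs   = malloc((size_t)nprocs * sizeof(*displs));
        gathered = malloc((size_t)total_n * entry_size);

        /* Learn each rank's entry count, then convert counts to bytes
         * because the payload is shipped as MPI_BYTE */
        MPI_Allgather(&local_n, 1, MPI_INT, counts, 1, MPI_INT, comm);
        for (int i = 0; i < nprocs; i++)
            counts[i] *= (int)entry_size;

        /* Byte offset of each rank's block in the receive buffer */
        displs[0] = 0;
        for (int i = 1; i < nprocs; i++)
            displs[i] = displs[i - 1] + counts[i - 1];

        if (allgather)
            MPI_Allgatherv(local, local_n * (int)entry_size, MPI_BYTE,
                           gathered, counts, displs, MPI_BYTE, comm);
        else
            MPI_Gatherv(local, local_n * (int)entry_size, MPI_BYTE,
                        gathered, counts, displs, MPI_BYTE, root, comm);

        /* Only ranks that actually hold the gathered data may sort it */
        if (sort_func && (allgather || rank == root))
            qsort(gathered, (size_t)total_n, entry_size, sort_func);

        free(counts);
        free(displs);
    }

    *out   = gathered;
    *out_n = total_n;
    return 0;
}

int main(int argc, char **argv)
{
    int rank, nprocs, total_n = 0;
    void *gathered = NULL;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    /* Each rank contributes a different number of entries */
    int local_n = rank + 1;
    chunk_entry_t *local = malloc((size_t)local_n * sizeof(*local));
    for (int i = 0; i < local_n; i++) {
        local[i].chunk_index = rank * 100 + i;
        local[i].owner       = rank;
    }

    /* Gather to rank 0 only, sorted by chunk index */
    gather_array(local, local_n, sizeof(*local), &gathered, &total_n,
                 nprocs, 0, 0, MPI_COMM_WORLD, cmp_entry);

    if (rank == 0)
        printf("rank 0 gathered %d entries\n", total_n);

    free(local);
    free(gathered);
    MPI_Finalize();
    return 0;
}
```

Built with mpicc and run under mpiexec, each rank contributes rank + 1 entries and only rank 0 should end up with the sorted, concatenated list, mirroring the allgather == false path that the new root/allgather parameters enable.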
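The other structural change in the patch moves the MPI_Waitall on outstanding asynchronous sends inside the chunk-redistribution block. The fragment below is a generic sketch of that MPI_Isend/MPI_Waitall completion pattern only; send_to_new_owners(), its buffers, and the tag value are illustrative placeholders, not HDF5 code.

```c
#include <mpi.h>
#include <stdlib.h>

/* Post one non-blocking send per outgoing message, then block until every
 * request has completed so the send buffers can safely be reused or freed.
 * All names here are placeholders for illustration. */
static int send_to_new_owners(char **bufs, const int *lens, const int *dests,
                              int num_sends, MPI_Comm comm)
{
    MPI_Request *requests = malloc((size_t)num_sends * sizeof(*requests));
    MPI_Status  *statuses = malloc((size_t)num_sends * sizeof(*statuses));
    int ret = 0;

    /* One MPI_Isend per destination; requests are collected for MPI_Waitall */
    for (int i = 0; i < num_sends; i++)
        if (MPI_SUCCESS != MPI_Isend(bufs[i], lens[i], MPI_BYTE, dests[i],
                                     0 /* tag */, comm, &requests[i]))
            ret = -1;

    /* Wait for all async send requests to complete before returning */
    if (num_sends > 0 && MPI_SUCCESS != MPI_Waitall(num_sends, requests, statuses))
        ret = -1;

    free(requests);
    free(statuses);
    return ret;
}
```

Keeping the wait adjacent to the sends makes it harder to free or overwrite a send buffer while a request is still in flight, which is presumably the motivation for relocating the MPI_Waitall block in the patch above.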