author | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-05-01 15:01:53 (GMT)
committer | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-05-01 15:01:53 (GMT)
commit | d367d397e3380d325154a97f5718642d0657db70 (patch)
tree | 581bc5213a78d5715eebd5c32257960ca5d0a5e4 /src/H5Dmpio.c
parent | 4a0937f26ade129130b626f9ecbf95822ba139d1 (diff)
Separate shared chunks redistribution code out to its own function
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r-- | src/H5Dmpio.c | 452
1 file changed, 245 insertions, 207 deletions
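The refactor below moves the shared-chunk redistribution logic into the new H5D__chunk_redistribute_shared_chunks() essentially unchanged, so the diff is mostly a block move. Three MPI patterns inside it are worth unpacking before reading it. First, the ownership heuristic: rank 0 walks the gathered chunk list, which has been sorted by chunk file address so that duplicate entries for a shared chunk sit adjacent, and assigns each chunk to the rank writing the most bytes to it. Here is a minimal standalone sketch of that pass; chunk_entry_t and the sample data are hypothetical stand-ins for H5D_filtered_collective_io_info_t, not HDF5's actual types.

/* Sketch of the owner-selection pass rank 0 performs over the gathered,
 * address-sorted chunk list. chunk_entry_t is a hypothetical stand-in. */
#include <stdio.h>
#include <stddef.h>

typedef struct {
    unsigned long chunk_offset;   /* file address identifying the chunk */
    size_t        io_size;        /* bytes this rank writes to the chunk */
    int           original_owner;
    int           new_owner;
    size_t        num_writers;
} chunk_entry_t;

/* Assumes entries are sorted by chunk_offset so that duplicate entries
 * for the same chunk are adjacent, mirroring the gather-and-sort step. */
static void assign_owners(chunk_entry_t *entries, size_t n)
{
    for (size_t i = 0; i < n;) {
        unsigned long addr = entries[i].chunk_offset;
        size_t start = i, writers = 0, max_io = 0;
        int owner = 0;

        /* Scan the run of entries that refer to the same chunk */
        do {
            if (entries[i].io_size > max_io) {
                max_io = entries[i].io_size;
                owner = entries[i].original_owner;
            }
            writers++;
        } while (++i < n && entries[i].chunk_offset == addr);

        /* The rank writing the most data wins the chunk */
        for (; start < i; start++) {
            entries[start].new_owner = owner;
            entries[start].num_writers = writers;
        }
    }
}

int main(void)
{
    chunk_entry_t entries[] = {
        { 0x1000, 256, 0, -1, 0 },  /* rank 0 writes 256 bytes to chunk @0x1000 */
        { 0x1000, 512, 1, -1, 0 },  /* rank 1 writes more, so it should win     */
        { 0x2000, 128, 2, -1, 0 },  /* chunk @0x2000 has a single writer        */
    };
    size_t n = sizeof(entries) / sizeof(entries[0]);

    assign_owners(entries, n);
    for (size_t i = 0; i < n; i++)
        printf("chunk @0x%lx: original owner %d -> new owner %d (%zu writers)\n",
               entries[i].chunk_offset, entries[i].original_owner,
               entries[i].new_owner, entries[i].num_writers);
    return 0;
}

(The real code also accumulates each chunk's total_io_size during this scan, which the sketch omits.)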
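Second, the scatter step: after HDqsort reorders the decided list by original owner, each rank's entries are contiguous in the gathered array, so the MPI_Scatterv displacements are just a prefix sum of the per-rank byte counts. A self-contained sketch of that counts/displacements construction follows; the entry type and the one-entry-per-rank layout are stand-ins, not the real chunk structs.

/* Sketch of the MPI_Scatterv counts/displacements pattern from the diff:
 * displacements are a running prefix sum of per-rank byte counts. */
#include <mpi.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int entry_size = (int) sizeof(double);  /* stand-in for sizeof(chunk entry) */
    int *send_counts = NULL, *send_displacements = NULL;
    double *gathered = NULL;                /* stand-in for the sorted, gathered list */

    if (rank == 0) {
        send_counts = calloc((size_t) size, sizeof(*send_counts));
        send_displacements = malloc((size_t) size * sizeof(*send_displacements));
        gathered = calloc((size_t) size, sizeof(*gathered));

        for (int r = 0; r < size; r++)
            send_counts[r] = entry_size;    /* pretend each rank owns one entry */

        /* Prefix sum, exactly as in the diff */
        send_displacements[0] = 0;
        for (int r = 1; r < size; r++)
            send_displacements[r] = send_displacements[r - 1] + send_counts[r - 1];
    }

    /* Each rank receives its own contiguous slice of the list back */
    double mine = 0.0;
    MPI_Scatterv(gathered, send_counts, send_displacements, MPI_BYTE,
                 &mine, entry_size, MPI_BYTE, 0, MPI_COMM_WORLD);

    free(send_counts);
    free(send_displacements);
    free(gathered);
    MPI_Finalize();
    return 0;
}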
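Third, the receive side uses MPI-3 matched probes: MPI_Mprobe dequeues a pending message so its exact size can be read with MPI_Get_count and a right-sized buffer allocated before MPI_Imrecv posts the nonblocking receive. A plain MPI_Probe/MPI_Irecv pair on MPI_ANY_SOURCE could race, with another receive stealing the probed message; the matched probe removes the message from matching entirely. A runnable two-rank sketch (tag and payload are illustrative; run with mpirun -np 2):

/* Sketch of the matched-probe receive pattern from the diff. */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    const int tag = 42;  /* stand-in for (int) chunk_entry->index */

    if (rank == 0) {
        /* "Modification data" whose size the receiver doesn't know up front */
        char payload[] = "chunk modification bytes";
        MPI_Request req;
        MPI_Isend(payload, (int) sizeof(payload), MPI_BYTE, 1, tag,
                  MPI_COMM_WORLD, &req);
        MPI_Wait(&req, MPI_STATUS_IGNORE);
    }
    else if (rank == 1) {
        MPI_Message message;
        MPI_Status  status;
        MPI_Request req;
        int count = 0;

        /* Matched probe: removes the message from the queue so no other
         * receive can match it between the probe and the receive */
        MPI_Mprobe(MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &message, &status);
        MPI_Get_count(&status, MPI_BYTE, &count);

        char *buf = malloc((size_t) count);  /* exactly count bytes */
        MPI_Imrecv(buf, count, MPI_BYTE, &message, &req);
        MPI_Wait(&req, MPI_STATUS_IGNORE);

        printf("rank 1 received %d bytes: %s\n", count, buf);
        free(buf);
    }

    MPI_Finalize();
    return 0;
}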
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 8e662ec..0f3d09e 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -159,6 +159,9 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info,
 static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info,
     const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
     H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries);
+static herr_t H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info,
+    const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
+    H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries);
 static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
     size_t array_entry_size, void **gathered_array, size_t *gathered_array_num_entries,
     int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *));
@@ -2590,17 +2593,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries)
 {
     H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially selected chunks for this process */
-    H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
-    H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
-    unsigned char **mod_data = NULL; /* Array of chunk modification data buffers sent by a process to new chunk owners */
-    MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */
-    MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */
-    hbool_t mem_iter_init = FALSE;
     size_t num_chunks_selected;
-    size_t i, last_assigned_idx;
-    int *send_counts = NULL;
-    int *send_displacements = NULL;
-    int mpi_rank, mpi_size, mpi_code;
+    size_t i;
+    int mpi_rank;
     herr_t ret_value = SUCCEED;
 
     FUNC_ENTER_STATIC
@@ -2614,8 +2609,6 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
         HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
-    if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
-        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
 
     /* Each process builds a local list of the chunks they have selected */
     if ((num_chunks_selected = H5SL_count(fm->sel_chunks))) {
@@ -2662,236 +2655,281 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     } /* end if */
 
     /* Redistribute shared chunks to new owners as necessary */
-    if (io_info->op_type == H5D_IO_OP_WRITE) {
-        size_t shared_chunks_info_array_num_entries = 0;
-        size_t num_send_requests = 0;
-
-        if (num_chunks_selected)
-            if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests))))
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
-
-        if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
-            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
-
-        if (H5D__mpio_array_gatherv(local_info_array, num_chunks_selected, sizeof(*local_info_array),
-                (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size,
-                false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
-            HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")
-
-        /* Rank 0 redistributes any shared chunks to new owners as necessary */
-        if (mpi_rank == 0) {
-            if (NULL == (send_counts = (int *) H5MM_calloc((size_t) mpi_size * sizeof(*send_counts))))
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send counts buffer")
-
-            if (NULL == (send_displacements = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*send_displacements))))
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send displacements buffer")
-
-            for (i = 0; i < shared_chunks_info_array_num_entries;) {
-                H5D_filtered_collective_io_info_t chunk_entry;
-                haddr_t last_seen_addr = shared_chunks_info_array[i].chunk_states.chunk_current.offset;
-                size_t set_begin_index = i;
-                size_t total_io_size = 0;
-                size_t max_io_size = 0;
-                size_t num_writers = 0;
-                int new_chunk_owner = 0;
-
-                /* Process each set of duplicate entries caused by another process writing to the same chunk */
-                do {
-                    chunk_entry = shared_chunks_info_array[i];
-
-                    send_counts[chunk_entry.owners.original_owner] += (int) sizeof(chunk_entry);
-
-                    /* Add this chunk entry's I/O size to the running total */
-                    total_io_size += chunk_entry.io_size;
-
-                    /* The new owner of the chunk is determined by the process
-                     * which is writing the most data to the chunk
-                     */
-                    if (chunk_entry.io_size > max_io_size) {
-                        max_io_size = chunk_entry.io_size;
-                        new_chunk_owner = chunk_entry.owners.original_owner;
-                    }
-
-                    num_writers++;
-                } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].chunk_states.chunk_current.offset == last_seen_addr);
-
-                /* Set all of the chunk entries' "new_owner" fields */
-                for (; set_begin_index < i; set_begin_index++) {
-                    shared_chunks_info_array[set_begin_index].owners.new_owner = new_chunk_owner;
-                    shared_chunks_info_array[set_begin_index].num_writers = num_writers;
-                } /* end for */
-            } /* end for */
-
-            /* Sort the new list's in order of previous owner so that each original owner of a chunk
-             * entry gets that entry back, with the possibly newly-modified "new_owner" field
-             */
-            HDqsort(shared_chunks_info_array, shared_chunks_info_array_num_entries,
-                    sizeof(*shared_chunks_info_array), H5D__cmp_filtered_collective_io_info_entry_owner);
-
-            send_displacements[0] = 0;
-            for (i = 1; i < (size_t) mpi_size; i++)
-                send_displacements[i] = send_displacements[i - 1] + send_counts[i - 1];
-        } /* end if */
-
-        /* Scatter the segments of the list back to each process */
-        if (MPI_SUCCESS != (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts,
-                send_displacements, MPI_BYTE, local_info_array, num_chunks_selected * (int) sizeof(*local_info_array),
-                MPI_BYTE, 0, io_info->comm)))
-            HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code)
-
-        if (shared_chunks_info_array) {
-            H5MM_free(shared_chunks_info_array);
-            shared_chunks_info_array = NULL;
-        }
-
-        /* Now that the chunks have been redistributed, each process must send its modification data
-         * to the new owners of any of the chunks it previously possessed. Accordingly, each process
-         * must also issue asynchronous receives for any messages it may receive for each of the
-         * chunks it is assigned, in order to avoid potential deadlocking issues.
-         */
-        if (num_chunks_selected)
-            if (NULL == (mod_data = (unsigned char **) H5MM_malloc(num_chunks_selected * sizeof(*mod_data))))
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data buffer array")
-
-        for (i = 0, last_assigned_idx = 0; i < num_chunks_selected; i++) {
-            H5D_filtered_collective_io_info_t *chunk_entry = &local_info_array[i];
-
-            if (mpi_rank != chunk_entry->owners.new_owner) {
-                H5D_chunk_info_t *chunk_info = NULL;
-                unsigned char *mod_data_p = NULL;
-                hssize_t iter_nelmts;
-                size_t mod_data_size;
-
-                /* Look up the chunk and get its file and memory dataspaces */
-                if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
-                    HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")
-
-                /* Determine size of serialized chunk file dataspace, plus the size of
-                 * the data being written
-                 */
-                if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size")
-
-                if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
-
-                mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
-
-                if (NULL == (mod_data[num_send_requests] = (unsigned char *) H5MM_malloc(mod_data_size)))
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
-
-                /* Serialize the chunk's file dataspace into the buffer */
-                mod_data_p = mod_data[num_send_requests];
-                if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace")
-
-                /* Initialize iterator for memory selection */
-                if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
-                mem_iter_init = TRUE;
-
-                /* Collect the modification data into the buffer */
-                if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter,
-                        (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
-                    HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer")
-
-                /* Send modification data to new owner */
-                H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
-                H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int)
-                if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data[num_send_requests], (int) mod_data_size, MPI_BYTE,
-                        chunk_entry->owners.new_owner, (int) chunk_entry->index, io_info->comm, &send_requests[num_send_requests])))
-                    HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
-
-                if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator")
-                mem_iter_init = FALSE;
-
-                num_send_requests++;
-            } /* end if */
-            else {
-                /* Allocate all necessary buffers for an asynchronous receive operation */
-                if (chunk_entry->num_writers > 1) {
-                    MPI_Message message;
-                    MPI_Status status;
-                    size_t j;
-
-                    chunk_entry->async_info.num_receive_requests = (int) chunk_entry->num_writers - 1;
-                    if (NULL == (chunk_entry->async_info.receive_requests_array = (MPI_Request *) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(MPI_Request))))
-                        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async requests array")
-
-                    if (NULL == (chunk_entry->async_info.receive_buffer_array = (unsigned char **) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(unsigned char *))))
-                        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async receive buffers")
-
-                    for (j = 0; j < chunk_entry->num_writers - 1; j++) {
-                        int count = 0;
-
-                        /* Probe for a particular message from any process, removing that message
-                         * from the receive queue in the process and allocating that much memory
-                         * for the asynchronous receive
-                         */
-                        if (MPI_SUCCESS != (mpi_code = MPI_Mprobe(MPI_ANY_SOURCE, (int) chunk_entry->index, io_info->comm, &message, &status)))
-                            HMPI_GOTO_ERROR(FAIL, "MPI_Mprobe failed", mpi_code)
-
-                        if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
-                            HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
-
-                        if (NULL == (chunk_entry->async_info.receive_buffer_array[j] = (unsigned char *) H5MM_malloc(count * sizeof(char *))))
-                            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data receive buffer")
-
-                        if (MPI_SUCCESS != (mpi_code = MPI_Imrecv(chunk_entry->async_info.receive_buffer_array[j], count, MPI_BYTE,
-                                &message, &chunk_entry->async_info.receive_requests_array[j])))
-                            HMPI_GOTO_ERROR(FAIL, "MPI_Imrecv failed", mpi_code)
-                    }
-                }
-
-                local_info_array[last_assigned_idx++] = local_info_array[i];
-            } /* end else */
-        } /* end for */
-
-        num_chunks_selected = last_assigned_idx;
-
-        /* Wait for all async send requests to complete before returning */
-        if (num_send_requests) {
-            if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses))))
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
-
-            H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
-            if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
-                HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
-
-            /* Now that all async send requests have completed, free up the send
-             * buffers used in the async operations
-             */
-            for (i = 0; i < num_send_requests; i++) {
-                if (mod_data[i])
-                    H5MM_free(mod_data[i]);
-            }
-        } /* end if */
-    } /* end if */
-
-    *chunk_list = local_info_array;
-    *num_entries = num_chunks_selected;
-
+    if (io_info->op_type == H5D_IO_OP_WRITE)
+        if (H5D__chunk_redistribute_shared_chunks(io_info, type_info, fm, local_info_array, &num_chunks_selected) < 0)
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTDISTRIBUTE, FAIL, "unable to redistribute shared chunks")
+
+    *chunk_list = local_info_array;
+    *num_entries = num_chunks_selected;
+
+done:
+    FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__construct_filtered_io_info_list() */
+
+
+/*-------------------------------------------------------------------------
+ * Function:    H5D__chunk_redistribute_shared_chunks
+ *
+ * Purpose:     Redistributes chunks that have been selected for writing
+ *              by more than one process, assigning each such chunk a
+ *              single new owner (the process writing the most data to
+ *              the chunk). The other writers then send the new owner
+ *              their modification data via asynchronous MPI messages.
+ *
+ * Return:      Non-negative on success/Negative on failure
+ *
+ * Programmer:  Jordan Henderson
+ *              Monday, May 1, 2017
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
+    const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries)
+{
+    H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
+    H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
+    unsigned char **mod_data = NULL; /* Array of chunk modification data buffers sent by a process to new chunk owners */
+    MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */
+    MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */
+    hbool_t mem_iter_init = FALSE;
+    size_t shared_chunks_info_array_num_entries = 0;
+    size_t num_send_requests = 0;
+    size_t i, last_assigned_idx;
+    int *send_counts = NULL;
+    int *send_displacements = NULL;
+    int mpi_rank, mpi_size, mpi_code;
+    herr_t ret_value = SUCCEED;
+
+    FUNC_ENTER_STATIC
+
+    HDassert(io_info);
+    HDassert(type_info);
+    HDassert(local_chunk_array_num_entries);
+
+    if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
+        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
+    if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
+        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
+
+    if (*local_chunk_array_num_entries)
+        if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(*local_chunk_array_num_entries * sizeof(*send_requests))))
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
+
+    if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
+
+    if (H5D__mpio_array_gatherv(local_chunk_array, *local_chunk_array_num_entries, sizeof(*local_chunk_array),
+            (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size,
+            false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")
+
+    /* Rank 0 redistributes any shared chunks to new owners as necessary */
+    if (mpi_rank == 0) {
+        if (NULL == (send_counts = (int *) H5MM_calloc((size_t) mpi_size * sizeof(*send_counts))))
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send counts buffer")
+
+        if (NULL == (send_displacements = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*send_displacements))))
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send displacements buffer")
+
+        for (i = 0; i < shared_chunks_info_array_num_entries;) {
+            H5D_filtered_collective_io_info_t chunk_entry;
+            haddr_t last_seen_addr = shared_chunks_info_array[i].chunk_states.chunk_current.offset;
+            size_t set_begin_index = i;
+            size_t total_io_size = 0;
+            size_t max_io_size = 0;
+            size_t num_writers = 0;
+            int new_chunk_owner = 0;
+
+            /* Process each set of duplicate entries caused by another process writing to the same chunk */
+            do {
+                chunk_entry = shared_chunks_info_array[i];
+
+                send_counts[chunk_entry.owners.original_owner] += (int) sizeof(chunk_entry);
+
+                /* Add this chunk entry's I/O size to the running total */
+                total_io_size += chunk_entry.io_size;
+
+                /* The new owner of the chunk is determined by the process
+                 * which is writing the most data to the chunk
+                 */
+                if (chunk_entry.io_size > max_io_size) {
+                    max_io_size = chunk_entry.io_size;
+                    new_chunk_owner = chunk_entry.owners.original_owner;
+                }
+
+                num_writers++;
+            } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].chunk_states.chunk_current.offset == last_seen_addr);
+
+            /* Set all of the chunk entries' "new_owner" fields */
+            for (; set_begin_index < i; set_begin_index++) {
+                shared_chunks_info_array[set_begin_index].owners.new_owner = new_chunk_owner;
+                shared_chunks_info_array[set_begin_index].num_writers = num_writers;
+            } /* end for */
+        } /* end for */
+
+        /* Sort the new list in order of previous owner so that each original owner of a chunk
+         * entry gets that entry back, with the possibly newly-modified "new_owner" field
+         */
+        HDqsort(shared_chunks_info_array, shared_chunks_info_array_num_entries,
+                sizeof(*shared_chunks_info_array), H5D__cmp_filtered_collective_io_info_entry_owner);
+
+        send_displacements[0] = 0;
+        for (i = 1; i < (size_t) mpi_size; i++)
+            send_displacements[i] = send_displacements[i - 1] + send_counts[i - 1];
+    } /* end if */
+
+    /* Scatter the segments of the list back to each process */
+    if (MPI_SUCCESS != (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts,
+            send_displacements, MPI_BYTE, local_chunk_array, *local_chunk_array_num_entries * (int) sizeof(*local_chunk_array),
+            MPI_BYTE, 0, io_info->comm)))
+        HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code)
+
+    if (shared_chunks_info_array) {
+        H5MM_free(shared_chunks_info_array);
+        shared_chunks_info_array = NULL;
+    }
+
+    /* Now that the chunks have been redistributed, each process must send its modification data
+     * to the new owners of any of the chunks it previously possessed. Accordingly, each process
+     * must also issue asynchronous receives for any messages it may receive for each of the
+     * chunks it is assigned, in order to avoid potential deadlocking issues.
+     */
+    if (*local_chunk_array_num_entries)
+        if (NULL == (mod_data = (unsigned char **) H5MM_malloc(*local_chunk_array_num_entries * sizeof(*mod_data))))
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data buffer array")
+
+    for (i = 0, last_assigned_idx = 0; i < *local_chunk_array_num_entries; i++) {
+        H5D_filtered_collective_io_info_t *chunk_entry = &local_chunk_array[i];
+
+        if (mpi_rank != chunk_entry->owners.new_owner) {
+            H5D_chunk_info_t *chunk_info = NULL;
+            unsigned char *mod_data_p = NULL;
+            hssize_t iter_nelmts;
+            size_t mod_data_size;
+
+            /* Look up the chunk and get its file and memory dataspaces */
+            if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
+                HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")
+
+            /* Determine size of serialized chunk file dataspace, plus the size of
+             * the data being written
+             */
+            if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size")
+
+            if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+            mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
+
+            if (NULL == (mod_data[num_send_requests] = (unsigned char *) H5MM_malloc(mod_data_size)))
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
+
+            /* Serialize the chunk's file dataspace into the buffer */
+            mod_data_p = mod_data[num_send_requests];
+            if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace")
+
+            /* Initialize iterator for memory selection */
+            if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+            mem_iter_init = TRUE;
+
+            /* Collect the modification data into the buffer */
+            if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter,
+                    (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
+                HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer")
+
+            /* Send modification data to new owner */
+            H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
+            H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int)
+            if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data[num_send_requests], (int) mod_data_size, MPI_BYTE,
+                    chunk_entry->owners.new_owner, (int) chunk_entry->index, io_info->comm, &send_requests[num_send_requests])))
+                HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
+
+            if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator")
+            mem_iter_init = FALSE;
+
+            num_send_requests++;
+        } /* end if */
+        else {
+            /* Allocate all necessary buffers for an asynchronous receive operation */
+            if (chunk_entry->num_writers > 1) {
+                MPI_Message message;
+                MPI_Status status;
+                size_t j;
+
+                chunk_entry->async_info.num_receive_requests = (int) chunk_entry->num_writers - 1;
+                if (NULL == (chunk_entry->async_info.receive_requests_array = (MPI_Request *) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(MPI_Request))))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async requests array")
+
+                if (NULL == (chunk_entry->async_info.receive_buffer_array = (unsigned char **) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(unsigned char *))))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async receive buffers")
+
+                for (j = 0; j < chunk_entry->num_writers - 1; j++) {
+                    int count = 0;
+
+                    /* Probe for a particular message from any process, removing that message
+                     * from the receive queue in the process and allocating that much memory
+                     * for the asynchronous receive
+                     */
+                    if (MPI_SUCCESS != (mpi_code = MPI_Mprobe(MPI_ANY_SOURCE, (int) chunk_entry->index, io_info->comm, &message, &status)))
+                        HMPI_GOTO_ERROR(FAIL, "MPI_Mprobe failed", mpi_code)
+
+                    if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
+                        HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
+
+                    if (NULL == (chunk_entry->async_info.receive_buffer_array[j] = (unsigned char *) H5MM_malloc(count * sizeof(char *))))
+                        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data receive buffer")
+
+                    if (MPI_SUCCESS != (mpi_code = MPI_Imrecv(chunk_entry->async_info.receive_buffer_array[j], count, MPI_BYTE,
+                            &message, &chunk_entry->async_info.receive_requests_array[j])))
+                        HMPI_GOTO_ERROR(FAIL, "MPI_Imrecv failed", mpi_code)
+                } /* end for */
+            } /* end if */
+
+            local_chunk_array[last_assigned_idx++] = local_chunk_array[i];
+        } /* end else */
+    } /* end for */
+
+    *local_chunk_array_num_entries = last_assigned_idx;
+
+    /* Wait for all async send requests to complete before returning */
+    if (num_send_requests) {
+        if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses))))
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
+
+        H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
+        if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
+            HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
+    } /* end if */
+
 done:
-    if (shared_chunks_info_array)
-        H5MM_free(shared_chunks_info_array);
-    if (send_counts)
-        H5MM_free(send_counts);
-    if (send_displacements)
-        H5MM_free(send_displacements);
+    /* Now that all async send requests have completed, free up the send
+     * buffers used in the async operations
+     */
+    for (i = 0; i < num_send_requests; i++) {
+        if (mod_data[i])
+            H5MM_free(mod_data[i]);
+    }
+
     if (send_requests)
         H5MM_free(send_requests);
     if (send_statuses)
         H5MM_free(send_statuses);
+    if (send_counts)
+        H5MM_free(send_counts);
+    if (send_displacements)
+        H5MM_free(send_displacements);
     if (mod_data)
         H5MM_free(mod_data);
     if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
         HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
     if (mem_iter)
         H5MM_free(mem_iter);
+    if (shared_chunks_info_array)
+        H5MM_free(shared_chunks_info_array);
 
     FUNC_LEAVE_NOAPI(ret_value)
-} /* end H5D__construct_filtered_io_info_list() */
+} /* end H5D__chunk_redistribute_shared_chunks() */
 
 /*-------------------------------------------------------------------------