summaryrefslogtreecommitdiffstats
path: root/src/H5Dmpio.c
diff options
context:
space:
mode:
authorJordan Henderson <jhenderson@hdfgroup.org>2017-05-01 15:01:53 (GMT)
committerJordan Henderson <jhenderson@hdfgroup.org>2017-05-01 15:01:53 (GMT)
commitd367d397e3380d325154a97f5718642d0657db70 (patch)
tree581bc5213a78d5715eebd5c32257960ca5d0a5e4 /src/H5Dmpio.c
parent4a0937f26ade129130b626f9ecbf95822ba139d1 (diff)
downloadhdf5-d367d397e3380d325154a97f5718642d0657db70.zip
hdf5-d367d397e3380d325154a97f5718642d0657db70.tar.gz
hdf5-d367d397e3380d325154a97f5718642d0657db70.tar.bz2
Separate shared chunks redistribution code out to its own function
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r--src/H5Dmpio.c452
1 files changed, 245 insertions, 207 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 8e662ec..0f3d09e 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -159,6 +159,9 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info,
static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries);
+static herr_t H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info,
+ const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
+ H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries);
static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
size_t array_entry_size, void **gathered_array, size_t *gathered_array_num_entries,
int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *));
@@ -2590,17 +2593,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries)
{
H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially selected chunks for this process */
- H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
- H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
- unsigned char **mod_data = NULL; /* Array of chunk modification data buffers sent by a process to new chunk owners */
- MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */
- MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */
- hbool_t mem_iter_init = FALSE;
size_t num_chunks_selected;
- size_t i, last_assigned_idx;
- int *send_counts = NULL;
- int *send_displacements = NULL;
- int mpi_rank, mpi_size, mpi_code;
+ size_t i;
+ int mpi_rank;
herr_t ret_value = SUCCEED;
FUNC_ENTER_STATIC
@@ -2614,8 +2609,6 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
- if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
- HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
/* Each process builds a local list of the chunks they have selected */
if ((num_chunks_selected = H5SL_count(fm->sel_chunks))) {
@@ -2662,236 +2655,281 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
} /* end if */
/* Redistribute shared chunks to new owners as necessary */
- if (io_info->op_type == H5D_IO_OP_WRITE) {
- size_t shared_chunks_info_array_num_entries = 0;
- size_t num_send_requests = 0;
-
- if (num_chunks_selected)
- if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
-
- if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
-
- if (H5D__mpio_array_gatherv(local_info_array, num_chunks_selected, sizeof(*local_info_array),
- (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size,
- false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")
-
- /* Rank 0 redistributes any shared chunks to new owners as necessary */
- if (mpi_rank == 0) {
- if (NULL == (send_counts = (int *) H5MM_calloc((size_t) mpi_size * sizeof(*send_counts))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send counts buffer")
-
- if (NULL == (send_displacements = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*send_displacements))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send displacements buffer")
-
- for (i = 0; i < shared_chunks_info_array_num_entries;) {
- H5D_filtered_collective_io_info_t chunk_entry;
- haddr_t last_seen_addr = shared_chunks_info_array[i].chunk_states.chunk_current.offset;
- size_t set_begin_index = i;
- size_t total_io_size = 0;
- size_t max_io_size = 0;
- size_t num_writers = 0;
- int new_chunk_owner = 0;
-
- /* Process each set of duplicate entries caused by another process writing to the same chunk */
- do {
- chunk_entry = shared_chunks_info_array[i];
-
- send_counts[chunk_entry.owners.original_owner] += (int) sizeof(chunk_entry);
-
- /* Add this chunk entry's I/O size to the running total */
- total_io_size += chunk_entry.io_size;
-
- /* The new owner of the chunk is determined by the process
- * which is writing the most data to the chunk
- */
- if (chunk_entry.io_size > max_io_size) {
- max_io_size = chunk_entry.io_size;
- new_chunk_owner = chunk_entry.owners.original_owner;
- }
-
- num_writers++;
- } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].chunk_states.chunk_current.offset == last_seen_addr);
-
- /* Set all of the chunk entries' "new_owner" fields */
- for (; set_begin_index < i; set_begin_index++) {
- shared_chunks_info_array[set_begin_index].owners.new_owner = new_chunk_owner;
- shared_chunks_info_array[set_begin_index].num_writers = num_writers;
- } /* end for */
- } /* end for */
+ if (io_info->op_type == H5D_IO_OP_WRITE)
+ if (H5D__chunk_redistribute_shared_chunks(io_info, type_info, fm, local_info_array, &num_chunks_selected) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTDISTRIBUTE, FAIL, "unable to redistribute shared chunks")
- /* Sort the new list's in order of previous owner so that each original owner of a chunk
- * entry gets that entry back, with the possibly newly-modified "new_owner" field
- */
- HDqsort(shared_chunks_info_array, shared_chunks_info_array_num_entries,
- sizeof(*shared_chunks_info_array), H5D__cmp_filtered_collective_io_info_entry_owner);
+ *chunk_list = local_info_array;
+ *num_entries = num_chunks_selected;
- send_displacements[0] = 0;
- for (i = 1; i < (size_t) mpi_size; i++)
- send_displacements[i] = send_displacements[i - 1] + send_counts[i - 1];
- } /* end if */
+done:
+ FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__construct_filtered_io_info_list() */
- /* Scatter the segments of the list back to each process */
- if (MPI_SUCCESS != (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts,
- send_displacements, MPI_BYTE, local_info_array, num_chunks_selected * (int) sizeof(*local_info_array),
- MPI_BYTE, 0, io_info->comm)))
- HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code)
-
- if (shared_chunks_info_array) {
- H5MM_free(shared_chunks_info_array);
- shared_chunks_info_array = NULL;
- }
-
- /* Now that the chunks have been redistributed, each process must send its modification data
- * to the new owners of any of the chunks it previously possessed. Accordingly, each process
- * must also issue asynchronous receives for any messages it may receive for each of the
- * chunks it is assigned, in order to avoid potential deadlocking issues.
- */
- if (num_chunks_selected)
- if (NULL == (mod_data = (unsigned char **) H5MM_malloc(num_chunks_selected * sizeof(*mod_data))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data buffer array")
-
- for (i = 0, last_assigned_idx = 0; i < num_chunks_selected; i++) {
- H5D_filtered_collective_io_info_t *chunk_entry = &local_info_array[i];
-
- if (mpi_rank != chunk_entry->owners.new_owner) {
- H5D_chunk_info_t *chunk_info = NULL;
- unsigned char *mod_data_p = NULL;
- hssize_t iter_nelmts;
- size_t mod_data_size;
-
- /* Look up the chunk and get its file and memory dataspaces */
- if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
- HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")
-
- /* Determine size of serialized chunk file dataspace, plus the size of
- * the data being written
- */
- if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size")
+
+/*-------------------------------------------------------------------------
+ * Function: H5D__chunk_redistribute_shared_chunks
+ *
+ * Purpose:
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Jordan Henderson
+ * Monday, May 1, 2017
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
+ const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries)
+{
+ H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
+ H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
+ unsigned char **mod_data = NULL; /* Array of chunk modification data buffers sent by a process to new chunk owners */
+ MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */
+ MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */
+ hbool_t mem_iter_init = FALSE;
+ size_t shared_chunks_info_array_num_entries = 0;
+ size_t num_send_requests = 0;
+ size_t i, last_assigned_idx;
+ int *send_counts = NULL;
+ int *send_displacements = NULL;
+ int mpi_rank, mpi_size, mpi_code;
+ herr_t ret_value = SUCCEED;
- if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+ FUNC_ENTER_STATIC
- mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
+ HDassert(io_info);
+ HDassert(type_info);
+ HDassert(local_chunk_array_num_entries);
- if (NULL == (mod_data[num_send_requests] = (unsigned char *) H5MM_malloc(mod_data_size)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
+ if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
+ if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
- /* Serialize the chunk's file dataspace into the buffer */
- mod_data_p = mod_data[num_send_requests];
- if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace")
+ if (*local_chunk_array_num_entries)
+ if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(*local_chunk_array_num_entries * sizeof(*send_requests))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
- /* Intialize iterator for memory selection */
- if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
- mem_iter_init = TRUE;
+ if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
- /* Collect the modification data into the buffer */
- if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter,
- (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
- HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer")
+ if (H5D__mpio_array_gatherv(local_chunk_array, *local_chunk_array_num_entries, sizeof(*local_chunk_array),
+ (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size,
+ false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")
- /* Send modification data to new owner */
- H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
- H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int)
- if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data[num_send_requests], (int) mod_data_size, MPI_BYTE,
- chunk_entry->owners.new_owner, (int) chunk_entry->index, io_info->comm, &send_requests[num_send_requests])))
- HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
+ /* Rank 0 redistributes any shared chunks to new owners as necessary */
+ if (mpi_rank == 0) {
+ if (NULL == (send_counts = (int *) H5MM_calloc((size_t) mpi_size * sizeof(*send_counts))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send counts buffer")
- if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator")
- mem_iter_init = FALSE;
+ if (NULL == (send_displacements = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*send_displacements))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send displacements buffer")
- num_send_requests++;
- } /* end if */
- else {
- /* Allocate all necessary buffers for an asynchronous receive operation */
- if (chunk_entry->num_writers > 1) {
- MPI_Message message;
- MPI_Status status;
- size_t j;
-
- chunk_entry->async_info.num_receive_requests = (int) chunk_entry->num_writers - 1;
- if (NULL == (chunk_entry->async_info.receive_requests_array = (MPI_Request *) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(MPI_Request))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async requests array")
-
- if (NULL == (chunk_entry->async_info.receive_buffer_array = (unsigned char **) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(unsigned char *))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async receive buffers")
-
- for (j = 0; j < chunk_entry->num_writers - 1; j++) {
- int count = 0;
-
- /* Probe for a particular message from any process, removing that message
- * from the receive queue in the process and allocating that much memory
- * for the asynchronous receive
- */
- if (MPI_SUCCESS != (mpi_code = MPI_Mprobe(MPI_ANY_SOURCE, (int) chunk_entry->index, io_info->comm, &message, &status)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Mprobe failed", mpi_code)
-
- if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
-
- if (NULL == (chunk_entry->async_info.receive_buffer_array[j] = (unsigned char *) H5MM_malloc(count * sizeof(char *))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data receive buffer")
-
- if (MPI_SUCCESS != (mpi_code = MPI_Imrecv(chunk_entry->async_info.receive_buffer_array[j], count, MPI_BYTE,
- &message, &chunk_entry->async_info.receive_requests_array[j])))
- HMPI_GOTO_ERROR(FAIL, "MPI_Imrecv failed", mpi_code)
- }
+ for (i = 0; i < shared_chunks_info_array_num_entries;) {
+ H5D_filtered_collective_io_info_t chunk_entry;
+ haddr_t last_seen_addr = shared_chunks_info_array[i].chunk_states.chunk_current.offset;
+ size_t set_begin_index = i;
+ size_t total_io_size = 0;
+ size_t max_io_size = 0;
+ size_t num_writers = 0;
+ int new_chunk_owner = 0;
+
+ /* Process each set of duplicate entries caused by another process writing to the same chunk */
+ do {
+ chunk_entry = shared_chunks_info_array[i];
+
+ send_counts[chunk_entry.owners.original_owner] += (int) sizeof(chunk_entry);
+
+ /* Add this chunk entry's I/O size to the running total */
+ total_io_size += chunk_entry.io_size;
+
+ /* The new owner of the chunk is determined by the process
+ * which is writing the most data to the chunk
+ */
+ if (chunk_entry.io_size > max_io_size) {
+ max_io_size = chunk_entry.io_size;
+ new_chunk_owner = chunk_entry.owners.original_owner;
}
- local_info_array[last_assigned_idx++] = local_info_array[i];
- } /* end else */
+ num_writers++;
+ } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].chunk_states.chunk_current.offset == last_seen_addr);
+
+ /* Set all of the chunk entries' "new_owner" fields */
+ for (; set_begin_index < i; set_begin_index++) {
+ shared_chunks_info_array[set_begin_index].owners.new_owner = new_chunk_owner;
+ shared_chunks_info_array[set_begin_index].num_writers = num_writers;
+ } /* end for */
} /* end for */
- num_chunks_selected = last_assigned_idx;
+ /* Sort the new list in order of previous owner so that each original owner of a chunk
+ * entry gets that entry back, with the possibly newly-modified "new_owner" field
+ */
+ HDqsort(shared_chunks_info_array, shared_chunks_info_array_num_entries,
+ sizeof(*shared_chunks_info_array), H5D__cmp_filtered_collective_io_info_entry_owner);
+
+ send_displacements[0] = 0;
+ for (i = 1; i < (size_t) mpi_size; i++)
+ send_displacements[i] = send_displacements[i - 1] + send_counts[i - 1];
+ } /* end if */
- /* Wait for all async send requests to complete before returning */
- if (num_send_requests) {
- if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
+ /* Scatter the segments of the list back to each process */
+ if (MPI_SUCCESS != (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts,
+ send_displacements, MPI_BYTE, local_chunk_array, *local_chunk_array_num_entries * (int) sizeof(*local_chunk_array),
+ MPI_BYTE, 0, io_info->comm)))
+ HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code)
- H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
- if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
+ if (shared_chunks_info_array) {
+ H5MM_free(shared_chunks_info_array);
+ shared_chunks_info_array = NULL;
+ }
+
+ /* Now that the chunks have been redistributed, each process must send its modification data
+ * to the new owners of any of the chunks it previously possessed. Accordingly, each process
+ * must also issue asynchronous receives for any messages it may receive for each of the
+ * chunks it is assigned, in order to avoid potential deadlocking issues.
+ */
+ if (*local_chunk_array_num_entries)
+ if (NULL == (mod_data = (unsigned char **) H5MM_malloc(*local_chunk_array_num_entries * sizeof(*mod_data))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data buffer array")
+
+ for (i = 0, last_assigned_idx = 0; i < *local_chunk_array_num_entries; i++) {
+ H5D_filtered_collective_io_info_t *chunk_entry = &local_chunk_array[i];
+
+ if (mpi_rank != chunk_entry->owners.new_owner) {
+ H5D_chunk_info_t *chunk_info = NULL;
+ unsigned char *mod_data_p = NULL;
+ hssize_t iter_nelmts;
+ size_t mod_data_size;
+
+ /* Look up the chunk and get its file and memory dataspaces */
+ if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
+ HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")
+
+ /* Determine size of serialized chunk file dataspace, plus the size of
+ * the data being written
+ */
+ if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size")
+
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
+
+ if (NULL == (mod_data[num_send_requests] = (unsigned char *) H5MM_malloc(mod_data_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
+
+ /* Serialize the chunk's file dataspace into the buffer */
+ mod_data_p = mod_data[num_send_requests];
+ if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace")
+
+ /* Intialize iterator for memory selection */
+ if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+ mem_iter_init = TRUE;
+
+ /* Collect the modification data into the buffer */
+ if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter,
+ (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
+ HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer")
+
+ /* Send modification data to new owner */
+ H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
+ H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int)
+ if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data[num_send_requests], (int) mod_data_size, MPI_BYTE,
+ chunk_entry->owners.new_owner, (int) chunk_entry->index, io_info->comm, &send_requests[num_send_requests])))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
+
+ if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator")
+ mem_iter_init = FALSE;
+
+ num_send_requests++;
} /* end if */
+ else {
+ /* Allocate all necessary buffers for an asynchronous receive operation */
+ if (chunk_entry->num_writers > 1) {
+ MPI_Message message;
+ MPI_Status status;
+ size_t j;
- /* Now that all async send requests have completed, free up the send
- * buffers used in the async operations
- */
- for (i = 0; i < num_send_requests; i++) {
- if (mod_data[i])
- H5MM_free(mod_data[i]);
- }
- } /* end if */
+ chunk_entry->async_info.num_receive_requests = (int) chunk_entry->num_writers - 1;
+ if (NULL == (chunk_entry->async_info.receive_requests_array = (MPI_Request *) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(MPI_Request))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async requests array")
- *chunk_list = local_info_array;
- *num_entries = num_chunks_selected;
+ if (NULL == (chunk_entry->async_info.receive_buffer_array = (unsigned char **) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(unsigned char *))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async receive buffers")
+
+ for (j = 0; j < chunk_entry->num_writers - 1; j++) {
+ int count = 0;
+
+ /* Probe for a particular message from any process, removing that message
+ * from the receive queue in the process and allocating that much memory
+ * for the asynchronous receive
+ */
+ if (MPI_SUCCESS != (mpi_code = MPI_Mprobe(MPI_ANY_SOURCE, (int) chunk_entry->index, io_info->comm, &message, &status)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Mprobe failed", mpi_code)
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
+
+ if (NULL == (chunk_entry->async_info.receive_buffer_array[j] = (unsigned char *) H5MM_malloc(count * sizeof(char *))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data receive buffer")
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Imrecv(chunk_entry->async_info.receive_buffer_array[j], count, MPI_BYTE,
+ &message, &chunk_entry->async_info.receive_requests_array[j])))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Imrecv failed", mpi_code)
+ } /* end for */
+ } /* end if */
+
+ local_chunk_array[last_assigned_idx++] = local_chunk_array[i];
+ } /* end else */
+ } /* end for */
+
+ *local_chunk_array_num_entries = last_assigned_idx;
+
+ /* Wait for all async send requests to complete before returning */
+ if (num_send_requests) {
+ if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
+
+ H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
+ if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
+ } /* end if */
done:
- if (shared_chunks_info_array)
- H5MM_free(shared_chunks_info_array);
- if (send_counts)
- H5MM_free(send_counts);
- if (send_displacements)
- H5MM_free(send_displacements);
+ /* Now that all async send requests have completed, free up the send
+ * buffers used in the async operations
+ */
+ for (i = 0; i < num_send_requests; i++) {
+ if (mod_data[i])
+ H5MM_free(mod_data[i]);
+ }
+
if (send_requests)
H5MM_free(send_requests);
if (send_statuses)
H5MM_free(send_statuses);
+ if (send_counts)
+ H5MM_free(send_counts);
+ if (send_displacements)
+ H5MM_free(send_displacements);
if (mod_data)
H5MM_free(mod_data);
if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
if (mem_iter)
H5MM_free(mem_iter);
+ if (shared_chunks_info_array)
+ H5MM_free(shared_chunks_info_array);
FUNC_LEAVE_NOAPI(ret_value)
-} /* end H5D__construct_filtered_io_info_list() */
+} /* end H5D__chunk_redistribute_shared_chunks() */
/*-------------------------------------------------------------------------