summaryrefslogtreecommitdiffstats
path: root/src/H5Dmpio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r--src/H5Dmpio.c156
1 files changed, 98 insertions, 58 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 88171c8..8e662ec 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -97,22 +97,28 @@ typedef struct H5D_chunk_addr_info_t {
/* Information about a single chunk when performing collective filtered I/O */
typedef struct H5D_filtered_collective_io_info_t {
- hsize_t index; /* "Index" of chunk in dataset */
- hsize_t scaled[H5O_LAYOUT_NDIMS]; /* Scaled coordinates of chunk (in file dataset's dataspace) */
- hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */
- size_t num_writers; /* Total number of processes writing to this chunk */
- size_t io_size; /* Size of the I/O to this chunk */
- void *buf; /* Chunk data to be written to file/that has been read from file*/
+ hsize_t index; /* "Index" of chunk in dataset */
+ hsize_t scaled[H5O_LAYOUT_NDIMS]; /* Scaled coordinates of chunk (in file dataset's dataspace) */
+ hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */
+ size_t num_writers; /* Total number of processes writing to this chunk */
+ size_t io_size; /* Size of the I/O to this chunk */
+ void *buf; /* Chunk data to be written to file/that has been read from file*/
struct {
- H5F_block_t chunk_current; /* The address in the file and size of this chunk before the I/O and filtering operations */
- H5F_block_t new_chunk; /* The address in the file and size of this chunk after the I/O and filtering operations */
+ H5F_block_t chunk_current; /* The address in the file and size of this chunk before the I/O and filtering operations */
+ H5F_block_t new_chunk; /* The address in the file and size of this chunk after the I/O and filtering operations */
} chunk_states;
struct {
- int original_owner; /* The process which originally had this chunk selected in the I/O operation */
- int new_owner; /* The process which the chunk has been re-assigned to */
+ int original_owner; /* The process which originally had this chunk selected in the I/O operation */
+ int new_owner; /* The process which the chunk has been re-assigned to */
} owners;
+
+ struct {
+ MPI_Request *receive_requests_array;
+ unsigned char **receive_buffer_array;
+ int num_receive_requests;
+ } async_info;
} H5D_filtered_collective_io_info_t;
/********************/
@@ -2586,7 +2592,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially selected chunks for this process */
H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
- unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
+ unsigned char **mod_data = NULL; /* Array of chunk modification data buffers sent by a process to new chunk owners */
MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */
MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */
hbool_t mem_iter_init = FALSE;
@@ -2636,6 +2642,10 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
local_info_array[i].owners.original_owner = local_info_array[i].owners.new_owner = mpi_rank;
local_info_array[i].buf = NULL;
+ local_info_array[i].async_info.num_receive_requests = 0;
+ local_info_array[i].async_info.receive_buffer_array = NULL;
+ local_info_array[i].async_info.receive_requests_array = NULL;
+
HDmemcpy(local_info_array[i].scaled, chunk_info->scaled, sizeof(chunk_info->scaled));
if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
@@ -2689,7 +2699,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
do {
chunk_entry = shared_chunks_info_array[i];
- send_counts[chunk_entry.owners.original_owner] += sizeof(chunk_entry);
+ send_counts[chunk_entry.owners.original_owner] += (int) sizeof(chunk_entry);
/* Add this chunk entry's I/O size to the running total */
total_io_size += chunk_entry.io_size;
@@ -2725,7 +2735,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* Scatter the segments of the list back to each process */
if (MPI_SUCCESS != (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts,
- send_displacements, MPI_BYTE, local_info_array, num_chunks_selected * sizeof(*local_info_array),
+ send_displacements, MPI_BYTE, local_info_array, num_chunks_selected * (int) sizeof(*local_info_array),
MPI_BYTE, 0, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code)
@@ -2735,24 +2745,31 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
}
/* Now that the chunks have been redistributed, each process must send its modification data
- * to the new owners of any of the chunks it previously possessed
+ * to the new owners of any of the chunks it previously possessed. Accordingly, each process
+ * must also issue asynchronous receives for any messages it may receive for each of the
+ * chunks it is assigned, in order to avoid potential deadlocking issues.
*/
+ if (num_chunks_selected)
+ if (NULL == (mod_data = (unsigned char **) H5MM_malloc(num_chunks_selected * sizeof(*mod_data))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data buffer array")
+
for (i = 0, last_assigned_idx = 0; i < num_chunks_selected; i++) {
- if (mpi_rank != local_info_array[i].owners.new_owner) {
- H5D_filtered_collective_io_info_t chunk_entry = local_info_array[i];
+ H5D_filtered_collective_io_info_t *chunk_entry = &local_info_array[i];
+
+ if (mpi_rank != chunk_entry->owners.new_owner) {
H5D_chunk_info_t *chunk_info = NULL;
unsigned char *mod_data_p = NULL;
hssize_t iter_nelmts;
size_t mod_data_size;
/* Look up the chunk and get its file and memory dataspaces */
- if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry.index)))
+ if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")
/* Determine size of serialized chunk file dataspace, plus the size of
* the data being written
*/
- if (H5S_encode(chunk_info->fspace, &mod_data, &mod_data_size) < 0)
+ if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size")
if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
@@ -2760,11 +2777,11 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
- if (NULL == (mod_data = (unsigned char *) H5MM_malloc(mod_data_size)))
+ if (NULL == (mod_data[num_send_requests] = (unsigned char *) H5MM_malloc(mod_data_size)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
/* Serialize the chunk's file dataspace into the buffer */
- mod_data_p = mod_data;
+ mod_data_p = mod_data[num_send_requests];
if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace")
@@ -2780,21 +2797,53 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* Send modification data to new owner */
H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
- H5_CHECK_OVERFLOW(chunk_entry.index, hsize_t, int)
- if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, chunk_entry.owners.new_owner,
- (int) chunk_entry.index, io_info->comm, &send_requests[num_send_requests++])))
+ H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int)
+ if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data[num_send_requests], (int) mod_data_size, MPI_BYTE,
+ chunk_entry->owners.new_owner, (int) chunk_entry->index, io_info->comm, &send_requests[num_send_requests])))
HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator")
mem_iter_init = FALSE;
- if (mod_data) {
- H5MM_free(mod_data);
- mod_data = NULL;
- } /* end if */
+ num_send_requests++;
} /* end if */
else {
+ /* Allocate all necessary buffers for an asynchronous receive operation */
+ if (chunk_entry->num_writers > 1) {
+ MPI_Message message;
+ MPI_Status status;
+ size_t j;
+
+ chunk_entry->async_info.num_receive_requests = (int) chunk_entry->num_writers - 1;
+ if (NULL == (chunk_entry->async_info.receive_requests_array = (MPI_Request *) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(MPI_Request))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async requests array")
+
+ if (NULL == (chunk_entry->async_info.receive_buffer_array = (unsigned char **) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(unsigned char *))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async receive buffers")
+
+ for (j = 0; j < chunk_entry->num_writers - 1; j++) {
+ int count = 0;
+
+ /* Probe for a particular message from any process, removing that message
+ * from the receive queue in the process and allocating that much memory
+ * for the asynchronous receive
+ */
+ if (MPI_SUCCESS != (mpi_code = MPI_Mprobe(MPI_ANY_SOURCE, (int) chunk_entry->index, io_info->comm, &message, &status)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Mprobe failed", mpi_code)
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
+
+ if (NULL == (chunk_entry->async_info.receive_buffer_array[j] = (unsigned char *) H5MM_malloc(count * sizeof(char *))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data receive buffer")
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Imrecv(chunk_entry->async_info.receive_buffer_array[j], count, MPI_BYTE,
+ &message, &chunk_entry->async_info.receive_requests_array[j])))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Imrecv failed", mpi_code)
+ }
+ }
+
local_info_array[last_assigned_idx++] = local_info_array[i];
} /* end else */
} /* end for */
@@ -2810,6 +2859,14 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
} /* end if */
+
+ /* Now that all async send requests have completed, free up the send
+ * buffers used in the async operations
+ */
+ for (i = 0; i < num_send_requests; i++) {
+ if (mod_data[i])
+ H5MM_free(mod_data[i]);
+ }
} /* end if */
*chunk_list = local_info_array;
@@ -2951,10 +3008,11 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
hbool_t mem_iter_init = FALSE;
size_t buf_size;
- size_t mod_data_alloced_bytes = 0;
+ size_t i;
H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */
void *tmp_gath_buf = NULL; /* Temporary gather buffer for owner of the chunk to gather into from
application write buffer before scattering out to the chunk data buffer */
+ int mpi_code;
herr_t ret_value = SUCCEED;
FUNC_ENTER_STATIC
@@ -3060,36 +3118,15 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
mem_iter_init = FALSE;
- /* Update the chunk data with any modifications from other processes */
- while (chunk_entry->num_writers > 1) {
- const unsigned char *mod_data_p; /* Use second pointer since H5S_decode advances pointer */
- MPI_Status status;
- int count;
- int mpi_code;
-
- /* Probe for the incoming message from another process */
- H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int)
- if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->index,
- io_info->comm, &status)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Probe failed", mpi_code)
-
- /* Retrieve the message size */
- if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
-
- if ((size_t) count > mod_data_alloced_bytes) {
- if (NULL == (mod_data = (unsigned char *) H5MM_realloc(mod_data, (size_t) count)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate modification data receive buffer")
-
- mod_data_alloced_bytes = (size_t) count;
- }
+ if (MPI_SUCCESS != (mpi_code = MPI_Waitall(chunk_entry->async_info.num_receive_requests,
+ chunk_entry->async_info.receive_requests_array, MPI_STATUSES_IGNORE)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
- if (MPI_SUCCESS != (mpi_code = MPI_Recv(mod_data, count, MPI_BYTE, MPI_ANY_SOURCE,
- (int) chunk_entry->index, io_info->comm, &status)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Recv failed", mpi_code)
+ for (i = 0; i < (size_t) chunk_entry->async_info.num_receive_requests; i++) {
+ const unsigned char *mod_data_p;
/* Decode the process' chunk file dataspace */
- mod_data_p = mod_data;
+ mod_data_p = chunk_entry->async_info.receive_buffer_array[i];
if (NULL == (dataspace = H5S_decode(&mod_data_p)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace")
@@ -3105,8 +3142,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
io_info->dxpl_cache, chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't scatter to write buffer")
- chunk_entry->num_writers--;
-
if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
mem_iter_init = FALSE;
@@ -3115,7 +3150,8 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
HGOTO_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace")
dataspace = NULL;
}
- } /* end while */
+ H5MM_free(chunk_entry->async_info.receive_buffer_array[i]);
+ }
/* Filter the chunk */
if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
@@ -3135,6 +3171,10 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
} /* end switch */
done:
+ if (chunk_entry->async_info.receive_buffer_array)
+ H5MM_free(chunk_entry->async_info.receive_buffer_array);
+ if (chunk_entry->async_info.receive_requests_array)
+ H5MM_free(chunk_entry->async_info.receive_requests_array);
if (mod_data)
H5MM_free(mod_data);
if (tmp_gath_buf)