Diffstat (limited to 'src/H5Dmpio.c')

-rw-r--r--   src/H5Dmpio.c   156
1 file changed, 98 insertions(+), 58 deletions(-)
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 88171c8..8e662ec 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -97,22 +97,28 @@ typedef struct H5D_chunk_addr_info_t {
 
 /* Information about a single chunk when performing collective filtered I/O */
 typedef struct H5D_filtered_collective_io_info_t {
-    hsize_t index;          /* "Index" of chunk in dataset */
-    hsize_t scaled[H5O_LAYOUT_NDIMS]; /* Scaled coordinates of chunk (in file dataset's dataspace) */
-    hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */
-    size_t  num_writers;    /* Total number of processes writing to this chunk */
-    size_t  io_size;        /* Size of the I/O to this chunk */
-    void   *buf;            /* Chunk data to be written to file/that has been read from file */
+    hsize_t index;                    /* "Index" of chunk in dataset */
+    hsize_t scaled[H5O_LAYOUT_NDIMS]; /* Scaled coordinates of chunk (in file dataset's dataspace) */
+    hbool_t full_overwrite;           /* Whether or not this chunk is being fully overwritten */
+    size_t  num_writers;              /* Total number of processes writing to this chunk */
+    size_t  io_size;                  /* Size of the I/O to this chunk */
+    void   *buf;                      /* Chunk data to be written to file/that has been read from file */
 
     struct {
-        H5F_block_t chunk_current; /* The address in the file and size of this chunk before the I/O and filtering operations */
-        H5F_block_t new_chunk;     /* The address in the file and size of this chunk after the I/O and filtering operations */
+        H5F_block_t chunk_current;    /* The address in the file and size of this chunk before the I/O and filtering operations */
+        H5F_block_t new_chunk;        /* The address in the file and size of this chunk after the I/O and filtering operations */
     } chunk_states;
 
     struct {
-        int original_owner; /* The process which originally had this chunk selected in the I/O operation */
-        int new_owner;      /* The process which the chunk has been re-assigned to */
+        int original_owner;           /* The process which originally had this chunk selected in the I/O operation */
+        int new_owner;                /* The process which the chunk has been re-assigned to */
     } owners;
+
+    struct {
+        MPI_Request    *receive_requests_array;
+        unsigned char **receive_buffer_array;
+        int             num_receive_requests;
+    } async_info;
 } H5D_filtered_collective_io_info_t;
 
 /********************/
@@ -2586,7 +2592,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially selected chunks for this process */
     H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
     H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
-    unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
+    unsigned char **mod_data = NULL; /* Array of chunk modification data buffers sent by a process to new chunk owners */
     MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */
     MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */
     hbool_t mem_iter_init = FALSE;
@@ -2636,6 +2642,10 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
             local_info_array[i].owners.original_owner = local_info_array[i].owners.new_owner = mpi_rank;
             local_info_array[i].buf = NULL;
 
+            local_info_array[i].async_info.num_receive_requests = 0;
+            local_info_array[i].async_info.receive_buffer_array = NULL;
+            local_info_array[i].async_info.receive_requests_array = NULL;
+
             HDmemcpy(local_info_array[i].scaled, chunk_info->scaled, sizeof(chunk_info->scaled));
 
             if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
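The commit zero-initializes the new `async_info` fields before any receives are posted, so later cleanup can run unconditionally. A minimal standalone sketch of that bookkeeping pattern (hypothetical names, not HDF5 API):

```c
#include <stdlib.h>
#include <mpi.h>

/* Hypothetical per-item async receive state, mirroring async_info above */
typedef struct async_recv_state {
    MPI_Request    *requests; /* one request per expected message */
    unsigned char **buffers;  /* one heap buffer per expected message */
    int             count;    /* number of posted receives */
} async_recv_state;

static void async_recv_init(async_recv_state *s)
{
    /* Zero-initialize so cleanup is safe even if no receives are ever posted */
    s->requests = NULL;
    s->buffers  = NULL;
    s->count    = 0;
}

static void async_recv_cleanup(async_recv_state *s)
{
    int i;

    for (i = 0; i < s->count; i++)
        free(s->buffers[i]);
    free(s->buffers);  /* free(NULL) is a no-op, so no guards are needed */
    free(s->requests);
    async_recv_init(s);
}
```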
@@ -2689,7 +2699,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
         do {
             chunk_entry = shared_chunks_info_array[i];
 
-            send_counts[chunk_entry.owners.original_owner] += sizeof(chunk_entry);
+            send_counts[chunk_entry.owners.original_owner] += (int) sizeof(chunk_entry);
 
             /* Add this chunk entry's I/O size to the running total */
             total_io_size += chunk_entry.io_size;
@@ -2725,7 +2735,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
 
         /* Scatter the segments of the list back to each process */
         if (MPI_SUCCESS != (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts,
-                send_displacements, MPI_BYTE, local_info_array, num_chunks_selected * sizeof(*local_info_array),
+                send_displacements, MPI_BYTE, local_info_array, num_chunks_selected * (int) sizeof(*local_info_array),
                 MPI_BYTE, 0, io_info->comm)))
             HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code)
@@ -2735,24 +2745,31 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     } /* end if */
 
     /* Now that the chunks have been redistributed, each process must send its modification data
-     * to the new owners of any of the chunks it previously possessed
+     * to the new owners of any of the chunks it previously possessed. Accordingly, each process
+     * must also issue asynchronous receives for any messages it may receive for each of the
+     * chunks it is assigned, in order to avoid potential deadlocking issues.
      */
+    if (num_chunks_selected)
+        if (NULL == (mod_data = (unsigned char **) H5MM_malloc(num_chunks_selected * sizeof(*mod_data))))
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data buffer array")
+
     for (i = 0, last_assigned_idx = 0; i < num_chunks_selected; i++) {
-        if (mpi_rank != local_info_array[i].owners.new_owner) {
-            H5D_filtered_collective_io_info_t chunk_entry = local_info_array[i];
+        H5D_filtered_collective_io_info_t *chunk_entry = &local_info_array[i];
+
+        if (mpi_rank != chunk_entry->owners.new_owner) {
             H5D_chunk_info_t *chunk_info = NULL;
             unsigned char *mod_data_p = NULL;
             hssize_t iter_nelmts;
             size_t mod_data_size;
 
             /* Look up the chunk and get its file and memory dataspaces */
-            if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry.index)))
+            if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
                 HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")
 
             /* Determine size of serialized chunk file dataspace, plus the size of
              * the data being written
              */
-            if (H5S_encode(chunk_info->fspace, &mod_data, &mod_data_size) < 0)
+            if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size")
 
             if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
@@ -2760,11 +2777,11 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
 
             mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
 
-            if (NULL == (mod_data = (unsigned char *) H5MM_malloc(mod_data_size)))
+            if (NULL == (mod_data[num_send_requests] = (unsigned char *) H5MM_malloc(mod_data_size)))
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
 
             /* Serialize the chunk's file dataspace into the buffer */
-            mod_data_p = mod_data;
+            mod_data_p = mod_data[num_send_requests];
 
             if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace")
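The hunks above cast the `sizeof()` expressions to `int` because MPI count and displacement arguments are `int`s. A self-contained sketch of the same `MPI_Scatterv` counts/displacements pattern, assuming rank 0 holds the assembled buffer (illustrative names only, not the HDF5 code):

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>

/* Rank 0 scatters a variable-sized byte segment to each rank. */
int main(int argc, char **argv)
{
    int rank, nprocs;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    int *send_counts = NULL, *displs = NULL;
    unsigned char *send_buf = NULL;

    int my_count = (rank + 1) * 4; /* each rank receives a different amount */
    unsigned char *recv_buf = malloc((size_t) my_count);

    if (rank == 0) {
        int total = 0, i;

        send_counts = malloc((size_t) nprocs * sizeof(int));
        displs      = malloc((size_t) nprocs * sizeof(int));
        for (i = 0; i < nprocs; i++) {
            send_counts[i] = (i + 1) * 4; /* byte count per rank; must fit in an int */
            displs[i]      = total;       /* byte offset of each segment */
            total         += send_counts[i];
        }
        send_buf = malloc((size_t) total);
        memset(send_buf, 0xAB, (size_t) total);
    }

    /* Counts/displacements are only significant at the root */
    MPI_Scatterv(send_buf, send_counts, displs, MPI_BYTE,
                 recv_buf, my_count, MPI_BYTE, 0, MPI_COMM_WORLD);

    printf("rank %d received %d bytes\n", rank, my_count);

    free(recv_buf); free(send_buf); free(send_counts); free(displs);
    MPI_Finalize();
    return 0;
}
```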
@@ -2780,21 +2797,53 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
 
             /* Send modification data to new owner */
             H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
-            H5_CHECK_OVERFLOW(chunk_entry.index, hsize_t, int)
-            if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, chunk_entry.owners.new_owner,
-                    (int) chunk_entry.index, io_info->comm, &send_requests[num_send_requests++])))
+            H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int)
+            if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data[num_send_requests], (int) mod_data_size, MPI_BYTE,
+                    chunk_entry->owners.new_owner, (int) chunk_entry->index, io_info->comm, &send_requests[num_send_requests])))
                 HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
 
             if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator")
             mem_iter_init = FALSE;
 
-            if (mod_data) {
-                H5MM_free(mod_data);
-                mod_data = NULL;
-            } /* end if */
+            num_send_requests++;
         } /* end if */
         else {
+            /* Allocate all necessary buffers for an asynchronous receive operation */
+            if (chunk_entry->num_writers > 1) {
+                MPI_Message message;
+                MPI_Status status;
+                size_t j;
+
+                chunk_entry->async_info.num_receive_requests = (int) chunk_entry->num_writers - 1;
+                if (NULL == (chunk_entry->async_info.receive_requests_array = (MPI_Request *) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(MPI_Request))))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async requests array")
+
+                if (NULL == (chunk_entry->async_info.receive_buffer_array = (unsigned char **) H5MM_malloc(chunk_entry->async_info.num_receive_requests * sizeof(unsigned char *))))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async receive buffers")
+
+                for (j = 0; j < chunk_entry->num_writers - 1; j++) {
+                    int count = 0;
+
+                    /* Probe for a particular message from any process, removing that message
+                     * from the receive queue in the process and allocating that much memory
+                     * for the asynchronous receive
+                     */
+                    if (MPI_SUCCESS != (mpi_code = MPI_Mprobe(MPI_ANY_SOURCE, (int) chunk_entry->index, io_info->comm, &message, &status)))
+                        HMPI_GOTO_ERROR(FAIL, "MPI_Mprobe failed", mpi_code)
+
+                    if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
+                        HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
+
+                    if (NULL == (chunk_entry->async_info.receive_buffer_array[j] = (unsigned char *) H5MM_malloc(count * sizeof(char *))))
+                        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data receive buffer")
+
+                    if (MPI_SUCCESS != (mpi_code = MPI_Imrecv(chunk_entry->async_info.receive_buffer_array[j], count, MPI_BYTE,
+                            &message, &chunk_entry->async_info.receive_requests_array[j])))
+                        HMPI_GOTO_ERROR(FAIL, "MPI_Imrecv failed", mpi_code)
+                }
+            }
+
             local_info_array[last_assigned_idx++] = local_info_array[i];
         } /* end else */
     } /* end for */
@@ -2810,6 +2859,14 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
         if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
             HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
     } /* end if */
+
+    /* Now that all async send requests have completed, free up the send
+     * buffers used in the async operations
+     */
+    for (i = 0; i < num_send_requests; i++) {
+        if (mod_data[i])
+            H5MM_free(mod_data[i]);
+    }
 } /* end if */
 
 *chunk_list = local_info_array;
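This is the core of the change: `MPI_Mprobe` removes a specific message from the matching queue, and `MPI_Imrecv` can then receive only that message, so a right-sized buffer can be allocated per message without the race that a plain `MPI_Probe`/`MPI_Recv` pair on `MPI_ANY_SOURCE` allows. A minimal sketch of that matched-probe pattern (the helper name is hypothetical):

```c
#include <stdlib.h>
#include <mpi.h>

/* Probe for one message with the given tag, size a buffer from its length,
 * and post a nonblocking receive for exactly that message. The caller waits
 * on *req_out later and then owns *buf_out.
 */
static int recv_one_message(MPI_Comm comm, int tag,
                            unsigned char **buf_out, MPI_Request *req_out)
{
    MPI_Message message;
    MPI_Status  status;
    int         count = 0;

    /* Matched probe: the message is dequeued and can only be received below */
    if (MPI_Mprobe(MPI_ANY_SOURCE, tag, comm, &message, &status) != MPI_SUCCESS)
        return -1;
    if (MPI_Get_count(&status, MPI_BYTE, &count) != MPI_SUCCESS)
        return -1;

    /* Note: count bytes suffice here; the hunk above allocates
     * count * sizeof(char *), which appears to over-allocate by a
     * pointer-size factor. */
    if (NULL == (*buf_out = malloc((size_t) count)))
        return -1;

    return (MPI_Imrecv(*buf_out, count, MPI_BYTE, &message, req_out) == MPI_SUCCESS) ? 0 : -1;
}
```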
@@ -2951,10 +3008,11 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
     hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
     hbool_t mem_iter_init = FALSE;
     size_t buf_size;
-    size_t mod_data_alloced_bytes = 0;
+    size_t i;
     H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */
     void *tmp_gath_buf = NULL; /* Temporary gather buffer for owner of the chunk to gather into from application write buffer before scattering out to the chunk data buffer */
+    int mpi_code;
     herr_t ret_value = SUCCEED;
 
     FUNC_ENTER_STATIC
@@ -3060,36 +3118,15 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
             mem_iter_init = FALSE;
 
-            /* Update the chunk data with any modifications from other processes */
-            while (chunk_entry->num_writers > 1) {
-                const unsigned char *mod_data_p; /* Use second pointer since H5S_decode advances pointer */
-                MPI_Status status;
-                int count;
-                int mpi_code;
-
-                /* Probe for the incoming message from another process */
-                H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int)
-                if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->index,
-                        io_info->comm, &status)))
-                    HMPI_GOTO_ERROR(FAIL, "MPI_Probe failed", mpi_code)
-
-                /* Retrieve the message size */
-                if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
-                    HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
-
-                if ((size_t) count > mod_data_alloced_bytes) {
-                    if (NULL == (mod_data = (unsigned char *) H5MM_realloc(mod_data, (size_t) count)))
-                        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate modification data receive buffer")
-
-                    mod_data_alloced_bytes = (size_t) count;
-                }
+            if (MPI_SUCCESS != (mpi_code = MPI_Waitall(chunk_entry->async_info.num_receive_requests,
+                    chunk_entry->async_info.receive_requests_array, MPI_STATUSES_IGNORE)))
+                HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
 
-                if (MPI_SUCCESS != (mpi_code = MPI_Recv(mod_data, count, MPI_BYTE, MPI_ANY_SOURCE,
-                        (int) chunk_entry->index, io_info->comm, &status)))
-                    HMPI_GOTO_ERROR(FAIL, "MPI_Recv failed", mpi_code)
+            for (i = 0; i < (size_t) chunk_entry->async_info.num_receive_requests; i++) {
+                const unsigned char *mod_data_p;
 
                 /* Decode the process' chunk file dataspace */
-                mod_data_p = mod_data;
+                mod_data_p = chunk_entry->async_info.receive_buffer_array[i];
                 if (NULL == (dataspace = H5S_decode(&mod_data_p)))
                     HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace")
@@ -3105,8 +3142,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
                         io_info->dxpl_cache, chunk_entry->buf) < 0)
                     HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't scatter to write buffer")
 
-                chunk_entry->num_writers--;
-
                 if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
                     HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
                 mem_iter_init = FALSE;
@@ -3115,7 +3150,8 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
                         HGOTO_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace")
                     dataspace = NULL;
                 }
-            } /* end while */
+
+                H5MM_free(chunk_entry->async_info.receive_buffer_array[i]);
+            }
 
             /* Filter the chunk */
             if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
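Both `MPI_Waitall` calls enforce the same lifetime rule: a buffer handed to `MPI_Isend` or `MPI_Imrecv` must stay valid until its request completes, which is why the send buffers are now kept in the `mod_data` array and freed only after the send-side wait, and the receive buffers are consumed only after the receive-side wait. A sketch of the send-side half of that pattern, under assumed illustrative names:

```c
#include <stdlib.h>
#include <string.h>
#include <mpi.h>

/* Post nmsgs nonblocking sends, each with its own heap buffer, and free the
 * buffers only after a single MPI_Waitall confirms every send has completed.
 */
static int send_all(MPI_Comm comm, int dest, int tag,
                    const unsigned char *payload, int payload_size, int nmsgs)
{
    MPI_Request    *reqs = malloc((size_t) nmsgs * sizeof(MPI_Request));
    unsigned char **bufs = malloc((size_t) nmsgs * sizeof(unsigned char *));
    int i, ret = 0;

    for (i = 0; i < nmsgs; i++) {
        bufs[i] = malloc((size_t) payload_size); /* one live buffer per in-flight send */
        memcpy(bufs[i], payload, (size_t) payload_size);
        if (MPI_Isend(bufs[i], payload_size, MPI_BYTE, dest, tag, comm, &reqs[i]) != MPI_SUCCESS) {
            free(bufs[i]); /* this buffer was never handed to MPI */
            ret = -1;
            break;
        }
    }

    /* Only after every posted send has completed is it safe to release buffers */
    if (MPI_Waitall(i, reqs, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
        ret = -1;
    while (i-- > 0)
        free(bufs[i]);
    free(bufs);
    free(reqs);
    return ret;
}
```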
@@ -3135,6 +3171,10 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
     } /* end switch */
 
 done:
+    if (chunk_entry->async_info.receive_buffer_array)
+        H5MM_free(chunk_entry->async_info.receive_buffer_array);
+    if (chunk_entry->async_info.receive_requests_array)
+        H5MM_free(chunk_entry->async_info.receive_requests_array);
     if (mod_data)
         H5MM_free(mod_data);
     if (tmp_gath_buf)
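The `done:` block follows HDF5's single-exit cleanup idiom: every `HGOTO_ERROR` jumps here, and the NULL-initialized pointers make the frees safe on both success and error paths. A generic sketch of the idiom, with plain `malloc`/`free` standing in for `H5MM_malloc`/`H5MM_free`:

```c
#include <stdlib.h>

/* Every error path jumps to a single label; the label frees whatever was
 * allocated, relying on pointers having been NULL-initialized up front.
 */
static int process(size_t n)
{
    int            ret  = 0;
    unsigned char *buf  = NULL;
    unsigned char *copy = NULL;

    if (NULL == (buf = malloc(n))) {
        ret = -1;
        goto done;
    }
    if (NULL == (copy = malloc(n))) {
        ret = -1;
        goto done;
    }

    /* ... real work would happen here ... */

done:
    free(copy); /* free(NULL) is harmless, so no guards are needed */
    free(buf);
    return ret;
}
```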