summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/H5Dmpio.c44
1 files changed, 31 insertions, 13 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index b475eaf..c1d044f 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -1361,9 +1361,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property")
/* Build a list of selected chunks in the collective io operation */
- /* XXX: Not sure about correct minor error code */
if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't construct filtered I/O info list")
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list")
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Incoming messages from other processes:\n");
@@ -1482,10 +1481,9 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
HDmemcpy(chunk_list, &collective_chunk_list[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t));
/* Create single MPI type encompassing each selection in the dataspace */
- /* XXX: change minor error code */
if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries,
&mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type")
+ HGOTO_ERROR(H5E_DATASET, H5E_BADTYPE, FAIL, "couldn't create MPI link chunk I/O type")
/* Override the write buffer to point to the address of the first
* chunk data buffer
@@ -1834,6 +1832,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
MPI_Datatype *mem_type_array = NULL;
hbool_t *file_type_is_derived_array = NULL;
hbool_t *mem_type_is_derived_array = NULL;
+ hbool_t *has_chunk_selected_array = NULL; /* Array of whether or not each process is contributing a chunk to each iteration */
size_t chunk_list_num_entries;
size_t collective_chunk_list_num_entries;
size_t i, j; /* Local index variable */
@@ -1859,9 +1858,8 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk io mode property")
/* Build a list of selected chunks in the collective IO operation */
- /* XXX: change minor error code */
if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't construct filtered I/O info list")
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list")
/* Set up contiguous I/O info object */
HDmemcpy(&ctg_io_info, io_info, sizeof(ctg_io_info));
@@ -1981,16 +1979,26 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
HDfprintf(debug_file, "------------------------------\n\n");
#endif
+ if (NULL == (has_chunk_selected_array = (hbool_t *) H5MM_malloc((size_t) mpi_size * sizeof(*has_chunk_selected_array))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&have_chunk_to_process, 1, MPI_C_BOOL, has_chunk_selected_array,
+ 1, MPI_C_BOOL, io_info->comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
+
/* If this process has a chunk to work on, create a MPI type for the
* memory and file for writing out the chunk
*/
if (have_chunk_to_process) {
- int mpi_type_count;
+ size_t offset;
+ int mpi_type_count;
+
+ for (j = 0, offset = 0; j < (size_t) mpi_rank; j++)
+ offset += has_chunk_selected_array[j];
/* Collect the new chunk info back to the local copy, since only the record in the
* collective array gets updated by the chunk re-allocation */
- /* XXX: offset could be wrong if a process runs out of chunks */
- HDmemcpy(&chunk_list[i].new_chunk, &collective_chunk_list[mpi_rank].new_chunk, sizeof(chunk_list[i].new_chunk));
+ HDmemcpy(&chunk_list[i].new_chunk, &collective_chunk_list[offset].new_chunk, sizeof(chunk_list[i].new_chunk));
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "New chunk record after memcpy back to local:\n");
@@ -2058,6 +2066,8 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
if (collective_chunk_list)
collective_chunk_list = (H5D_filtered_collective_io_info_t *) H5MM_free(collective_chunk_list);
+ if (has_chunk_selected_array)
+ has_chunk_selected_array = (hbool_t *) H5MM_free(has_chunk_selected_array);
} /* end for */
/* Free the MPI file and memory types, if they were derived */
@@ -3155,9 +3165,18 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
* If this is a write operation where the chunk is being fully overwritten, enough memory
* must be allocated for the size of the unfiltered chunk.
*/
- /* XXX: Return value of macro should be checked */
- buf_size = (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) ? chunk_entry->old_chunk.length
- : (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace) * type_info->src_type_size;
+ if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) {
+ buf_size = chunk_entry->old_chunk.length;
+ }
+ else {
+ hssize_t extent_npoints;
+
+ if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ buf_size = (hsize_t) extent_npoints * type_info->src_type_size;
+ }
+
chunk_entry->new_chunk.length = buf_size;
#ifdef PARALLEL_COMPRESS_DEBUG
@@ -3230,7 +3249,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
- /* XXX: Implement re-alloc strategy too avoid too many malloc/frees */
/* Update the chunk data with any modifications from other processes */
while (chunk_entry->num_writers > 1) {
const unsigned char *mod_data_p; /* Use second pointer since H5S_decode advances pointer */