diff options
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r-- | src/H5Dmpio.c | 44 |
1 files changed, 31 insertions, 13 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index b475eaf..c1d044f 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -1361,9 +1361,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property") /* Build a list of selected chunks in the collective io operation */ - /* XXX: Not sure about correct minor error code */ if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't construct filtered I/O info list") + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list") #ifdef PARALLEL_COMPRESS_DEBUG HDfprintf(debug_file, "Incoming messages from other processes:\n"); @@ -1482,10 +1481,9 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in HDmemcpy(chunk_list, &collective_chunk_list[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t)); /* Create single MPI type encompassing each selection in the dataspace */ - /* XXX: change minor error code */ if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries, &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type") + HGOTO_ERROR(H5E_DATASET, H5E_BADTYPE, FAIL, "couldn't create MPI link chunk I/O type") /* Override the write buffer to point to the address of the first * chunk data buffer @@ -1834,6 +1832,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i MPI_Datatype *mem_type_array = NULL; hbool_t *file_type_is_derived_array = NULL; hbool_t *mem_type_is_derived_array = NULL; + hbool_t *has_chunk_selected_array = NULL; /* Array of whether or not each process is contributing a chunk to each iteration */ size_t chunk_list_num_entries; size_t collective_chunk_list_num_entries; size_t i, j; /* Local index variable */ @@ -1859,9 +1858,8 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk io mode property") /* Build a list of selected chunks in the collective IO operation */ - /* XXX: change minor error code */ if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't construct filtered I/O info list") + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list") /* Set up contiguous I/O info object */ HDmemcpy(&ctg_io_info, io_info, sizeof(ctg_io_info)); @@ -1981,16 +1979,26 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i HDfprintf(debug_file, "------------------------------\n\n"); #endif + if (NULL == (has_chunk_selected_array = (hbool_t *) H5MM_malloc((size_t) mpi_size * sizeof(*has_chunk_selected_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array") + + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&have_chunk_to_process, 1, MPI_C_BOOL, has_chunk_selected_array, + 1, MPI_C_BOOL, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + /* If this process has a chunk to work on, create a MPI type for the * memory and file for writing out the chunk */ if (have_chunk_to_process) { - int mpi_type_count; + size_t offset; + int mpi_type_count; + + for (j = 0, offset = 0; j < (size_t) mpi_rank; j++) + offset += has_chunk_selected_array[j]; /* Collect the new chunk info back to the local copy, since only the record in the * collective array gets updated by the chunk re-allocation */ - /* XXX: offset could be wrong if a process runs out of chunks */ - HDmemcpy(&chunk_list[i].new_chunk, &collective_chunk_list[mpi_rank].new_chunk, sizeof(chunk_list[i].new_chunk)); + HDmemcpy(&chunk_list[i].new_chunk, &collective_chunk_list[offset].new_chunk, sizeof(chunk_list[i].new_chunk)); #ifdef PARALLEL_COMPRESS_DEBUG HDfprintf(debug_file, "New chunk record after memcpy back to local:\n"); @@ -2058,6 +2066,8 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i if (collective_chunk_list) collective_chunk_list = (H5D_filtered_collective_io_info_t *) H5MM_free(collective_chunk_list); + if (has_chunk_selected_array) + has_chunk_selected_array = (hbool_t *) H5MM_free(has_chunk_selected_array); } /* end for */ /* Free the MPI file and memory types, if they were derived */ @@ -3155,9 +3165,18 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk * If this is a write operation where the chunk is being fully overwritten, enough memory * must be allocated for the size of the unfiltered chunk. */ - /* XXX: Return value of macro should be checked */ - buf_size = (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) ? chunk_entry->old_chunk.length - : (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace) * type_info->src_type_size; + if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) { + buf_size = chunk_entry->old_chunk.length; + } + else { + hssize_t extent_npoints; + + if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + buf_size = (hsize_t) extent_npoints * type_info->src_type_size; + } + chunk_entry->new_chunk.length = buf_size; #ifdef PARALLEL_COMPRESS_DEBUG @@ -3230,7 +3249,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") - /* XXX: Implement re-alloc strategy too avoid too many malloc/frees */ /* Update the chunk data with any modifications from other processes */ while (chunk_entry->num_writers > 1) { const unsigned char *mod_data_p; /* Use second pointer since H5S_decode advances pointer */ |