| author | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-26 20:03:36 (GMT) |
|---|---|---|
| committer | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-26 20:03:36 (GMT) |
| commit | 344781f17d983bd38aac8c687461c0bcfcf7dacf (patch) | |
| tree | a514b29c648f7bee64b86ee77219d386596c1018 /src/H5Dmpio.c | |
| parent | 0b6016a3fc6f7cf5b60160e638e3ae7280952106 (diff) | |
Multiple Bug Fixes
Add comments explaining different variables
Check more overflow/conversion sign change issues
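The overflow/sign-conversion checks mentioned above guard narrowing casts (for example `size_t` byte counts and `hsize_t` chunk indices handed to MPI arguments that are plain `int`). A minimal standalone illustration of the kind of check that `H5_CHECKED_ASSIGN()`/`H5_CHECK_OVERFLOW()` perform in the diff below; the helper name here is hypothetical, not an HDF5 API:

```c
#include <limits.h>
#include <stddef.h>

/* Illustration only: a size_t byte count must fit in the int that MPI expects.
 * This mirrors the intent of the checked-conversion macros used in H5Dmpio.c
 * before casting MPI counts and tags down to int. */
static int checked_size_to_int(size_t value, int *out)
{
    if (value > (size_t) INT_MAX)
        return -1;          /* would overflow or change sign when cast to int */
    *out = (int) value;
    return 0;
}
```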
Diffstat (limited to 'src/H5Dmpio.c')
| -rw-r--r-- | src/H5Dmpio.c | 223 |
1 file changed, 121 insertions, 102 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 39a5e15..453e420 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -101,6 +101,7 @@ typedef struct H5D_chunk_addr_info_t {
     H5D_chunk_info_t chunk_info;
 } H5D_chunk_addr_info_t;
 
+/* Information about a chunk when performing collective filtered IO */
 typedef struct H5D_filtered_collective_io_info_t {
     H5D_chunk_info_t chunk_info;
     H5F_block_t old_chunk;
@@ -151,7 +152,7 @@ static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info,
     H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries,
     size_t **_num_chunks_selected_array);
 static herr_t H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
-    size_t array_num_entries, size_t array_entry_size,
+    size_t local_array_num_entries, size_t array_entry_size,
     void **gathered_array, size_t *gathered_array_num_entries,
     int (*sort_func)(const void *, const void *));
 static herr_t H5D__mpio_filtered_collective_write_type(
@@ -161,8 +162,8 @@ static herr_t H5D__mpio_filtered_collective_write_type(
 static herr_t H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry,
     const H5D_io_info_t *io_info, const H5D_type_info_t *type_info);
 static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2);
-static int H5D__cmp_filtered_collective_io_entry(const void *filtered_collective_io_entry1,
-    const void *filtered_collective_io_entry2);
+static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1,
+    const void *filtered_collective_io_info_entry2);
 
 /*********************/
@@ -350,15 +351,15 @@ done:
  */
 static herr_t
 H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
-    size_t array_num_entries, size_t array_entry_size,
+    size_t local_array_num_entries, size_t array_entry_size,
     void **_gathered_array, size_t *_gathered_array_num_entries,
     int (*sort_func)(const void *, const void *))
 {
     size_t  gathered_array_num_entries = 0;
     size_t  i;
     void   *gathered_array = NULL;
-    int    *receive_counts_array = NULL;
-    int    *displacements_array = NULL;
+    int    *receive_counts_array = NULL; /* Array containing number of entries each process contributes */
+    int    *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */
     int     mpi_code, mpi_size;
     int     sendcount;
     herr_t  ret_value = SUCCEED;
@@ -373,24 +374,24 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
     if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
         HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
 
-    if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, io_info->comm)))
+    /* Determine the size of the end result array */
+    if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, io_info->comm)))
         HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
 
     if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size)))
-        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate total gathered array")
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array")
 
     if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*receive_counts_array))))
         HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array")
 
     if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*displacements_array))))
-        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate displacements array")
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array")
 
-    if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, io_info->comm)))
+    /* Inform each process of how many entries each other process is contributing to the resulting array */
+    if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, io_info->comm)))
         HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
 
-    /* Multiply each receive count by the size of the array entry,
-     * since the data is sent in a count of bytes
-     */
+    /* Multiply each receive count by the size of the array entry, since the data is sent as bytes */
     for (i = 0; i < (size_t) mpi_size; i++)
         H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t);
@@ -399,7 +400,7 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
     for (i = 1; i < (size_t) mpi_size; i++)
         displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1];
 
-    H5_CHECKED_ASSIGN(sendcount, int, array_num_entries * array_entry_size, size_t);
+    H5_CHECKED_ASSIGN(sendcount, int, local_array_num_entries * array_entry_size, size_t);
     if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE, gathered_array, receive_counts_array, displacements_array, MPI_BYTE, io_info->comm)))
         HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
@@ -734,9 +735,10 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
     if(NULL == (dx_plist = (H5P_genplist_t *)H5I_object(io_info->raw_dxpl_id)))
         HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a file access property list")
 
-    /* Check the optional property list on what to do with collective chunk IO. */
+    /* Check the optional property list for the collective chunk IO optimization option */
     if(H5P_get(dx_plist, H5D_XFER_MPIO_CHUNK_OPT_HARD_NAME, &chunk_opt_mode) < 0)
         HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get chunk optimization option")
+
     if(H5FD_MPIO_CHUNK_ONE_IO == chunk_opt_mode)
         io_option = H5D_ONE_LINK_CHUNK_IO;      /*no opt*/
     /* direct request to multi-chunk-io */
@@ -816,13 +818,14 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
     switch (io_option) {
         case H5D_ONE_LINK_CHUNK_IO:
         case H5D_ONE_LINK_CHUNK_IO_MORE_OPT:
+            /* Check if there are any filters in the pipeline */
             if(io_info->dset->shared->dcpl_cache.pline.nused > 0) {
+                /* For now, Multi-chunk IO must be forced for parallel filtered read,
+                 * so that data can be unfiltered as it is received. There is significant
+                 * complexity in unfiltering the data when it is read all at once into a
+                 * single buffer.
+                 */
                 if (io_info->op_type == H5D_IO_OP_READ) {
-                    /* XXX: For now, Multi-chunk IO must be forced for parallel filtered read,
-                     *      so that data can be unfiltered as it is received. There is complexity
-                     *      in unfiltering the data when it is read all at once into a single
-                     *      buffer.
-                     */
                     if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
                         HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO")
                 }
@@ -832,6 +835,7 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
                 }
             }
             else {
+                /* Perform unfiltered link chunk collective IO */
                 if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0)
                     HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO")
             }
@@ -839,11 +843,13 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
         case H5D_MULTI_CHUNK_IO: /* direct request to do multi-chunk IO */
         default:                 /* multiple chunk IO via threshold */
+            /* Check if there are any filters in the pipeline */
             if(io_info->dset->shared->dcpl_cache.pline.nused > 0) {
                 if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
                     HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO")
             }
             else {
+                /* Perform unfiltered multi chunk collective IO */
                 if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0)
                     HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO")
             }
@@ -1437,15 +1443,15 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
     if (chunk_list_num_entries) {
         size_t offset;
 
-        /* XXX: During the collective re-allocation of chunks in the file, the record for each
-         *      chunk is only update in the total array, not in the local copy of chunks on each
+        /* During the collective re-allocation of chunks in the file, the record for each
+         * chunk is only updated in the collective array, not in the local copy of chunks on each
          * process. However, each process needs the updated chunk records so that they can create
-         * a MPI type for the collective write that will write to the chunk's new locations instead
-         * of the old ones. This ugly hack seems to be the best solution to copy the information
-         * back to the local array and avoid having to modify the collective write type function
-         * in an ugly way so that it will accept the total array instead of the local array.
-         * This works correctly because the array gather function guarantees that the chunk
-         * data in the total array is ordered in blocks by rank.
+         * a MPI type for the collective write that will write to the chunk's possible new locations
+         * in the file instead of the old ones. This ugly hack seems to be the best solution to
+         * copy the information back to the local array and avoid having to modify the collective
+         * write type function in an ugly way so that it will accept the collective array instead
+         * of the local array. This works correctly because the array gather function guarantees
+         * that the chunk data in the collective array is ordered in blocks by rank.
          */
         for (i = 0, offset = 0; i < (size_t) mpi_rank; i++)
            offset += num_chunks_selected_array[i];
@@ -1499,9 +1505,6 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
         HDfprintf(debug_file, "---------------------------------------\n");
 #endif
     } /* end if */
-    else { /* Filtered collective read */
-
-    } /* end else */
 
 done:
     /* Free resources used by a process which had some selection */
@@ -1748,7 +1751,6 @@ done:
  *              as opposed to collective IO of every chunk at once
  *
  *              XXX: Update later to reflect changes in structure
- *              XXX: Add read operation description
  *
  *              1. Construct a list of selected chunks in the collective IO
  *                 operation
@@ -1758,7 +1760,10 @@ done:
  *                    process seen that is writing the most data becomes
 *                    the new owner in the case of ties)
 *              2. If the operation is a read operation
- *                 A.
+ *                 A. Loop through each chunk in the operation
+ *                    I. Read the chunk from the file
+ *                    II. Unfilter the chunk
+ *                    III. Scatter the read chunk data to the user's buffer
 *              3. If the operation is a write operation
 *                 A. Loop through each chunk in the operation
 *                    I. If this is not a full overwrite of the chunk
@@ -1964,7 +1969,9 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
         if (have_chunk_to_process) {
             int mpi_type_count;
 
-            /* Collect the new chunk info back to the local copy */
+            /* Collect the new chunk info back to the local copy, since only the record in the
+             * collective array gets updated by the chunk re-allocation */
+            /* XXX: offset could be wrong if a process runs out of chunks */
             HDmemcpy(&chunk_list[i].new_chunk, &collective_chunk_list[mpi_rank].new_chunk, sizeof(chunk_list[i].new_chunk));
 
 #ifdef PARALLEL_COMPRESS_DEBUG
@@ -1988,6 +1995,8 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
                 HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
             file_type_is_derived_array[i] = TRUE;
 
+            mpi_buf_count = 1;
+
             /* Set up the base storage address for this operation */
             ctg_store.contig.dset_addr = chunk_list[i].new_chunk.offset;
@@ -1996,8 +2005,10 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
              */
             ctg_io_info.u.wbuf = chunk_list[i].buf;
         } /* end if */
-
-        mpi_buf_count = (mem_type_is_derived_array[i] && file_type_is_derived_array[i]) ? 1 : 0;
+        else {
+            mem_type_array[i] = file_type_array[i] = MPI_BYTE;
+            mpi_buf_count = 0;
+        } /* end else */
 
         /* Perform the I/O */
         if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type_array[i], &mem_type_array[i]) < 0)
@@ -2248,7 +2259,7 @@ H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2)
 
 /*-------------------------------------------------------------------------
- * Function:    H5D__cmp_filtered_collective_io_entry
+ * Function:    H5D__cmp_filtered_collective_io_info_entry
 *
 * Purpose:     Routine to compare filtered collective chunk io info
 *              entries
@@ -2264,17 +2275,17 @@ H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2)
 *-------------------------------------------------------------------------
 */
 static int
-H5D__cmp_filtered_collective_io_entry(const void *filtered_collective_io_entry1, const void *filtered_collective_io_entry2)
+H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2)
 {
     haddr_t addr1, addr2;
 
     FUNC_ENTER_STATIC_NOERR
 
-    addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_entry1)->new_chunk.offset;
-    addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_entry2)->new_chunk.offset;
+    addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->new_chunk.offset;
+    addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->new_chunk.offset;
 
     FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2))
-} /* end H5D__cmp_filtered_collective_io_entry() */
+} /* end H5D__cmp_filtered_collective_io_info_entry() */
@@ -2638,7 +2649,6 @@ done:
 /*-------------------------------------------------------------------------
 * Function:    H5D__construct_filtered_io_info_list
 *
- *              XXX: Revise description
 * Purpose:     Constructs a list of entries which contain the necessary
 *              information for inter-process communication when performing
 *              collective io on filtered chunks. This list is used by
@@ -2658,19 +2668,19 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list,
     size_t *num_entries, size_t **_num_chunks_selected_array)
 {
-    H5D_filtered_collective_io_info_t *local_info_array = NULL;
-    H5D_filtered_collective_io_info_t *overlap_info_array = NULL;
-    H5S_sel_iter_t *mem_iter = NULL;
-    unsigned char *mod_data = NULL;
+    H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially select chunks for this process */
+    H5D_filtered_collective_io_info_t *overlap_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
+    H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
+    unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
     H5SL_node_t *chunk_node;
-    MPI_Request *send_requests = NULL;
-    MPI_Status *send_statuses = NULL;
-    hbool_t no_overlap = FALSE;
+    MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */
+    MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */
+    hbool_t no_overlap = FALSE; /* Whether or not the user guarantees a one-process-only-per-chunk write style */
     hbool_t mem_iter_init = FALSE;
-    size_t num_send_requests;
+    size_t num_send_requests = 0;
     size_t num_chunks_selected;
     size_t overlap_info_array_num_entries;
-    size_t *num_chunks_selected_array = NULL;
+    size_t *num_chunks_selected_array = NULL; /* Array of number of chunks selected on each process */
     int mpi_rank, mpi_size, mpi_code;
     herr_t ret_value = SUCCEED;
@@ -2698,6 +2708,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     for (num_chunks_selected = 0; chunk_node; num_chunks_selected++) {
         H5D_chunk_info_t *chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node);
         H5D_chunk_ud_t udata;
+        hssize_t select_npoints;
 
         /* Obtain this chunk's address */
         if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0)
@@ -2705,11 +2716,14 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
         local_info_array[num_chunks_selected].chunk_info = *chunk_info;
         local_info_array[num_chunks_selected].old_chunk = local_info_array[num_chunks_selected].new_chunk = udata.chunk_block;
-        local_info_array[num_chunks_selected].io_size = H5S_GET_SELECT_NPOINTS(chunk_info->mspace) * type_info->src_type_size;
         local_info_array[num_chunks_selected].num_writers = 0;
         local_info_array[num_chunks_selected].owner = mpi_rank;
         local_info_array[num_chunks_selected].buf = NULL;
 
+        if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+        local_info_array[num_chunks_selected].io_size = (size_t) select_npoints * type_info->src_type_size;
+
         chunk_node = H5SL_next(chunk_node);
     } /* end for */
@@ -2746,7 +2760,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     HDfprintf(debug_file, "-----------------------------------\n\n");
 #endif
 
-    /* Redistribute chunks to new owners as necessary */
+    /* Redistribute shared chunks to new owners as necessary */
     if (!no_overlap && (io_info->op_type == H5D_IO_OP_WRITE)) {
         size_t i;
@@ -2759,8 +2773,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
         /* XXX: Change minor error code */
         if (H5D__mpio_array_gather(io_info, local_info_array, num_chunks_selected, sizeof(*local_info_array),
                 (void **) &overlap_info_array, &overlap_info_array_num_entries,
-                H5D__cmp_filtered_collective_io_entry) < 0)
-            HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't ")
+                H5D__cmp_filtered_collective_io_info_entry) < 0)
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather array")
 
         for (i = 0, num_chunks_selected = 0, num_send_requests = 0; i < overlap_info_array_num_entries;) {
             H5D_filtered_collective_io_info_t chunk_entry;
@@ -2769,6 +2783,15 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
             size_t max_bytes = 0;
             int new_owner = 0;
 
+            /* Set the chunk entry's file dataspace to NULL as a sentinel value.
+             * Any process which is contributing modifications to this chunk will
+             * obtain a valid file space while processing duplicates below. Any
+             * process which still has a NULL file space after processing all of
+             * the duplicate entries for a shared chunk are assumed to not be
+             * contributing to the chunk and so will not try to access an invalid
+             * dataspace when processes are sending chunk data to new owners */
+            chunk_entry.chunk_info.fspace = NULL;
+
             /* Process duplicate entries caused by another process writing
              * to the same chunk
              */
@@ -2803,14 +2826,13 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
                 /* New owner takes possession of the chunk */
                 overlap_info_array[num_chunks_selected++] = chunk_entry;
             } /* end if */
-            else {
-                unsigned char *mod_data_p = NULL;
-                hssize_t iter_nelmts;
+            else if (chunk_entry.chunk_info.fspace) {
+                unsigned char *mod_data_p = NULL; /* Use second pointer since H5S_encode advances pointer */
+                hssize_t iter_nelmts; /* Number of points to iterate over for the send operation */
                 size_t mod_data_size;
 
-                /* XXX: Need some way of checking chunk entry to validate that this process
-                 *      is actually contributing some data for this chunk update
-                 */
+                /* Not the new owner of this chunk, encode the file space selection and
+                 * modification data into a buffer and send it to the new chunk owner */
 
                 /* Determine size of serialized chunk memory dataspace plus the size
                  * of the data being written
@@ -2825,10 +2847,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
                 if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0)
                     HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
 
-                /* XXX: For now, make sure enough memory is allocated by just adding the chunk
-                 *      size
-                 */
-                mod_data_size += iter_nelmts * type_info->src_type_size;
+                mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
 
 #ifdef PARALLEL_COMPRESS_DEBUG
                 HDfprintf(debug_file, "| Allocing %zd bytes for mod. data buffer.\n", (size_t) mod_data_size);
@@ -2870,14 +2889,14 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
                 /* Send modification data to new owner */
                 H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
+                H5_CHECK_OVERFLOW(chunk_entry.chunk_info.index, hsize_t, int)
                 if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, new_owner,
-                        chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++])))
+                        (int) chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++])))
                     HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
 
 #ifdef PARALLEL_COMPRESS_DEBUG
                 HDfprintf(debug_file, "| Mod. data sent.\n|\n");
 #endif
 
-                if (mod_data)
                 mod_data = (unsigned char *) H5MM_free(mod_data);
 
                 if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
@@ -2918,7 +2937,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     if (NULL == (num_chunks_selected_array = (size_t *) H5MM_malloc((size_t) mpi_size * sizeof(*num_chunks_selected_array))))
         HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")
 
-    if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&num_chunks_selected, 1, MPI_UNSIGNED_LONG_LONG, num_chunks_selected_array, 1, MPI_UNSIGNED_LONG_LONG, io_info->comm)))
+    if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&num_chunks_selected, 1, MPI_UNSIGNED_LONG_LONG, num_chunks_selected_array,
+            1, MPI_UNSIGNED_LONG_LONG, io_info->comm)))
         HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
 
 #ifdef PARALLEL_COMPRESS_DEBUG
@@ -2966,9 +2986,7 @@ done:
 * Purpose:     Constructs a MPI derived datatype for both the memory and
 *              the file for a collective write of filtered chunks. The
 *              datatype contains the offsets in the file and the locations
- *              of the filtered chunk data buffers
- *
- *              XXX: Same type may be reusable for filtered collective read
+ *              of the filtered chunk data buffers.
 *
 * Return:      Non-negative on success/Negative on failure
 *
@@ -3011,7 +3029,7 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
         HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for collective write offset array")
 
     /* Ensure the list is sorted in ascending order of offset in the file */
-    HDqsort(chunk_list, num_entries, sizeof(*chunk_list), H5D__cmp_filtered_collective_io_entry);
+    HDqsort(chunk_list, num_entries, sizeof(*chunk_list), H5D__cmp_filtered_collective_io_info_entry);
 
 #ifdef PARALLEL_COMPRESS_DEBUG
     HDfprintf(debug_file, "MPI Write type entries:\n");
@@ -3020,8 +3038,9 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
     base_buf = chunk_list[0].buf;
     for (i = 0; i < num_entries; i++) {
-        /* XXX: Revise description */
-        /* Set up array position */
+        /* Set up the offset in the file, the length of the chunk data, and the relative
+         * displacement of the chunk data write buffer
+         */
         file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset;
         length_array[i] = (int) chunk_list[i].new_chunk.length;
         write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf;
@@ -3035,7 +3054,7 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
         }
         HDfprintf(debug_file, "]\n|\n");
 #endif
-    } /* end while */
+    } /* end for */
 
 #ifdef PARALLEL_COMPRESS_DEBUG
     HDfprintf(debug_file, "---------------------------------\n\n");
@@ -3057,18 +3076,24 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
     } /* end if */
 
 done:
-    length_array = (int *) H5MM_free(length_array);
-    write_buf_array = (MPI_Aint *) H5MM_free(write_buf_array);
-    file_offset_array = (MPI_Aint *) H5MM_free(file_offset_array);
+    if (write_buf_array)
+        H5MM_free(write_buf_array);
+    if (file_offset_array)
+        H5MM_free(file_offset_array);
+    if (length_array)
+        H5MM_free(length_array);
 
     FUNC_LEAVE_NOAPI(ret_value)
 } /* end H5D__mpio_filtered_collective_write_type() */
 
 /*-------------------------------------------------------------------------
- * Function:    H5D__filtered_collective_chunk_io
+ * Function:    H5D__filtered_collective_chunk_entry_io
 *
- * Purpose:     XXX: description
+ * Purpose:     Given an entry for a filtered chunk, performs the necessary
+ *              steps for updating the chunk data during a collective
+ *              write, or for reading the chunk from file during a
+ *              collective read.
 *
 * Return:      Non-negative on success/Negative on failure
 *
@@ -3081,14 +3106,14 @@ static herr_t
 H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry,
     const H5D_io_info_t *io_info, const H5D_type_info_t *type_info)
 {
-    H5S_sel_iter_t *mem_iter = NULL;
-    unsigned char *mod_data = NULL;
+    H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__scatter_mem/H5D__gather_mem */
+    unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
     unsigned filter_mask = 0;
-    hssize_t iter_nelmts;
-    hbool_t full_overwrite = FALSE;
+    hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
+    hbool_t full_overwrite = FALSE; /* Whether this is a full overwrite of this chunk */
     hbool_t mem_iter_init = FALSE;
     size_t buf_size;
-    H5S_t *dataspace = NULL;
+    H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */
     herr_t ret_value = SUCCEED;
 
     FUNC_ENTER_STATIC
@@ -3110,7 +3135,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
     * If this is a write operation where the chunk is being fully overwritten, enough memory
     * must be allocated for the size of the unfiltered chunk.
     */
-    /* XXX: Return value of macro should be checked instead */
+    /* XXX: Return value of macro should be checked */
     buf_size = (!full_overwrite || io_info->op_type == H5D_IO_OP_READ) ? chunk_entry->old_chunk.length
             : (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace) * type_info->src_type_size;
     chunk_entry->new_chunk.length = buf_size;
@@ -3149,8 +3174,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
 #endif
     } /* end if */
 
-    /* Owner of this chunk, receive modification data from other processes */
-
     /* Initialize iterator for memory selection */
     if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
         HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
@@ -3167,7 +3190,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
     *
     * If this is a write operation, update the chunk data buffer with the modifications
     * from the current process, then apply any modifications from other processes. Finally,
-    * filter the newly-update chunk.
+    * filter the newly-updated chunk.
     */
    switch (io_info->op_type) {
        case H5D_IO_OP_READ:
@@ -3187,15 +3210,12 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
 
            /* Update the chunk data with any modifications from other processes */
            while (chunk_entry->num_writers > 1) {
-                unsigned char *mod_data_p = NULL;
-                MPI_Status status;
-                int count;
-                int mpi_code;
-
-                /* XXX: Since the receive tag needs to be an int, it is possible that a chunk's index
-                 *      may fall outside the range of an int and cause an overflow problem when casting down
-                 *      here
-                 */
+                const unsigned char *mod_data_p; /* Use second pointer since H5S_decode advances pointer */
+                MPI_Status status;
+                int count;
+                int mpi_code;
+
+                /* Probe for the incoming message from another process */
                H5_CHECK_OVERFLOW(chunk_entry->chunk_info.index, hsize_t, int)
                if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->chunk_info.index,
                        io_info->comm, &status)))
@@ -3213,19 +3233,19 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
                HDfprintf(debug_file, "| - Message size is %d bytes.\n", count);
 #endif
 
-                if (NULL == (mod_data = (unsigned char *) H5MM_malloc(count)))
+                if (NULL == (mod_data = (unsigned char *) H5MM_malloc((size_t) count)))
                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate modification data receive buffer")
                mod_data_p = mod_data;
 
                if (MPI_SUCCESS != (mpi_code = MPI_Recv(mod_data, count, MPI_BYTE, MPI_ANY_SOURCE,
-                        chunk_entry->chunk_info.index, io_info->comm, &status)))
+                        (int) chunk_entry->chunk_info.index, io_info->comm, &status)))
                    HMPI_GOTO_ERROR(FAIL, "MPI_Recv failed", mpi_code)
 
 #ifdef PARALLEL_COMPRESS_DEBUG
                HDfprintf(debug_file, "| - Received the message.\n");
 #endif
 
-                /* Decode the chunk's memory dataspace */
+                /* Decode the process' chunk file dataspace */
                if (NULL == (dataspace = H5S_decode(&mod_data_p)))
                    HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace")
@@ -3292,7 +3312,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
 #endif
 
            /* Filter the chunk */
-            filter_mask = 0;
            if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask, io_info->dxpl_cache->err_detect,
                    io_info->dxpl_cache->filter_cb, (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
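The `H5D__mpio_array_gather()` changes above follow the usual variable-count all-gather pattern: each rank contributes `local_array_num_entries` fixed-size records, per-rank counts are exchanged with `MPI_Allgather`, byte displacements are computed as a prefix sum, and the records are collected with `MPI_Allgatherv`. A minimal standalone sketch of that pattern; the function and parameter names here are illustrative, not HDF5 internals:

```c
#include <mpi.h>
#include <stdlib.h>

/* Gather a variable number of fixed-size records from every rank onto all ranks.
 * Mirrors the counts/displacements bookkeeping in H5D__mpio_array_gather(). */
static int gather_records(const void *local, int local_count, int record_size,
                          void **out, int *out_count, MPI_Comm comm)
{
    int size, i;
    int *counts, *displs;
    void *gathered;

    MPI_Comm_size(comm, &size);
    counts = malloc((size_t) size * sizeof(*counts));
    displs = malloc((size_t) size * sizeof(*displs));

    /* Tell every rank how many records each other rank contributes */
    MPI_Allgather(&local_count, 1, MPI_INT, counts, 1, MPI_INT, comm);

    /* Convert record counts to byte counts and byte displacements */
    for (i = 0; i < size; i++)
        counts[i] *= record_size;
    displs[0] = 0;
    for (i = 1; i < size; i++)
        displs[i] = displs[i - 1] + counts[i - 1];

    *out_count = (displs[size - 1] + counts[size - 1]) / record_size;
    gathered = malloc((size_t) (displs[size - 1] + counts[size - 1]));

    /* Collect the raw bytes; the result is ordered in contiguous blocks by rank,
     * which is what the chunk-redistribution code relies on */
    MPI_Allgatherv(local, local_count * record_size, MPI_BYTE,
                   gathered, counts, displs, MPI_BYTE, comm);

    *out = gathered;
    free(counts);
    free(displs);
    return 0;
}
```

Because the gathered array is ordered in blocks by rank, each process can locate its own entries again by summing the counts of the ranks before it, which is exactly the `offset` computation in `H5D__link_chunk_filtered_collective_io()`.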
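The shared-chunk redistribution in `H5D__construct_filtered_io_info_list()` and the matching receive loop in `H5D__filtered_collective_chunk_entry_io()` use the chunk index as the MPI message tag, which is why the patch adds an `H5_CHECK_OVERFLOW` on the `hsize_t` index before casting it to `int`. A rough sketch of the probe-then-receive side of that exchange, with the payload treated as an opaque byte buffer (in the HDF5 code it is an encoded file dataspace followed by the modification data); the helper name is an assumption:

```c
#include <mpi.h>
#include <stdlib.h>

/* Receive one chunk-modification message whose MPI tag is the chunk index. */
static unsigned char *recv_chunk_update(int chunk_tag, MPI_Comm comm, int *msg_size)
{
    MPI_Status status;
    unsigned char *buf;

    /* Block until a message with this chunk's tag arrives from any rank */
    MPI_Probe(MPI_ANY_SOURCE, chunk_tag, comm, &status);

    /* Size the receive buffer from the probed message */
    MPI_Get_count(&status, MPI_BYTE, msg_size);
    buf = malloc((size_t) *msg_size);

    /* Receive from the rank the probe matched, so probe and receive pair up */
    MPI_Recv(buf, *msg_size, MPI_BYTE, status.MPI_SOURCE, chunk_tag, comm,
             MPI_STATUS_IGNORE);

    return buf;
}
```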
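`H5D__mpio_filtered_collective_write_type()` builds matching file and memory datatypes from three parallel arrays: file offsets, filtered chunk lengths, and write-buffer displacements relative to a base buffer. A condensed sketch of how such arrays typically feed an hindexed derived type; this is an assumed layout for illustration, not the HDF5 routine itself:

```c
#include <mpi.h>

/* Build a derived type describing n filtered chunks at arbitrary byte offsets.
 * lengths[i] is the filtered size of chunk i and displs[i] its byte offset
 * (in the file, or relative to a base buffer in memory). */
static int make_chunk_type(int n, const int *lengths, const MPI_Aint *displs,
                           MPI_Datatype *new_type)
{
    /* One hindexed block per chunk, counted in MPI_BYTE units */
    if (MPI_Type_create_hindexed(n, lengths, displs, MPI_BYTE, new_type) != MPI_SUCCESS)
        return -1;

    /* Commit before handing the type to the collective write */
    return (MPI_Type_commit(new_type) == MPI_SUCCESS) ? 0 : -1;
}
```

Sorting the chunk list by file offset before filling these arrays, as the patch does with `HDqsort()`, keeps the resulting file type monotonically increasing, which collective MPI-IO implementations generally handle best.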