author     Jordan Henderson <jhenderson@hdfgroup.org>    2017-01-26 20:03:36 (GMT)
committer  Jordan Henderson <jhenderson@hdfgroup.org>    2017-01-26 20:03:36 (GMT)
commit     344781f17d983bd38aac8c687461c0bcfcf7dacf (patch)
tree       a514b29c648f7bee64b86ee77219d386596c1018
parent     0b6016a3fc6f7cf5b60160e638e3ae7280952106 (diff)
Multiple Bug Fixes
Add comments explaining different variables. Check more overflow/conversion sign change issues.
-rw-r--r--  src/H5Dmpio.c  223
1 file changed, 121 insertions, 102 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 39a5e15..453e420 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -101,6 +101,7 @@ typedef struct H5D_chunk_addr_info_t {
H5D_chunk_info_t chunk_info;
} H5D_chunk_addr_info_t;
+/* Information about a chunk when performing collective filtered IO */
typedef struct H5D_filtered_collective_io_info_t {
H5D_chunk_info_t chunk_info;
H5F_block_t old_chunk;
@@ -151,7 +152,7 @@ static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info,
H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries,
size_t **_num_chunks_selected_array);
static herr_t H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
- size_t array_num_entries, size_t array_entry_size,
+ size_t local_array_num_entries, size_t array_entry_size,
void **gathered_array, size_t *gathered_array_num_entries,
int (*sort_func)(const void *, const void *));
static herr_t H5D__mpio_filtered_collective_write_type(
@@ -161,8 +162,8 @@ static herr_t H5D__mpio_filtered_collective_write_type(
static herr_t H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry,
const H5D_io_info_t *io_info, const H5D_type_info_t *type_info);
static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2);
-static int H5D__cmp_filtered_collective_io_entry(const void *filtered_collective_io_entry1,
- const void *filtered_collective_io_entry2);
+static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1,
+ const void *filtered_collective_io_info_entry2);
/*********************/
@@ -350,15 +351,15 @@ done:
*/
static herr_t
H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
- size_t array_num_entries, size_t array_entry_size,
+ size_t local_array_num_entries, size_t array_entry_size,
void **_gathered_array, size_t *_gathered_array_num_entries,
int (*sort_func)(const void *, const void *))
{
size_t gathered_array_num_entries = 0;
size_t i;
void *gathered_array = NULL;
- int *receive_counts_array = NULL;
- int *displacements_array = NULL;
+ int *receive_counts_array = NULL; /* Array containing number of entries each process contributes */
+ int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */
int mpi_code, mpi_size;
int sendcount;
herr_t ret_value = SUCCEED;
@@ -373,24 +374,24 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
- if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, io_info->comm)))
+ /* Determine the size of the end result array */
+ if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate total gathered array")
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array")
if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*receive_counts_array))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array")
if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*displacements_array))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate displacements array")
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array")
- if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, io_info->comm)))
+ /* Inform each process of how many entries each other process is contributing to the resulting array */
+ if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
- /* Multiply each receive count by the size of the array entry,
- * since the data is sent in a count of bytes
- */
+ /* Multiply each receive count by the size of the array entry, since the data is sent as bytes */
for (i = 0; i < (size_t) mpi_size; i++)
H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t);
@@ -399,7 +400,7 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
for (i = 1; i < (size_t) mpi_size; i++)
displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1];
- H5_CHECKED_ASSIGN(sendcount, int, array_num_entries * array_entry_size, size_t);
+ H5_CHECKED_ASSIGN(sendcount, int, local_array_num_entries * array_entry_size, size_t);
if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE,
gathered_array, receive_counts_array, displacements_array, MPI_BYTE, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
@@ -734,9 +735,10 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
if(NULL == (dx_plist = (H5P_genplist_t *)H5I_object(io_info->raw_dxpl_id)))
HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a file access property list")
- /* Check the optional property list on what to do with collective chunk IO. */
+ /* Check the optional property list for the collective chunk IO optimization option */
if(H5P_get(dx_plist, H5D_XFER_MPIO_CHUNK_OPT_HARD_NAME, &chunk_opt_mode) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get chunk optimization option")
+
if(H5FD_MPIO_CHUNK_ONE_IO == chunk_opt_mode)
io_option = H5D_ONE_LINK_CHUNK_IO; /*no opt*/
/* direct request to multi-chunk-io */
@@ -816,13 +818,14 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
switch (io_option) {
case H5D_ONE_LINK_CHUNK_IO:
case H5D_ONE_LINK_CHUNK_IO_MORE_OPT:
+ /* Check if there are any filters in the pipeline */
if(io_info->dset->shared->dcpl_cache.pline.nused > 0) {
+ /* For now, Multi-chunk IO must be forced for parallel filtered read,
+ * so that data can be unfiltered as it is received. There is significant
+ * complexity in unfiltering the data when it is read all at once into a
+ * single buffer.
+ */
if (io_info->op_type == H5D_IO_OP_READ) {
- /* XXX: For now, Multi-chunk IO must be forced for parallel filtered read,
- * so that data can be unfiltered as it is received. There is complexity
- * in unfiltering the data when it is read all at once into a single
- * buffer.
- */
if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO")
}
@@ -832,6 +835,7 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
}
}
else {
+ /* Perform unfiltered link chunk collective IO */
if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO")
}
@@ -839,11 +843,13 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
case H5D_MULTI_CHUNK_IO: /* direct request to do multi-chunk IO */
default: /* multiple chunk IO via threshold */
+ /* Check if there are any filters in the pipeline */
if(io_info->dset->shared->dcpl_cache.pline.nused > 0) {
if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO")
}
else {
+ /* Perform unfiltered multi chunk collective IO */
if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO")
}
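Aside: this switch is what an application reaches when it writes a chunked, filtered dataset with a collective transfer property list. A minimal sketch of such a program, assuming an MPI-enabled HDF5 build that includes the parallel compression support this file implements (file name, sizes, and layout are arbitrary):

#include <hdf5.h>
#include <mpi.h>

int
main(int argc, char **argv)
{
    hsize_t dims[1] = {1024}, chunk_dims[1] = {128};
    hsize_t start[1], count[1];
    int     buf[1024], i, mpi_rank, mpi_size;
    hid_t   fapl, dcpl, dxpl, file, space, memspace, dset;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

    /* Open the file for parallel access */
    fapl = H5Pcreate(H5P_FILE_ACCESS);
    H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);
    file = H5Fcreate("filtered.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);

    /* Chunked, deflate-filtered dataset: pline.nused > 0 in the check above */
    dcpl = H5Pcreate(H5P_DATASET_CREATE);
    H5Pset_chunk(dcpl, 1, chunk_dims);
    H5Pset_deflate(dcpl, 6);

    space = H5Screate_simple(1, dims, NULL);
    dset  = H5Dcreate2(file, "data", H5T_NATIVE_INT, space,
                       H5P_DEFAULT, dcpl, H5P_DEFAULT);

    /* Each rank writes its own disjoint block of the dataset */
    count[0] = dims[0] / (hsize_t) mpi_size;
    start[0] = (hsize_t) mpi_rank * count[0];
    memspace = H5Screate_simple(1, count, NULL);
    H5Sselect_hyperslab(space, H5S_SELECT_SET, start, NULL, count, NULL);

    for (i = 0; i < (int) count[0]; i++)
        buf[i] = mpi_rank;

    /* Collective transfer mode routes the write through H5D__chunk_collective_io */
    dxpl = H5Pcreate(H5P_DATASET_XFER);
    H5Pset_dxpl_mpio(dxpl, H5FD_MPIO_COLLECTIVE);
    H5Dwrite(dset, H5T_NATIVE_INT, memspace, space, dxpl, buf);

    H5Pclose(dxpl); H5Pclose(dcpl); H5Pclose(fapl);
    H5Sclose(memspace); H5Sclose(space); H5Dclose(dset); H5Fclose(file);
    MPI_Finalize();
    return 0;
}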
@@ -1437,15 +1443,15 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
if (chunk_list_num_entries) {
size_t offset;
- /* XXX: During the collective re-allocation of chunks in the file, the record for each
- * chunk is only update in the total array, not in the local copy of chunks on each
+ /* During the collective re-allocation of chunks in the file, the record for each
+ * chunk is only updated in the collective array, not in the local copy of chunks on each
* process. However, each process needs the updated chunk records so that they can create
- * a MPI type for the collective write that will write to the chunk's new locations instead
- * of the old ones. This ugly hack seems to be the best solution to copy the information
- * back to the local array and avoid having to modify the collective write type function
- * in an ugly way so that it will accept the total array instead of the local array.
- * This works correctly because the array gather function guarantees that the chunk
- * data in the total array is ordered in blocks by rank.
+ * an MPI type for the collective write that will write to the chunk's possible new locations
+ * in the file instead of the old ones. This ugly hack seems to be the best solution to
+ * copy the information back to the local array and avoid having to modify the collective
+ * write type function in an ugly way so that it will accept the collective array instead
+ * of the local array. This works correctly because the array gather function guarantees
+ * that the chunk data in the collective array is ordered in blocks by rank.
*/
for (i = 0, offset = 0; i < (size_t) mpi_rank; i++)
offset += num_chunks_selected_array[i];
@@ -1499,9 +1505,6 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
HDfprintf(debug_file, "---------------------------------------\n");
#endif
} /* end if */
- else { /* Filtered collective read */
-
- } /* end else */
done:
/* Free resources used by a process which had some selection */
@@ -1748,7 +1751,6 @@ done:
* as opposed to collective IO of every chunk at once
*
* XXX: Update later to reflect changes in structure
- * XXX: Add read operation description
*
* 1. Construct a list of selected chunks in the collective IO
* operation
@@ -1758,7 +1760,10 @@ done:
* process seen that is writing the most data becomes
* the new owner in the case of ties)
* 2. If the operation is a read operation
- * A.
+ * A. Loop through each chunk in the operation
+ * I. Read the chunk from the file
+ * II. Unfilter the chunk
+ * III. Scatter the read chunk data to the user's buffer
* 3. If the operation is a write operation
* A. Loop through each chunk in the operation
* I. If this is not a full overwrite of the chunk
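Aside: step 2.A.II above ("unfilter the chunk") means running the chunk's bytes back through the filter pipeline. For the common deflate filter that is just a zlib inflate; a standalone round-trip sketch, with no HDF5 involved and error checks omitted:

#include <stdio.h>
#include <stdlib.h>
#include <zlib.h>

int
main(void)
{
    int    chunk[256];                         /* one unfiltered chunk's worth of data */
    uLong  raw_size  = (uLong) sizeof(chunk);
    uLongf comp_size = compressBound(raw_size);
    uLongf out_size  = raw_size;
    Bytef *comp      = malloc(comp_size);
    int   *out       = malloc(raw_size);
    int    i;

    for (i = 0; i < 256; i++)
        chunk[i] = i;

    /* "Filter" the chunk, as the owning rank does before writing it */
    compress(comp, &comp_size, (const Bytef *) chunk, raw_size);

    /* "Unfilter" it, as a reading rank does after fetching comp_size bytes
     * from the chunk's address in the file */
    uncompress((Bytef *) out, &out_size, comp, comp_size);

    printf("%lu filtered bytes -> %lu unfiltered bytes\n",
           (unsigned long) comp_size, (unsigned long) out_size);
    free(comp);
    free(out);
    return 0;
}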
@@ -1964,7 +1969,9 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
if (have_chunk_to_process) {
int mpi_type_count;
- /* Collect the new chunk info back to the local copy */
+ /* Collect the new chunk info back to the local copy, since only the record in the
+ * collective array gets updated by the chunk re-allocation */
+ /* XXX: offset could be wrong if a process runs out of chunks */
HDmemcpy(&chunk_list[i].new_chunk, &collective_chunk_list[mpi_rank].new_chunk, sizeof(chunk_list[i].new_chunk));
#ifdef PARALLEL_COMPRESS_DEBUG
@@ -1988,6 +1995,8 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
file_type_is_derived_array[i] = TRUE;
+ mpi_buf_count = 1;
+
/* Set up the base storage address for this operation */
ctg_store.contig.dset_addr = chunk_list[i].new_chunk.offset;
@@ -1996,8 +2005,10 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
*/
ctg_io_info.u.wbuf = chunk_list[i].buf;
} /* end if */
-
- mpi_buf_count = (mem_type_is_derived_array[i] && file_type_is_derived_array[i]) ? 1 : 0;
+ else {
+ mem_type_array[i] = file_type_array[i] = MPI_BYTE;
+ mpi_buf_count = 0;
+ } /* end else */
/* Perform the I/O */
if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type_array[i], &mem_type_array[i]) < 0)
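Aside: the else branch matters because the write is collective, so a rank that has run out of chunks in this iteration must still make the call, contributing zero elements. A minimal MPI-IO sketch of the same idea, with an arbitrary file name and offsets and no HDF5:

#include <mpi.h>

int
main(int argc, char **argv)
{
    MPI_File fh;
    int      rank, data;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    data = rank;

    MPI_File_open(MPI_COMM_WORLD, "chunks.bin",
                  MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);

    /* Every rank participates in the collective write; odd ranks stand in
     * for processes with no chunk to process and pass a zero count, which
     * is what the mpi_buf_count = 0 / MPI_BYTE branch above arranges. */
    if (rank % 2 == 0)
        MPI_File_write_at_all(fh, (MPI_Offset) rank * (MPI_Offset) sizeof(int),
                              &data, 1, MPI_INT, MPI_STATUS_IGNORE);
    else
        MPI_File_write_at_all(fh, 0, &data, 0, MPI_INT, MPI_STATUS_IGNORE);

    MPI_File_close(&fh);
    MPI_Finalize();
    return 0;
}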
@@ -2248,7 +2259,7 @@ H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2)
/*-------------------------------------------------------------------------
- * Function: H5D__cmp_filtered_collective_io_entry
+ * Function: H5D__cmp_filtered_collective_io_info_entry
*
* Purpose: Routine to compare filtered collective chunk io info
* entries
@@ -2264,17 +2275,17 @@ H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2)
*-------------------------------------------------------------------------
*/
static int
-H5D__cmp_filtered_collective_io_entry(const void *filtered_collective_io_entry1, const void *filtered_collective_io_entry2)
+H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2)
{
haddr_t addr1, addr2;
FUNC_ENTER_STATIC_NOERR
- addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_entry1)->new_chunk.offset;
- addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_entry2)->new_chunk.offset;
+ addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->new_chunk.offset;
+ addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->new_chunk.offset;
FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2))
-} /* end H5D__cmp_filtered_collective_io_entry() */
+} /* end H5D__cmp_filtered_collective_io_info_entry() */
/*-------------------------------------------------------------------------
@@ -2638,7 +2649,6 @@ done:
/*-------------------------------------------------------------------------
* Function: H5D__construct_filtered_io_info_list
*
- * XXX: Revise description
* Purpose: Constructs a list of entries which contain the necessary
* information for inter-process communication when performing
* collective io on filtered chunks. This list is used by
@@ -2658,19 +2668,19 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries,
size_t **_num_chunks_selected_array)
{
- H5D_filtered_collective_io_info_t *local_info_array = NULL;
- H5D_filtered_collective_io_info_t *overlap_info_array = NULL;
- H5S_sel_iter_t *mem_iter = NULL;
- unsigned char *mod_data = NULL;
+ H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially selected chunks for this process */
+ H5D_filtered_collective_io_info_t *overlap_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
+ H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
+ unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
H5SL_node_t *chunk_node;
- MPI_Request *send_requests = NULL;
- MPI_Status *send_statuses = NULL;
- hbool_t no_overlap = FALSE;
+ MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */
+ MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */
+ hbool_t no_overlap = FALSE; /* Whether or not the user guarantees a one-process-only-per-chunk write style */
hbool_t mem_iter_init = FALSE;
- size_t num_send_requests;
+ size_t num_send_requests = 0;
size_t num_chunks_selected;
size_t overlap_info_array_num_entries;
- size_t *num_chunks_selected_array = NULL;
+ size_t *num_chunks_selected_array = NULL; /* Array of number of chunks selected on each process */
int mpi_rank, mpi_size, mpi_code;
herr_t ret_value = SUCCEED;
@@ -2698,6 +2708,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
for (num_chunks_selected = 0; chunk_node; num_chunks_selected++) {
H5D_chunk_info_t *chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node);
H5D_chunk_ud_t udata;
+ hssize_t select_npoints;
/* Obtain this chunk's address */
if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0)
@@ -2705,11 +2716,14 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
local_info_array[num_chunks_selected].chunk_info = *chunk_info;
local_info_array[num_chunks_selected].old_chunk = local_info_array[num_chunks_selected].new_chunk = udata.chunk_block;
- local_info_array[num_chunks_selected].io_size = H5S_GET_SELECT_NPOINTS(chunk_info->mspace) * type_info->src_type_size;
local_info_array[num_chunks_selected].num_writers = 0;
local_info_array[num_chunks_selected].owner = mpi_rank;
local_info_array[num_chunks_selected].buf = NULL;
+ if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+ local_info_array[num_chunks_selected].io_size = (size_t) select_npoints * type_info->src_type_size;
+
chunk_node = H5SL_next(chunk_node);
} /* end for */
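Aside: the change above is one instance of the commit's check-then-cast pattern: a signed selection count is validated before it is converted to size_t and multiplied, so a negative error value can never wrap around. A small standalone sketch of the pattern, with hypothetical names:

#include <stdio.h>

/* Validate a signed element count before converting it to an unsigned size */
static int
checked_io_size(long long select_npoints, size_t type_size, size_t *io_size)
{
    if (select_npoints < 0)
        return -1;                         /* propagate the error, do not cast */
    *io_size = (size_t) select_npoints * type_size;
    return 0;
}

int
main(void)
{
    size_t io_size;

    if (checked_io_size(1024, 8, &io_size) == 0)
        printf("io_size = %zu bytes\n", io_size);
    if (checked_io_size(-1, 8, &io_size) < 0)
        printf("negative point count rejected\n");
    return 0;
}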
@@ -2746,7 +2760,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
HDfprintf(debug_file, "-----------------------------------\n\n");
#endif
- /* Redistribute chunks to new owners as necessary */
+ /* Redistribute shared chunks to new owners as necessary */
if (!no_overlap && (io_info->op_type == H5D_IO_OP_WRITE)) {
size_t i;
@@ -2759,8 +2773,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* XXX: Change minor error code */
if (H5D__mpio_array_gather(io_info, local_info_array, num_chunks_selected,
sizeof(*local_info_array), (void **) &overlap_info_array, &overlap_info_array_num_entries,
- H5D__cmp_filtered_collective_io_entry) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't ")
+ H5D__cmp_filtered_collective_io_info_entry) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather array")
for (i = 0, num_chunks_selected = 0, num_send_requests = 0; i < overlap_info_array_num_entries;) {
H5D_filtered_collective_io_info_t chunk_entry;
@@ -2769,6 +2783,15 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
size_t max_bytes = 0;
int new_owner = 0;
+ /* Set the chunk entry's file dataspace to NULL as a sentinel value.
+ * Any process which is contributing modifications to this chunk will
+ * obtain a valid file space while processing duplicates below. Any
+ * process which still has a NULL file space after processing all of
+ * the duplicate entries for a shared chunk is assumed not to be
+ * contributing to the chunk and so will not try to access an invalid
+ * dataspace when processes are sending chunk data to new owners */
+ chunk_entry.chunk_info.fspace = NULL;
+
/* Process duplicate entries caused by another process writing
* to the same chunk
*/
@@ -2803,14 +2826,13 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* New owner takes possession of the chunk */
overlap_info_array[num_chunks_selected++] = chunk_entry;
} /* end if */
- else {
- unsigned char *mod_data_p = NULL;
- hssize_t iter_nelmts;
+ else if (chunk_entry.chunk_info.fspace) {
+ unsigned char *mod_data_p = NULL; /* Use second pointer since H5S_encode advances pointer */
+ hssize_t iter_nelmts; /* Number of points to iterate over for the send operation */
size_t mod_data_size;
- /* XXX: Need some way of checking chunk entry to validate that this process
- * is actually contributing some data for this chunk update
- */
+ /* This process is not the new owner of this chunk, so encode the file space selection and
+ * modification data into a buffer and send it to the new chunk owner */
/* Determine size of serialized chunk memory dataspace plus the size
* of the data being written
@@ -2825,10 +2847,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
- /* XXX: For now, make sure enough memory is allocated by just adding the chunk
- * size
- */
- mod_data_size += iter_nelmts * type_info->src_type_size;
+ mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "| Allocing %zd bytes for mod. data buffer.\n", (size_t) mod_data_size);
@@ -2870,14 +2889,14 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* Send modification data to new owner */
H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
+ H5_CHECK_OVERFLOW(chunk_entry.chunk_info.index, hsize_t, int)
if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, new_owner,
- chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++])))
+ (int) chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++])))
HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "| Mod. data sent.\n|\n");
#endif
-
if (mod_data)
mod_data = (unsigned char *) H5MM_free(mod_data);
if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
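Aside: a round-trip sketch of the hand-off coded above, in plain MPI with illustrative names. A rank that lost ownership of a shared chunk sends its serialized modifications to the new owner, tagged with the chunk index; the owner probes for the message, sizes a buffer, and receives it. The receiving half corresponds to H5D__filtered_collective_chunk_entry_io further below.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int
main(int argc, char **argv)
{
    int rank, size, chunk_index = 42;       /* chunk index doubles as the tag */

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (size >= 2) {
        if (rank == 1) {
            /* Non-owner: payload stands in for the encoded selection + data */
            char        mod_data[] = "serialized selection + element data";
            MPI_Request req;

            MPI_Isend(mod_data, (int) sizeof(mod_data), MPI_BYTE, 0,
                      chunk_index, MPI_COMM_WORLD, &req);
            MPI_Wait(&req, MPI_STATUS_IGNORE);
        }
        else if (rank == 0) {
            /* New owner: the message size is unknown, so probe before receiving */
            MPI_Status status;
            int        count;
            char      *buf;

            MPI_Probe(MPI_ANY_SOURCE, chunk_index, MPI_COMM_WORLD, &status);
            MPI_Get_count(&status, MPI_BYTE, &count);
            buf = malloc((size_t) count);
            MPI_Recv(buf, count, MPI_BYTE, status.MPI_SOURCE, chunk_index,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            printf("owner received %d bytes for chunk %d\n", count, chunk_index);
            free(buf);
        }
    }

    MPI_Finalize();
    return 0;
}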
@@ -2918,7 +2937,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (NULL == (num_chunks_selected_array = (size_t *) H5MM_malloc((size_t) mpi_size * sizeof(*num_chunks_selected_array))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")
- if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&num_chunks_selected, 1, MPI_UNSIGNED_LONG_LONG, num_chunks_selected_array, 1, MPI_UNSIGNED_LONG_LONG, io_info->comm)))
+ if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&num_chunks_selected, 1, MPI_UNSIGNED_LONG_LONG, num_chunks_selected_array,
+ 1, MPI_UNSIGNED_LONG_LONG, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
#ifdef PARALLEL_COMPRESS_DEBUG
@@ -2966,9 +2986,7 @@ done:
* Purpose: Constructs a MPI derived datatype for both the memory and
* the file for a collective write of filtered chunks. The
* datatype contains the offsets in the file and the locations
- * of the filtered chunk data buffers
- *
- * XXX: Same type may be reusable for filtered collective read
+ * of the filtered chunk data buffers.
*
* Return: Non-negative on success/Negative on failure
*
@@ -3011,7 +3029,7 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for collective write offset array")
/* Ensure the list is sorted in ascending order of offset in the file */
- HDqsort(chunk_list, num_entries, sizeof(*chunk_list), H5D__cmp_filtered_collective_io_entry);
+ HDqsort(chunk_list, num_entries, sizeof(*chunk_list), H5D__cmp_filtered_collective_io_info_entry);
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "MPI Write type entries:\n");
@@ -3020,8 +3038,9 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
base_buf = chunk_list[0].buf;
for (i = 0; i < num_entries; i++) {
- /* XXX: Revise description */
- /* Set up array position */
+ /* Set up the offset in the file, the length of the chunk data, and the relative
+ * displacement of the chunk data write buffer
+ */
file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset;
length_array[i] = (int) chunk_list[i].new_chunk.length;
write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf;
@@ -3035,7 +3054,7 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
}
HDfprintf(debug_file, "]\n|\n");
#endif
- } /* end while */
+ } /* end for */
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "---------------------------------\n\n");
@@ -3057,18 +3076,24 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
} /* end if */
done:
- length_array = (int *) H5MM_free(length_array);
- write_buf_array = (MPI_Aint *) H5MM_free(write_buf_array);
- file_offset_array = (MPI_Aint *) H5MM_free(file_offset_array);
+ if (write_buf_array)
+ H5MM_free(write_buf_array);
+ if (file_offset_array)
+ H5MM_free(file_offset_array);
+ if (length_array)
+ H5MM_free(length_array);
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__mpio_filtered_collective_write_type() */
/*-------------------------------------------------------------------------
- * Function: H5D__filtered_collective_chunk_io
+ * Function: H5D__filtered_collective_chunk_entry_io
*
- * Purpose: XXX: description
+ * Purpose: Given an entry for a filtered chunk, performs the necessary
+ * steps for updating the chunk data during a collective
+ * write, or for reading the chunk from file during a
+ * collective read.
*
* Return: Non-negative on success/Negative on failure
*
@@ -3081,14 +3106,14 @@ static herr_t
H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry,
const H5D_io_info_t *io_info, const H5D_type_info_t *type_info)
{
- H5S_sel_iter_t *mem_iter = NULL;
- unsigned char *mod_data = NULL;
+ H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__scatter_mem/H5D__gather_mem */
+ unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
unsigned filter_mask = 0;
- hssize_t iter_nelmts;
- hbool_t full_overwrite = FALSE;
+ hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
+ hbool_t full_overwrite = FALSE; /* Whether this is a full overwrite of this chunk */
hbool_t mem_iter_init = FALSE;
size_t buf_size;
- H5S_t *dataspace = NULL;
+ H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */
herr_t ret_value = SUCCEED;
FUNC_ENTER_STATIC
@@ -3110,7 +3135,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
* If this is a write operation where the chunk is being fully overwritten, enough memory
* must be allocated for the size of the unfiltered chunk.
*/
- /* XXX: Return value of macro should be checked instead */
+ /* XXX: Return value of macro should be checked */
buf_size = (!full_overwrite || io_info->op_type == H5D_IO_OP_READ) ? chunk_entry->old_chunk.length
: (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace) * type_info->src_type_size;
chunk_entry->new_chunk.length = buf_size;
@@ -3149,8 +3174,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
#endif
} /* end if */
- /* Owner of this chunk, receive modification data from other processes */
-
/* Initialize iterator for memory selection */
if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
@@ -3167,7 +3190,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
*
* If this is a write operation, update the chunk data buffer with the modifications
* from the current process, then apply any modifications from other processes. Finally,
- * filter the newly-update chunk.
+ * filter the newly-updated chunk.
*/
switch (io_info->op_type) {
case H5D_IO_OP_READ:
@@ -3187,15 +3210,12 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
/* Update the chunk data with any modifications from other processes */
while (chunk_entry->num_writers > 1) {
- unsigned char *mod_data_p = NULL;
- MPI_Status status;
- int count;
- int mpi_code;
-
- /* XXX: Since the receive tag needs to be an int, it is possible that a chunk's index
- * may fall outside the range of an int and cause an overflow problem when casting down
- * here
- */
+ const unsigned char *mod_data_p; /* Use second pointer since H5S_decode advances pointer */
+ MPI_Status status;
+ int count;
+ int mpi_code;
+
+ /* Probe for the incoming message from another process */
H5_CHECK_OVERFLOW(chunk_entry->chunk_info.index, hsize_t, int)
if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->chunk_info.index,
io_info->comm, &status)))
@@ -3213,19 +3233,19 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
HDfprintf(debug_file, "| - Message size is %d bytes.\n", count);
#endif
- if (NULL == (mod_data = (unsigned char *) H5MM_malloc(count)))
+ if (NULL == (mod_data = (unsigned char *) H5MM_malloc((size_t) count)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate modification data receive buffer")
mod_data_p = mod_data;
if (MPI_SUCCESS != (mpi_code = MPI_Recv(mod_data, count, MPI_BYTE, MPI_ANY_SOURCE,
- chunk_entry->chunk_info.index, io_info->comm, &status)))
+ (int) chunk_entry->chunk_info.index, io_info->comm, &status)))
HMPI_GOTO_ERROR(FAIL, "MPI_Recv failed", mpi_code)
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "| - Received the message.\n");
#endif
- /* Decode the chunk's memory dataspace */
+ /* Decode the process' chunk file dataspace */
if (NULL == (dataspace = H5S_decode(&mod_data_p)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace")
@@ -3292,7 +3312,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
#endif
/* Filter the chunk */
- filter_mask = 0;
if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
(size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0)