author     Jordan Henderson <jhenderson@hdfgroup.org>    2017-01-19 17:54:07 (GMT)
committer  Jordan Henderson <jhenderson@hdfgroup.org>    2017-01-19 17:54:07 (GMT)
commit     740e85a82dc01ce6ea62d86bef557e31012a0695 (patch)
tree       fa686af21e134a04bc8eec63e4879c29b534f689 /src/H5Dmpio.c
parent     2c8bddb4ab08427baafd2d205e6157a163da1fd5 (diff)
Code refactoring
Modify single chunk entry function to handle both read and write cases.
Store array of MPI derived types in Multi-chunk IO so that all freeing can be done at the end instead of during processing.
Add read support for Multi-chunk IO only, currently.
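For context, the deferral of MPI datatype cleanup described above follows a simple pattern: commit one derived type per chunk into an array, track which entries were actually committed, and free them all in a single pass once the per-chunk I/O loop has finished. Below is a minimal standalone sketch of that pattern, not taken from the diff itself; the function name, parameters, and omission of error handling are illustrative assumptions.

    /* Illustrative sketch only: build one MPI derived type per chunk, record
     * which ones were committed, and free them after the loop instead of
     * inside it.  All names here are hypothetical.
     */
    #include <stdlib.h>
    #include <mpi.h>

    static void
    process_chunks_deferred_free(size_t num_chunks, const int *chunk_sizes)
    {
        MPI_Datatype *file_type_array = malloc(num_chunks * sizeof(*file_type_array));
        int          *type_is_derived = calloc(num_chunks, sizeof(*type_is_derived));
        size_t        i;

        if (file_type_array == NULL || type_is_derived == NULL) {
            free(file_type_array);
            free(type_is_derived);
            return;
        }

        for (i = 0; i < num_chunks; i++) {
            /* One contiguous byte type describing this chunk in the file */
            MPI_Type_contiguous(chunk_sizes[i], MPI_BYTE, &file_type_array[i]);
            MPI_Type_commit(&file_type_array[i]);
            type_is_derived[i] = 1;

            /* ... collective I/O for this chunk would go here ... */
        }

        /* Deferred cleanup: free every derived type in one pass at the end */
        for (i = 0; i < num_chunks; i++)
            if (type_is_derived[i])
                MPI_Type_free(&file_type_array[i]);

        free(file_type_array);
        free(type_is_derived);
    }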
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r--  src/H5Dmpio.c  354
1 file changed, 197 insertions, 157 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index b223203..6d97cb1 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -158,9 +158,8 @@ static herr_t H5D__mpio_filtered_collective_write_type(
H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries,
MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
MPI_Datatype *new_file_type, hbool_t *file_type_derived);
-static herr_t H5D__update_filtered_collective_chunk_entry(
- H5D_filtered_collective_io_info_t *chunk_entry, const H5D_io_info_t *io_info,
- const H5D_type_info_t *type_info);
+static herr_t H5D__filtered_collective_chunk_io(H5D_filtered_collective_io_info_t *chunk_entry,
+ const H5D_io_info_t *io_info, const H5D_type_info_t *type_info);
static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2);
static int H5D__cmp_filtered_collective_io_entry(const void *filtered_collective_io_entry1,
const void *filtered_collective_io_entry2);
@@ -390,10 +389,10 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
/* Multiply each receive count by the size of the array entry,
- * since the data is sent in a count of bytes */
- /* XXX: Check to make sure array_entry_size doesn't overflow an int */
+ * since the data is sent in a count of bytes
+ */
for (i = 0; i < (size_t) mpi_size; i++)
- receive_counts_array[i] *= array_entry_size;
+ H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t);
/* Set receive buffer offsets for MPI_Allgatherv */
displacements_array[0] = 0;
@@ -572,9 +571,6 @@ H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm,
H5_CHECKED_ASSIGN(num_chunkf, int, ori_num_chunkf, size_t);
/* Determine the summation of number of chunks for all processes */
- /* XXX: In the case that more than one process is writing to the
- * same chunk, the summation of H5SL_count calls is incorrect. - JTH
- */
if(MPI_SUCCESS != (mpi_code = MPI_Allreduce(&num_chunkf, sum_chunkf, 1, MPI_INT, MPI_SUM, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
@@ -1305,18 +1301,18 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist)
{
H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */
- H5D_filtered_collective_io_info_t *collective_chunk_list = NULL;
- H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK;
- H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE;
- H5D_storage_t ctg_store;
+ H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; /* The list of chunks used during collective operations */
+ H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK; /* The actual chunk IO optimization mode */
+ H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* The chunk IO mode used (Independent vs Collective) */
+ H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */
MPI_Datatype mem_type = MPI_BYTE;
MPI_Datatype file_type = MPI_BYTE;
hbool_t mem_type_is_derived = FALSE;
hbool_t file_type_is_derived = FALSE;
size_t chunk_list_num_entries;
size_t collective_chunk_list_num_entries;
- size_t *num_chunks_selected_array = NULL;
- size_t i;
+ size_t *num_chunks_selected_array = NULL; /* Array of number of chunks selected on each process */
+ size_t i; /* Local index variable */
int mpi_rank, mpi_size, mpi_code;
herr_t ret_value = SUCCEED;
@@ -1333,7 +1329,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property")
/* Set the actual-io-mode property.
- * Link chunk I/O does not break to independent, so can set right away */
+ * Link chunk filtered I/O does not break to independent, so can set right away
+ */
if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0)
HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property")
@@ -1377,18 +1374,20 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
/* Iterate through all the chunks in the collective write operation,
* updating each chunk with the data modifications from other processes,
- * then re-filtering the chunk. */
+ * then re-filtering the chunk.
+ */
/* XXX: Not sure about minor error code */
for (i = 0; i < chunk_list_num_entries; i++)
- if (H5D__update_filtered_collective_chunk_entry(&chunk_list[i], io_info, type_info) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't update filtered chunk entry")
+ if (H5D__filtered_collective_chunk_io(&chunk_list[i], io_info, type_info) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't process chunk entry")
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "---------------------------------------------------\n\n");
#endif
/* Gather the new chunk sizes to all processes for a collective reallocation
- * of the chunks in the file */
+ * of the chunks in the file.
+ */
/* XXX: change minor error code */
if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
(void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0)
@@ -1422,7 +1421,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
/* If this process has any chunks selected, create a MPI type for collectively
* writing out the chunks to file. Otherwise, the process contributes to the
- * collective write with a none type. */
+ * collective write with a none type.
+ */
if (chunk_list_num_entries) {
size_t offset;
@@ -1436,13 +1436,10 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
* This works correctly because the array gather function guarantees that the chunk
* data in the total array is ordered in blocks by rank.
*/
- /* XXX: No need to use bytes here, should be able to simply find offset in
- * terms of H5D_filtered_collective_io_info_t's */
- offset = 0;
- for (i = 0; i < (size_t) mpi_rank; i++)
- offset += num_chunks_selected_array[i] * sizeof(H5D_filtered_collective_io_info_t);
+ for (i = 0, offset = 0; i < (size_t) mpi_rank; i++)
+ offset += num_chunks_selected_array[i];
- HDmemcpy(chunk_list, &((char *) collective_chunk_list)[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t));
+ HDmemcpy(chunk_list, &collective_chunk_list[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t));
/* Create single MPI type encompassing each selection in the dataspace */
/* XXX: change minor error code */
@@ -1451,7 +1448,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type")
/* Override the write buffer to point to the address of the first
- * chunk data buffer */
+ * chunk data buffer
+ */
io_info->u.wbuf = chunk_list[0].buf;
} /* end if */
@@ -1467,7 +1465,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
/* Participate in the collective re-insertion of all chunks modified
- * in this iteration into the chunk index */
+ * in this iteration into the chunk index
+ */
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Reinserting chunks into chunk index.\n");
HDfprintf(debug_file, "---------------------------------------\n");
@@ -1747,7 +1746,9 @@ done:
* chunk will take ownership of the chunk (the first
* process seen that is writing the most data becomes
* the new owner in the case of ties)
- * 2. If the operation is a write operation
+ * 2. If the operation is a read operation
+ * A.
+ * 3. If the operation is a write operation
* A. Loop through each chunk in the operation
* I. If this is not a full overwrite of the chunk
* a) Read the chunk from file and pass the chunk
@@ -1773,8 +1774,6 @@ done:
* V. All processes collectively re-insert each
* chunk from the gathered array into the chunk
* index
- * 3. If the operation is a read operation
- * I.
*
* Return: Non-negative on success/Negative on failure
*
@@ -1788,19 +1787,19 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist)
{
H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */
- H5D_filtered_collective_io_info_t *collective_chunk_list = NULL;
- H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_MULTI_CHUNK; /* actual chunk optimization mode */
- H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* Local variable for tracking the I/O mode used. */
+ H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; /* The list of chunks used during collective operations */
+ H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_MULTI_CHUNK; /* The actual chunk IO optimization mode */
+ H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* The chunk IO mode used (Independent vs Collective) */
H5D_storage_t store; /* union of EFL and chunk pointer in file space */
H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */
H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */
- MPI_Datatype file_type = MPI_BYTE;
- MPI_Datatype mem_type = MPI_BYTE;
- hbool_t file_type_is_derived = FALSE;
- hbool_t mem_type_is_derived = FALSE;
+ MPI_Datatype *file_type_array = NULL;
+ MPI_Datatype *mem_type_array = NULL;
+ hbool_t *file_type_is_derived_array = NULL;
+ hbool_t *mem_type_is_derived_array = NULL;
size_t chunk_list_num_entries;
size_t collective_chunk_list_num_entries;
- size_t *num_chunks_selected_array = NULL;
+ size_t *num_chunks_selected_array = NULL; /* Array of number of chunks selected on each process */
size_t i, j; /* Local index variable */
int mpi_rank, mpi_size, mpi_code;
herr_t ret_value = SUCCEED;
@@ -1817,8 +1816,9 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0)
HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property")
- /* Set the actual_io_mode property. Filtered collective writes can't break
- * to independent, so set actual_io_mode right away */
+ /* Set the actual_io_mode property.
+ * Multi chunk I/O does not break to independent, so can set right away
+ */
if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0)
HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk io mode property")
@@ -1839,7 +1839,13 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
/* Set dataset storage for I/O info */
io_info->store = &store;
- if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */
+ if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */
+ /* XXX: Change minor error code */
+ for (i = 0; i < chunk_list_num_entries; i++)
+ if (H5D__filtered_collective_chunk_io(&chunk_list[i], io_info, type_info) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't process chunk entry")
+ } /* end if */
+ else { /* Filtered collective write */
H5D_chk_idx_info_t index_info;
H5D_chunk_ud_t udata;
size_t max_num_chunks;
@@ -1870,24 +1876,42 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
HDfprintf(debug_file, "---------------------------------------------------\n");
#endif
+ /* Retrieve the maximum number of chunks being written among all processes */
if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list_num_entries, &max_num_chunks,
1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
- /* XXX: Iteration should be for the max number among processes, since a process could
- * have no chunks assigned to it */
+ /* Allocate arrays for storing MPI file and mem types and whether or not the
+ * types were derived.
+ */
+ if (NULL == (file_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(*file_type_array))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type array")
+
+ if (NULL == (file_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(*file_type_is_derived_array))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type is derived array")
+
+ if (NULL == (mem_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(*mem_type_array))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type array")
+
+ if (NULL == (mem_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(*mem_type_is_derived_array))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type is derived array")
+
+ /* Iterate over the max number of chunks among all processes, as this process could
+ * have no chunks left to work on, but it still needs to participate in the collective
+ * re-allocation and re-insertion of chunks modified by other processes.
+ */
for (i = 0; i < max_num_chunks; i++) {
/* Check if this process has a chunk to work on for this iteration */
hbool_t have_chunk_to_process = i < chunk_list_num_entries;
/* XXX: Not sure about minor error code */
if (have_chunk_to_process)
- if (H5D__update_filtered_collective_chunk_entry(&chunk_list[i], io_info, type_info) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't update filtered chunk entry")
+ if (H5D__filtered_collective_chunk_io(&chunk_list[i], io_info, type_info) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't process chunk entry")
- /* Participate in the collective re-allocation of all chunks modified
- * in this iteration. Gather the new chunk sizes to all processes for
- * the collective re-allocation. */
+ /* Gather the new chunk sizes to all processes for a collective re-allocation
+ * of the chunks in the file
+ */
/* XXX: May access unavailable memory on processes with no selection */
/* XXX: change minor error code */
if (H5D__mpio_array_gather(io_info, &chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(*chunk_list),
@@ -1899,16 +1923,17 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
HDfprintf(debug_file, "------------------------------\n");
#endif
- /* Collectively re-allocate the modified chunks (from each process) in the file */
+ /* Participate in the collective re-allocation of all chunks modified
+ * in this iteration.
+ */
for (j = 0; j < collective_chunk_list_num_entries; j++) {
hbool_t insert = FALSE;
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "| Re-allocing chunk at address %a with new length of %llu bytes.\n",
- collective_chunk_list[j].new_chunk.offset, collective_chunk_list[j].new_chunk.length);
+ collective_chunk_list[j].new_chunk.offset, collective_chunk_list[j].new_chunk.length);
#endif
- /* Collectively re-allocate the chunk in the file */
if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].old_chunk, &collective_chunk_list[j].new_chunk,
&insert, chunk_list[j].chunk_info.scaled) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
@@ -1923,7 +1948,8 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
#endif
/* If this process has a chunk to work on, create a MPI type for the
- * memory and file for writing out the chunk */
+ * memory and file for writing out the chunk
+ */
if (have_chunk_to_process) {
int mpi_type_count;
@@ -1938,35 +1964,37 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].new_chunk.length, hsize_t);
/* Create MPI memory type for writing to chunk */
- if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type)))
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type_array[i])))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code)
- if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
+ if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type_array[i])))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
- mem_type_is_derived = TRUE;
+ mem_type_is_derived_array[i] = TRUE;
/* Create MPI file type for writing to chunk */
- if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &file_type)))
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &file_type_array[i])))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code)
- if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
+ if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type_array[i])))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
- file_type_is_derived = TRUE;
+ file_type_is_derived_array[i] = TRUE;
/* Set up the base storage address for this operation */
ctg_store.contig.dset_addr = chunk_list[i].new_chunk.offset;
/* Override the write buffer to point to the address of the
- * chunk data buffer */
+ * chunk data buffer
+ */
ctg_io_info.u.wbuf = chunk_list[i].buf;
} /* end if */
- mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? 1 : 0;
+ mpi_buf_count = (mem_type_is_derived_array[i] && file_type_is_derived_array[i]) ? 1 : 0;
/* Perform the I/O */
- if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0)
+ if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type_array[i], &mem_type_array[i]) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
/* Participate in the collective re-insertion of all chunks modified
- * in this iteration into the chunk index */
+ * in this iteration into the chunk index
+ */
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Reinserting chunks into chunk index.\n");
HDfprintf(debug_file, "---------------------------------------\n");
@@ -1988,61 +2016,23 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
HDfprintf(debug_file, "---------------------------------------\n");
#endif
- /* Free the MPI memory and file type, if they were derived */
- /* XXX: For performance, collect each type into an array and free at end */
- if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
- if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
-
- mem_type_is_derived = FALSE;
- file_type_is_derived = FALSE;
-
if (collective_chunk_list)
collective_chunk_list = (H5D_filtered_collective_io_info_t *) H5MM_free(collective_chunk_list);
} /* end for */
- } /* end if */
- else { /* Filtered collective read */
- unsigned filter_mask = 0;
- size_t buf_size;
-
- for (i = 0; i < chunk_list_num_entries; i++) {
- buf_size = chunk_list[i].old_chunk.length;
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "Allocing %zd bytes for chunk read buffer.\n", buf_size);
-#endif
-
- if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk read buffer")
-
- /* Read the chunk from the file */
- if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset,
- buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read raw data chunk")
- /* Unfilter the chunk before modifying it */
- if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
- io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
- (size_t *) &chunk_list[i].old_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
- HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- for (size_t k = 0; k < chunk_list[i].old_chunk.length / type_info->src_type_size; k++)
- HDfprintf(debug_file, "Read buf entry %d is %lld.\n", k, ((long *) chunk_list[i].buf)[k]);
-#endif
-
- /* Scatter the unfiltered chunk data into the user's read buffer */
+ /* Free the MPI file and memory types, if they were derived */
+ for (i = 0; i < max_num_chunks; i++) {
+ if (file_type_is_derived_array[i])
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type_array[i])))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+ if (mem_type_is_derived_array[i])
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type_array[i])))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
} /* end for */
} /* end else */
done:
- if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
- if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
-
if (chunk_list) {
for (i = 0; i < chunk_list_num_entries; i++)
if (chunk_list[i].buf)
@@ -2051,10 +2041,18 @@ done:
H5MM_free(chunk_list);
}
- if (num_chunks_selected_array)
- H5MM_free(num_chunks_selected_array);
if (collective_chunk_list)
H5MM_free(collective_chunk_list);
+ if (file_type_array)
+ H5MM_free(file_type_array);
+ if (mem_type_array)
+ H5MM_free(mem_type_array);
+ if (file_type_is_derived_array)
+ H5MM_free(file_type_is_derived_array);
+ if (mem_type_is_derived_array)
+ H5MM_free(mem_type_is_derived_array);
+ if (num_chunks_selected_array)
+ H5MM_free(num_chunks_selected_array);
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__multi_chunk_filtered_collective_io() */
@@ -2752,17 +2750,20 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
int new_owner = 0;
/* Process duplicate entries caused by another process writing
- * to the same chunk */
+ * to the same chunk
+ */
do {
/* Store the correct chunk entry information in case this process
* becomes the new chunk's owner. The chunk entry that this process
* contributed will be the only one with a valid dataspace selection
- * on this particular process */
+ * on this particular process
+ */
if (mpi_rank == overlap_info_array[i].owner)
chunk_entry = overlap_info_array[i];
/* New owner of the chunk is determined by the process
- * which is writing the most data to the chunk */
+ * which is writing the most data to the chunk
+ */
if (overlap_info_array[i].io_size > max_bytes) {
max_bytes = overlap_info_array[i].io_size;
new_owner = overlap_info_array[i].owner;
@@ -2776,7 +2777,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (mpi_rank == new_owner) {
/* Make sure the new owner will know how many other processes will
- * be sending chunk modification data to it */
+ * be sending chunk modification data to it
+ */
chunk_entry.num_writers = num_writers;
/* New owner takes possession of the chunk */
@@ -2844,7 +2846,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
- if (MPI_SUCCESS != (mpi_code = MPI_Waitall(num_send_requests, send_requests, send_statuses)))
+ H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
+ if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
}
@@ -2964,12 +2967,24 @@ done:
} /* end H5D__mpio_filtered_collective_write_type() */
+/*-------------------------------------------------------------------------
+ * Function: H5D__filtered_collective_chunk_io
+ *
+ * Purpose: XXX: description
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Jordan Henderson
+ * Wednesday, January 18, 2017
+ *
+ *-------------------------------------------------------------------------
+ */
static herr_t
-H5D__update_filtered_collective_chunk_entry(H5D_filtered_collective_io_info_t *chunk_entry,
+H5D__filtered_collective_chunk_io(H5D_filtered_collective_io_info_t *chunk_entry,
const H5D_io_info_t *io_info, const H5D_type_info_t *type_info)
{
H5S_sel_iter_t *mem_iter = NULL;
- unsigned filter_mask;
+ unsigned filter_mask = 0;
hssize_t iter_nelmts;
hbool_t full_overwrite = TRUE;
hbool_t mem_iter_init = FALSE;
@@ -2979,37 +2994,44 @@ H5D__update_filtered_collective_chunk_entry(H5D_filtered_collective_io_info_t *c
FUNC_ENTER_STATIC
HDassert(chunk_entry);
+ HDassert(io_info);
+ HDassert(type_info);
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_entry->old_chunk.offset);
#endif
/* XXX: Determine if a chunk is being fully overwritten by looking at the total selection
- * in the dataspace */
+ * in the dataspace
+ */
- /* If this is a full overwrite of this chunk, enough memory must be allocated for
- * the size of the unfiltered chunk. Otherwise, enough memory must be allocated
- * to read the filtered chunk from the file. */
+ /* If this is a read operation or a write operation where the chunk is not being fully
+ * overwritten, enough memory must be allocated to read the filtered chunk from the file.
+ * If this is a write operation where the chunk is being fully overwritten, enough memory
+ * must be allocated for the size of the unfiltered chunk.
+ */
/* XXX: Return value of macro should be checked instead */
- buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace) * type_info->src_type_size
- : chunk_entry->old_chunk.length;
+ buf_size = (!full_overwrite || io_info->op_type == H5D_IO_OP_READ) ? chunk_entry->old_chunk.length
+ : (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace) * type_info->src_type_size;
chunk_entry->new_chunk.length = buf_size;
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size,
- full_overwrite ? "full" : "non-full");
+ HDfprintf(debug_file, "| - Allocing %zd bytes for chunk data buffer.\n", buf_size);
+ if (io_info->op_type == H5D_IO_OP_WRITE)
+ HDfprintf(debug_file, "| - Write type is: %s.\n", (full_overwrite) ? "overwrite" : "non-overwrite");
#endif
if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer")
- if (!full_overwrite) {
- /* Read the chunk from the file */
+ /* If this is not a full chunk overwrite or this is a read operation, the chunk must be
+ * read from the file and unfiltered.
+ */
+ if (!full_overwrite || io_info->op_type == H5D_IO_OP_READ) {
if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->old_chunk.offset,
buf_size, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk")
- /* Unfilter the chunk before modifying it */
if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
(size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
@@ -3029,12 +3051,11 @@ H5D__update_filtered_collective_chunk_entry(H5D_filtered_collective_io_info_t *c
/* Owner of this chunk, receive modification data from other processes */
- /* Update the chunk data with the modifications from the current (owning) process */
-
if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
/* Initialize iterator for memory selection */
+ /* XXX: dst_type_size may need to be src_type_size depending on operation */
if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.mspace, type_info->dst_type_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
mem_iter_init = TRUE;
@@ -3042,40 +3063,59 @@ H5D__update_filtered_collective_chunk_entry(H5D_filtered_collective_io_info_t *c
if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry->chunk_info.mspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
- if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry->chunk_info.mspace, mem_iter,
- (size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf))
- HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
+ /* If this is a read operation, scatter the read chunk data to the user's buffer.
+ *
+ * If this is a write operation, update the chunk data buffer with the modifications
+ * from the current process, then apply any modifications from other processes. Finally,
+ * filter the newly-update chunk.
+ */
+ switch (io_info->op_type) {
+ case H5D_IO_OP_READ:
+ if (H5D__scatter_mem(chunk_entry->buf, chunk_entry->chunk_info.mspace, mem_iter,
+ (size_t) iter_nelmts, io_info->dxpl_cache, io_info->u.rbuf) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to read buffer")
+ break;
- /* Update the chunk data with any modifications from other processes */
+ case H5D_IO_OP_WRITE:
+ /* Update the chunk data with the modifications from the current (owning) process */
+ if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry->chunk_info.mspace, mem_iter,
+ (size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf))
+ HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
+ /* Update the chunk data with any modifications from other processes */
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Write Buffer:\n");
- HDfprintf(debug_file, "| - [");
- for (size_t j = 0; j < chunk_entry->new_chunk.length / type_info->src_type_size; j++) {
- if (j > 0) HDfprintf(debug_file, ", ");
- HDfprintf(debug_file, "%lld", ((long *) chunk_entry->buf)[j]);
- }
- HDfprintf(debug_file, "]\n|\n");
- HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_entry->buf);
- HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_entry->new_chunk.length, buf_size);
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "| - Chunk Data Buffer:\n");
+ HDfprintf(debug_file, "| - [");
+ for (size_t j = 0; j < chunk_entry->new_chunk.length / type_info->src_type_size; j++) {
+ if (j > 0) HDfprintf(debug_file, ", ");
+ HDfprintf(debug_file, "%lld", ((long *) chunk_entry->buf)[j]);
+ }
+ HDfprintf(debug_file, "]\n|\n");
+
+ HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_entry->new_chunk.length, buf_size);
#endif
- /* Filter the chunk */
- filter_mask = 0;
- if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
- io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
- (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
- HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
+ /* Filter the chunk */
+ filter_mask = 0;
+ if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
+ io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+ (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
+ HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
#if H5_SIZEOF_SIZE_T > 4
- /* Check for the chunk expanding too much to encode in a 32-bit value */
- if (chunk_entry->new_chunk.length > ((size_t) 0xffffffff))
- HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
+ /* Check for the chunk expanding too much to encode in a 32-bit value */
+ if (chunk_entry->new_chunk.length > ((size_t) 0xffffffff))
+ HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
#endif
+ break;
+ default:
+ HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "unknown I/O operation")
+ }
+
done:
if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
@@ -3083,6 +3123,6 @@ done:
H5MM_free(mem_iter)
FUNC_LEAVE_NOAPI(ret_value)
-} /* end H5D__update_filtered_collective_chunk_entry() */
+} /* end H5D__filtered_collective_chunk_io() */
#endif /* H5_HAVE_PARALLEL */