summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJordan Henderson <jhenderson@hdfgroup.org>2017-01-13 14:14:38 (GMT)
committerJordan Henderson <jhenderson@hdfgroup.org>2017-01-13 14:14:38 (GMT)
commita3605cbdeb5f79c5753b848dfef1706988ba10e7 (patch)
treea27678d0e33455a6c165425704122f3091c853cd /src
parent089afc48561ba8838d6a515c6b00fc6f7032ca13 (diff)
downloadhdf5-a3605cbdeb5f79c5753b848dfef1706988ba10e7.zip
hdf5-a3605cbdeb5f79c5753b848dfef1706988ba10e7.tar.gz
hdf5-a3605cbdeb5f79c5753b848dfef1706988ba10e7.tar.bz2
Switch working branch from master to develop
Diffstat (limited to 'src')
-rw-r--r--src/H5Dchunk.c5
-rw-r--r--src/H5Dint.c4
-rw-r--r--src/H5Dio.c5
-rw-r--r--src/H5Dmpio.c1400
-rw-r--r--src/H5Dpkg.h5
-rw-r--r--src/H5Dscatgath.c7
6 files changed, 1391 insertions, 35 deletions
diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c
index 7a646af..8623325 100644
--- a/src/H5Dchunk.c
+++ b/src/H5Dchunk.c
@@ -297,9 +297,6 @@ static herr_t H5D__chunk_unlock(const H5D_io_info_t *io_info,
static herr_t H5D__chunk_cache_prune(const H5D_t *dset, hid_t dxpl_id,
const H5D_dxpl_cache_t *dxpl_cache, size_t size);
static herr_t H5D__chunk_prune_fill(H5D_chunk_it_ud1_t *udata, hbool_t new_unfilt_chunk);
-static herr_t H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info,
- const H5F_block_t *old_chunk, H5F_block_t *new_chunk, hbool_t *need_insert,
- hsize_t scaled[]);
#ifdef H5_HAVE_PARALLEL
static herr_t H5D__chunk_collective_fill(const H5D_t *dset, hid_t dxpl_id,
H5D_chunk_coll_info_t *chunk_info, size_t chunk_size, const void *fill_buf);
@@ -6240,7 +6237,7 @@ done:
*
*-------------------------------------------------------------------------
*/
-static herr_t
+herr_t
H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info, const H5F_block_t *old_chunk,
H5F_block_t *new_chunk, hbool_t *need_insert, hsize_t scaled[])
{
diff --git a/src/H5Dint.c b/src/H5Dint.c
index 5a11581..dac12df 100644
--- a/src/H5Dint.c
+++ b/src/H5Dint.c
@@ -1210,10 +1210,6 @@ H5D__create(H5F_t *file, hid_t type_id, const H5S_t *space, hid_t dcpl_id,
/* Don't allow compact datasets to allocate space later */
if(layout->type == H5D_COMPACT && fill->alloc_time != H5D_ALLOC_TIME_EARLY)
HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, NULL, "compact dataset must have early space allocation")
-
- /* If MPI VFD is used, no filter support yet. */
- if(H5F_HAS_FEATURE(file, H5FD_FEAT_HAS_MPI) && pline->nused > 0)
- HGOTO_ERROR(H5E_DATASET, H5E_UNSUPPORTED, NULL, "Parallel I/O does not support filters yet")
} /* end if */
/* Set the latest version of the layout, pline & fill messages, if requested */
diff --git a/src/H5Dio.c b/src/H5Dio.c
index f5087da..6a4e6ec 100644
--- a/src/H5Dio.c
+++ b/src/H5Dio.c
@@ -666,11 +666,6 @@ H5D__write(H5D_t *dataset, hid_t mem_type_id, const H5S_t *mem_space,
if(H5T_get_class(type_info.mem_type, TRUE) == H5T_REFERENCE &&
H5T_get_ref_type(type_info.mem_type) == H5R_DATASET_REGION)
HGOTO_ERROR(H5E_DATASET, H5E_UNSUPPORTED, FAIL, "Parallel IO does not support writing region reference datatypes yet")
-
- /* Can't write to chunked datasets with filters, in parallel */
- if(dataset->shared->layout.type == H5D_CHUNKED &&
- dataset->shared->dcpl_cache.pline.nused > 0)
- HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "cannot write to chunked storage with filters in parallel")
} /* end if */
else {
/* Collective access is not permissible without a MPI based VFD */
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 441cc96..cfcc5c4 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -86,6 +86,11 @@
#define H5D_CHUNK_SELECT_IRREG 2
#define H5D_CHUNK_SELECT_NONE 0
+/* Compile-time switch for the parallel compression debug tracing used in
+ * this file. NOTE(review): it is defined unconditionally here, so the
+ * tracing code is always compiled in -- confirm this is intended before
+ * release. */
+#define PARALLEL_COMPRESS_DEBUG
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+/* Stream that the debug trace is written to. It is opened with fopen() in
+ * H5D__chunk_collective_write(). NOTE(review): the symbol has external
+ * linkage; consider making it static if no other file refers to it. */
+FILE *debug_file;
+#endif
/******************/
/* Local Typedefs */
@@ -96,6 +101,14 @@ typedef struct H5D_chunk_addr_info_t {
H5D_chunk_info_t chunk_info;
} H5D_chunk_addr_info_t;
+/* Per-chunk bookkeeping record used by the filtered collective I/O routines.
+ * Arrays of these records are exchanged between ranks as raw bytes by
+ * H5D__mpio_array_gather(). */
+typedef struct H5D_filtered_collective_io_info_t {
+ H5D_chunk_info_t chunk_info; /* Chunk selection info (the fspace, mspace and scaled members are used) */
+ H5F_block_t old_chunk; /* Chunk's block in the file before the operation (offset/length) */
+ H5F_block_t new_chunk; /* Chunk's block after filtering; passed to H5D__chunk_file_alloc() and inserted into the chunk index */
+ int num_writers; /* Presumably the number of other processes that will send data for this chunk -- TODO confirm */
+ int owner; /* Presumably the MPI rank that owns (processes) this chunk -- TODO confirm */
+ void *buf; /* Buffer holding the chunk's data while it is (un)filtered and written */
+} H5D_filtered_collective_io_info_t;
/********************/
/* Local Prototypes */
@@ -105,9 +118,15 @@ static herr_t H5D__chunk_collective_io(H5D_io_info_t *io_info,
static herr_t H5D__multi_chunk_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, H5D_chunk_map_t *fm,
H5P_genplist_t *dx_plist);
+static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info,
+ const H5D_type_info_t *type_info, H5D_chunk_map_t *fm,
+ H5P_genplist_t *dx_plist);
static herr_t H5D__link_chunk_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, int sum_chunk,
H5P_genplist_t *dx_plist);
+static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info,
+ const H5D_type_info_t *type_info, H5D_chunk_map_t *fm,
+ H5P_genplist_t *dx_plist);
static herr_t H5D__inter_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, const H5S_t *file_space,
const H5S_t *mem_space);
@@ -126,6 +145,21 @@ static herr_t H5D__mpio_get_min_chunk(const H5D_io_info_t *io_info,
const H5D_chunk_map_t *fm, int *min_chunkf);
static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info,
const H5D_chunk_map_t *fm, int *sum_chunkf);
+/* Builds the list of selected chunks for a filtered collective I/O operation */
+static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info,
+ const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
+ H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries,
+ size_t **_num_chunks_selected_array);
+/* Gathers every rank's local array into one array available on all ranks,
+ * optionally sorting the result with sort_func */
+static herr_t H5D__mpio_array_gather(H5D_io_info_t *io_info, void *local_array,
+ size_t array_num_entries, size_t array_entry_size,
+ void **gathered_array, size_t *gathered_array_num_entries,
+ int (*sort_func)(const void *, const void *));
+/* Creates the MPI memory and file types used to write a list of filtered chunks */
+static herr_t H5D__mpio_filtered_collective_write_type(
+ H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries,
+ MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
+ MPI_Datatype *new_file_type, hbool_t *file_type_derived);
+/* Comparison callbacks; the signatures match the sort_func parameter of
+ * H5D__mpio_array_gather() (presumably used for sorting -- TODO confirm) */
+static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2);
+static int H5D__cmp_filtered_collective_io_entry(const void *filtered_collective_io_entry1,
+ const void *filtered_collective_io_entry2);
/*********************/
@@ -208,11 +242,6 @@ H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space,
* use collective IO will defer until each chunk IO is reached.
*/
- /* Don't allow collective operations if filters need to be applied */
- if(io_info->dset->shared->layout.type == H5D_CHUNKED &&
- io_info->dset->shared->dcpl_cache.pline.nused > 0)
- local_cause |= H5D_MPIO_FILTERS;
-
/* Check for independent I/O */
if(local_cause & H5D_MPIO_SET_INDEPENDENT)
global_cause = local_cause;
@@ -302,6 +331,105 @@ done:
/*-------------------------------------------------------------------------
+ * Function:    H5D__mpio_array_gather
+ *
+ * Purpose:     Given arrays by MPI ranks, gathers them into a single large
+ *              array which is then distributed back to all ranks. If the
+ *              sort_func argument is specified, the list is sorted before
+ *              being returned. Without a sort function the gathered array
+ *              is laid out in blocks ordered by rank.
+ *
+ *              io_info                     - supplies the file (for the MPI
+ *                                            size) and the communicator
+ *              local_array                 - this rank's entries
+ *              array_num_entries           - number of entries in local_array
+ *              array_entry_size            - size in bytes of one entry
+ *              _gathered_array             - OUT: newly allocated gathered
+ *                                            array (NULL when every rank
+ *                                            contributed zero entries); the
+ *                                            caller releases it with
+ *                                            H5MM_free()
+ *              _gathered_array_num_entries - OUT: total number of entries
+ *              sort_func                   - optional qsort-style comparator
+ *
+ *              This is a collective operation: every rank in
+ *              io_info->comm must call it.
+ *
+ * Return:      Non-negative on success/Negative on failure
+ *
+ * Programmer:  Jordan Henderson
+ *              Friday, January 6th, 2017
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__mpio_array_gather(H5D_io_info_t *io_info, void *local_array,
+    size_t array_num_entries, size_t array_entry_size,
+    void **_gathered_array, size_t *_gathered_array_num_entries,
+    int (*sort_func)(const void *, const void *))
+{
+    size_t  gathered_array_num_entries = 0;
+    size_t  total_bytes = 0;
+    size_t  i;
+    void   *gathered_array = NULL;
+    int    *receive_counts_array = NULL;
+    int    *displacements_array = NULL;
+    int     mpi_code, mpi_size;
+    int     local_num_entries;
+    int     sendcount;
+    herr_t  ret_value = SUCCEED;
+
+    FUNC_ENTER_STATIC
+
+    HDassert(io_info);
+    HDassert(local_array);
+    HDassert(_gathered_array);
+    HDassert(_gathered_array_num_entries);
+
+    if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
+        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
+
+    /* MPI counts are ints. Convert the size_t values explicitly instead of
+     * handing the address of a size_t to MPI with an MPI_INT datatype. */
+    H5_CHECKED_ASSIGN(local_num_entries, int, array_num_entries, size_t);
+    H5_CHECKED_ASSIGN(sendcount, int, array_num_entries * array_entry_size, size_t);
+
+    if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*receive_counts_array))))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array")
+
+    if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*displacements_array))))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate displacements array")
+
+    /* Every rank learns how many entries each other rank contributes */
+    if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, io_info->comm)))
+        HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
+
+    /* Convert the per-rank entry counts into byte counts (the data is sent
+     * as MPI_BYTE) and build the receive buffer offsets for MPI_Allgatherv.
+     * The total entry count falls out of the same pass, so no separate
+     * reduction is needed. The arithmetic is done in size_t and checked
+     * when it is narrowed back to the int values MPI requires. */
+    for (i = 0; i < (size_t) mpi_size; i++) {
+        size_t rank_num_entries = (size_t) receive_counts_array[i];
+        size_t rank_bytes = rank_num_entries * array_entry_size;
+
+        H5_CHECKED_ASSIGN(displacements_array[i], int, total_bytes, size_t);
+        H5_CHECKED_ASSIGN(receive_counts_array[i], int, rank_bytes, size_t);
+
+        gathered_array_num_entries += rank_num_entries;
+        total_bytes += rank_bytes;
+    } /* end for */
+
+    /* All ranks compute the same total, so they all take the same branch
+     * here. With nothing to gather, no buffer is allocated and the
+     * collective exchange is skipped. */
+    if (total_bytes > 0) {
+        if (NULL == (gathered_array = H5MM_malloc(total_bytes)))
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate total gathered array")
+
+        if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE,
+                gathered_array, receive_counts_array, displacements_array, MPI_BYTE, io_info->comm)))
+            HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
+
+        if (sort_func)
+            HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func);
+    } /* end if */
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+    /* The dump interprets the entries as filtered collective I/O records,
+     * so only do it when the entry size matches that type */
+    if (debug_file && array_entry_size == sizeof(H5D_filtered_collective_io_info_t)) {
+        H5D_filtered_collective_io_info_t *entries = (H5D_filtered_collective_io_info_t *) gathered_array;
+
+        HDfprintf(debug_file, " Contents of gathered array:\n");
+        HDfprintf(debug_file, "------------------------------\n");
+        for (i = 0; i < gathered_array_num_entries; i++) {
+            HDfprintf(debug_file, "| Chunk Entry %zd:\n", i);
+            HDfprintf(debug_file, "|  - Chunk Address: %a\n", entries[i].old_chunk.offset);
+            HDfprintf(debug_file, "|  - Chunk Length: %zd\n", entries[i].old_chunk.length);
+            HDfprintf(debug_file, "|  - Chunk Owner: %d\n", entries[i].owner);
+            HDfprintf(debug_file, "|  - Address of mspace: %p\n", (void *) entries[i].chunk_info.mspace);
+        }
+        HDfprintf(debug_file, "------------------------------\n\n");
+    } /* end if */
+#endif
+
+    /* Hand the result to the caller, who now owns the gathered array */
+    *_gathered_array = gathered_array;
+    *_gathered_array_num_entries = gathered_array_num_entries;
+
+done:
+    if (receive_counts_array)
+        H5MM_free(receive_counts_array);
+    if (displacements_array)
+        H5MM_free(displacements_array);
+
+    /* On failure the caller never sees the gathered array; release it here */
+    if (ret_value < 0 && gathered_array)
+        H5MM_free(gathered_array);
+
+    FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__mpio_array_gather() */
+
+
+/*-------------------------------------------------------------------------
* Function: H5D__ioinfo_xfer_mode
*
* Purpose: Switch to between collective & independent MPI I/O
@@ -441,6 +569,9 @@ H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm,
H5_CHECKED_ASSIGN(num_chunkf, int, ori_num_chunkf, size_t);
/* Determine the summation of number of chunks for all processes */
+ /* XXX: In the case that more than one process is writing to the
+ * same chunk, the summation of H5SL_count calls is incorrect. - JTH
+ */
if(MPI_SUCCESS != (mpi_code = MPI_Allreduce(&num_chunkf, sum_chunkf, 1, MPI_INT, MPI_SUM, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
@@ -683,19 +814,31 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
#endif
/* step 2: Go ahead to do IO.*/
- if(H5D_ONE_LINK_CHUNK_IO == io_option || H5D_ONE_LINK_CHUNK_IO_MORE_OPT == io_option) {
- if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO")
- } /* end if */
- /* direct request to multi-chunk-io */
- else if(H5D_MULTI_CHUNK_IO == io_option) {
- if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO")
- } /* end if */
- else { /* multiple chunk IO via threshold */
- if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO")
- } /* end else */
+ switch (io_option) {
+ case H5D_ONE_LINK_CHUNK_IO:
+ case H5D_ONE_LINK_CHUNK_IO_MORE_OPT:
+ if(io_info->dset->shared->dcpl_cache.pline.nused > 0) {
+ if(H5D__link_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish filtered linked chunk MPI-IO")
+ }
+ else {
+ if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO")
+ }
+ break;
+
+ case H5D_MULTI_CHUNK_IO: /* direct request to do multi-chunk IO */
+ default: /* multiple chunk IO via threshold */
+ if(io_info->dset->shared->dcpl_cache.pline.nused > 0) {
+ if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO")
+ }
+ else {
+ if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO")
+ }
+ break;
+ }
done:
FUNC_LEAVE_NOAPI(ret_value)
@@ -755,6 +898,14 @@ H5D__chunk_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type_
FUNC_ENTER_PACKAGE
+#ifdef PARALLEL_COMPRESS_DEBUG
+ char name[10];
+
+ snprintf(name, 10, "out - %d", H5F_mpi_get_rank(io_info->dset->oloc.file));
+
+ debug_file = fopen(name, "w");
+#endif
+
/* Call generic selection operation */
if(H5D__chunk_collective_io(io_info, type_info, fm) < 0)
HGOTO_ERROR(H5E_DATASPACE, H5E_WRITEERROR, FAIL, "write error")
@@ -1095,6 +1246,408 @@ if(H5DEBUG(D))
/*-------------------------------------------------------------------------
+ * Function:    H5D__link_chunk_filtered_collective_io
+ *
+ * Purpose:     Routine for one collective IO with one MPI derived datatype
+ *              to link with all filtered chunks
+ *
+ *              1. Construct a list of selected chunks in the collective IO
+ *                 operation
+ *                 A. If any chunk is being written to by more than 1
+ *                    process, the process writing the most data to the
+ *                    chunk will take ownership of the chunk (ties are
+ *                    broken randomly)
+ *              2. If the operation is a write operation
+ *                 A. Loop through each chunk in the operation
+ *                    I.   Determine if this is a full overwrite of the chunk
+ *                         a) If it is not, read the chunk from file and
+ *                            pass the chunk through the filter pipeline in
+ *                            reverse order (Unfilter the chunk)
+ *                         b) Else copy the owning process' modification
+ *                            data into a buffer of size large enough to
+ *                            completely overwrite the chunk
+ *                    II.  Receive any modification data from other
+ *                         processes and update the chunk data with these
+ *                         modifications
+ *                    III. Filter the chunk
+ *                 B. Contribute the modified chunks to an array gathered
+ *                    by all processes which contains the new sizes of
+ *                    every chunk modified in the collective IO operation
+ *                 C. All processes collectively re-allocate each chunk
+ *                    from the gathered array with their new sizes after
+ *                    the filter operation
+ *                 D. Create an MPI derived type for memory and file to
+ *                    write out the process' selected chunks to the file
+ *              3. If the operation is a read operation
+ *                 (not implemented yet; the routine fails with
+ *                 H5E_UNSUPPORTED)
+ *              4. Proceed with the collective IO operation
+ *              5. If the collective operation was a write operation,
+ *                 all processes collectively re-insert each modified
+ *                 chunk from the gathered array into the chunk index
+ *
+ *              io_info   - I/O operation info; on a write, its u.wbuf and
+ *                          store members are redirected by this routine
+ *              type_info - datatype info (source/destination element sizes)
+ *              fm        - chunk map holding this process' selected chunks
+ *              dx_plist  - transfer property list that receives the actual
+ *                          chunk-opt-mode and actual io-mode properties
+ *
+ * Return:      Non-negative on success/Negative on failure
+ *
+ * Programmer:  Jordan Henderson
+ *              Friday, Nov. 4th, 2016
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
+    H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist)
+{
+    H5D_filtered_collective_io_info_t *chunk_list = NULL;
+    H5D_filtered_collective_io_info_t *total_array = NULL;
+    H5D_mpio_actual_chunk_opt_mode_t   actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK;
+    H5D_mpio_actual_io_mode_t          actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE;
+    H5D_chk_idx_info_t                 index_info;
+    H5S_sel_iter_t                    *mem_iter = NULL;
+    H5D_storage_t                      ctg_store;
+    MPI_Datatype                       mem_type;
+    MPI_Datatype                       file_type;
+    hbool_t                            mem_type_is_derived = FALSE;
+    hbool_t                            file_type_is_derived = FALSE;
+    hbool_t                            mem_iter_init = FALSE;
+    hsize_t                            mpi_buf_count = 0;       /* Number of MPI types */
+    haddr_t                           *total_chunk_addr_array = NULL;
+    size_t                             chunk_list_num_entries = 0;  /* Read in 'done' even on early failure */
+    size_t                             total_array_num_entries = 0; /* Stays 0 on ranks that gather nothing */
+    size_t                            *num_chunks_selected_array = NULL;
+    size_t                             i;
+#ifdef PARALLEL_COMPRESS_DEBUG
+    size_t                             j;
+#endif
+    int                                mpi_rank, mpi_size, mpi_code;
+    herr_t                             ret_value = SUCCEED;
+
+    FUNC_ENTER_STATIC
+
+    /* Obtain the current rank of the process and the number of processes */
+    if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
+        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
+    if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
+        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
+
+    /* Set the actual-chunk-opt-mode property. */
+    if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0)
+        HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property")
+
+    /* Set the actual-io-mode property.
+     * Link chunk I/O does not break to independent, so can set right away */
+    if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0)
+        HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property")
+
+    /* Construct chunked index info */
+    index_info.f = io_info->dset->oloc.file;
+    index_info.dxpl_id = io_info->md_dxpl_id;
+    index_info.pline = &(io_info->dset->shared->dcpl_cache.pline);
+    index_info.layout = &(io_info->dset->shared->layout.u.chunk);
+    index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk);
+
+    /* Build a list of selected chunks in the collective io operation */
+    /* XXX: Not sure about correct minor error code */
+    if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries, &num_chunks_selected_array) < 0)
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't construct filtered I/O info list")
+
+    /* If this process has any selection at all in the dataspace, create
+     * a MPI type for the I/O operation. Otherwise, the process contributes
+     * with a none type. */
+    /* XXX: Processes with no selection will have to be re-worked, as they
+     * still have to do the re-allocation in the file. Get rid of else case
+     * and instead change mpi_buf_count to 0 if they have no selection
+     */
+    /* NOTE(review): until that rework happens, ranks without a selection do
+     * not take part in the collective gather below; confirm that callers
+     * never mix ranks with and without a selection in one write. */
+    if (H5SL_count(fm->sel_chunks)) {
+#ifdef PARALLEL_COMPRESS_DEBUG
+        HDfprintf(debug_file, "Incoming messages from other processes:\n");
+        HDfprintf(debug_file, "-----------------------------------------\n");
+        for (j = 0; j < chunk_list_num_entries; j++) {
+            HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n",
+                    chunk_list[j].old_chunk.offset, chunk_list[j].num_writers);
+        }
+        HDfprintf(debug_file, "-----------------------------------------\n\n");
+#endif
+
+        if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
+
+        if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */
+#ifdef PARALLEL_COMPRESS_DEBUG
+            HDfprintf(debug_file, "Processing chunks:\n");
+            HDfprintf(debug_file, "---------------------------------------------------\n");
+#endif
+
+            /* Iterate through all the chunks in the collective write operation,
+             * updating each chunk with the data modifications from other processes,
+             * then re-filtering the chunk. */
+            for (i = 0; i < chunk_list_num_entries; i++) {
+                unsigned    filter_mask = 0;
+                hbool_t     full_overwrite = TRUE;
+                size_t      buf_size;       /* Allocated size of the chunk buffer */
+                size_t      nbytes;         /* Number of valid bytes in the chunk buffer */
+                hssize_t    iter_nelmts;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_list[i].old_chunk.offset);
+#endif
+
+                /* If this is a full overwrite of this chunk, enough memory must be allocated for
+                 * the size of the unfiltered chunk. Otherwise, enough memory must be allocated
+                 * to read the filtered chunk from the file. */
+                if (full_overwrite) {
+                    hssize_t extent_npoints;
+
+                    if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_list[i].chunk_info.fspace)) < 0)
+                        HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+                    buf_size = (size_t) extent_npoints * type_info->src_type_size;
+                } /* end if */
+                else
+                    buf_size = (size_t) chunk_list[i].old_chunk.length;
+
+                /* The filter pipeline updates the byte count through a size_t
+                 * pointer, so track it in a local size_t and copy it into the
+                 * chunk record rather than casting the record's length field. */
+                nbytes = buf_size;
+                chunk_list[i].new_chunk.length = nbytes;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size,
+                        full_overwrite ? "full" : "non-full");
+#endif
+
+                if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
+
+                /* Initialize iterator for memory selection */
+                if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+                mem_iter_init = TRUE;
+
+                /* Owner of this chunk, receive modification data from other processes */
+
+                if (!full_overwrite) {
+                    /* Read the chunk from the file */
+                    if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset,
+                            buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0)
+                        HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "unable to read raw data chunk")
+
+                    /* Unfilter the chunk before modifying it */
+                    if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
+                            io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+                            &nbytes, &buf_size, &chunk_list[i].buf) < 0)
+                        HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying")
+
+                    chunk_list[i].new_chunk.length = nbytes;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                    HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", nbytes, buf_size);
+
+                    HDfprintf(debug_file, "| - Read buf:\n| - [");
+                    for (j = 0; j < nbytes / type_info->src_type_size; j++) {
+                        if (j > 0) HDfprintf(debug_file, ", ");
+                        HDfprintf(debug_file, "%ld", ((long *) chunk_list[i].buf)[j]);
+                    }
+                    HDfprintf(debug_file, "]\n|\n");
+#endif
+                } /* end if */
+
+                /* Update the chunk data with the modifications from the current (owning) process */
+                if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+                if (!H5D__gather_mem(io_info->u.wbuf, chunk_list[i].chunk_info.mspace, mem_iter,
+                        (size_t) iter_nelmts, io_info->dxpl_cache, chunk_list[i].buf))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather from write buffer")
+
+                /* Update the chunk data with any modifications from other processes */
+
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "| - Write Buffer:\n");
+                HDfprintf(debug_file, "| - [");
+                for (j = 0; j < nbytes / type_info->src_type_size; j++) {
+                    if (j > 0) HDfprintf(debug_file, ", ");
+                    HDfprintf(debug_file, "%ld", ((long *) chunk_list[i].buf)[j]);
+                }
+                HDfprintf(debug_file, "]\n|\n");
+
+                HDfprintf(debug_file, "| - Write buf is at address %p.\n|\n", chunk_list[i].buf);
+
+                HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", nbytes, buf_size);
+#endif
+
+                /* Filter the chunk */
+                if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
+                        io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+                        &nbytes, &buf_size, &chunk_list[i].buf) < 0)
+                    HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "output pipeline failed")
+
+#if H5_SIZEOF_SIZE_T > 4
+                /* Check for the chunk expanding too much to encode in a 32-bit value */
+                if (nbytes > ((size_t) 0xffffffff))
+                    HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
+#endif
+
+                /* Record the filtered size of the chunk */
+                chunk_list[i].new_chunk.length = nbytes;
+
+                /* Release the iterator; clear the flag first so the cleanup
+                 * code never releases it a second time */
+                mem_iter_init = FALSE;
+                if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+            } /* end for */
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+            HDfprintf(debug_file, "---------------------------------------------------\n\n");
+#endif
+
+            /* Gather the new chunk sizes to all processes for a collective reallocation
+             * of the chunks in the file */
+            if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
+                    (void **) &total_array, &total_array_num_entries, NULL) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+            HDfprintf(debug_file, "Reallocing chunks:\n");
+            HDfprintf(debug_file, "------------------------------\n");
+#endif
+
+            /* Collectively re-allocate the modified chunks (from each process) in the file */
+            for (i = 0; i < total_array_num_entries; i++) {
+                hbool_t insert;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "| Reallocing chunk at address %a with new length of %zd.\n", total_array[i].new_chunk.offset, total_array[i].new_chunk.length);
+#endif
+
+                if (H5D__chunk_file_alloc(&index_info, &total_array[i].old_chunk, &total_array[i].new_chunk,
+                        &insert, total_array[i].chunk_info.scaled) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "| - Chunk now at address %a.\n|\n", total_array[i].new_chunk.offset);
+#endif
+            } /* end for */
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+            HDfprintf(debug_file, "------------------------------\n\n");
+#endif
+
+            /* XXX: During the collective re-allocation of chunks in the file, the record for each
+             * chunk is only updated in the total array, not in the local copy of chunks on each
+             * process. However, each process needs the updated chunk records so that they can create
+             * a MPI type for the collective write that will write to the chunk's new locations instead
+             * of the old ones. This ugly hack seems to be the best solution to copy the information
+             * back to the local array and avoid having to modify the collective write type function
+             * in an ugly way so that it will accept the total array instead of the local array.
+             * This works correctly because the array gather function guarantees that the chunk
+             * data in the total array is ordered in blocks by rank.
+             */
+            {
+                size_t first_entry = 0; /* Index of this rank's first record in the total array */
+
+                for (i = 0; i < (size_t) mpi_rank; i++)
+                    first_entry += num_chunks_selected_array[i];
+
+                HDmemcpy(chunk_list, &total_array[first_entry], num_chunks_selected_array[mpi_rank] * sizeof(*chunk_list));
+            }
+
+            /* Create single MPI type encompassing each selection in the dataspace */
+            if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries,
+                    &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type")
+
+            /* Override the write buffer to point to the address of the first
+             * chunk data buffer */
+            /* NOTE(review): this assumes the local chunk list is not empty
+             * whenever this rank has a selection -- confirm against
+             * H5D__construct_filtered_io_info_list(). */
+            io_info->u.wbuf = chunk_list[0].buf;
+        } /* end if */
+        else { /* Filtered collective read */
+            /* Nothing builds the MPI types for a read yet, so fail cleanly
+             * instead of handing uninitialized types to the I/O call below */
+            HGOTO_ERROR(H5E_DATASET, H5E_UNSUPPORTED, FAIL, "filtered collective reads are not implemented yet")
+        } /* end else */
+
+        /* We have a single, complicated MPI datatype for both memory & file */
+        mpi_buf_count = (hsize_t) 1;
+    }
+    else { /* No selection at all for this process, contribute none type */
+        size_t dataset_num_chunks;
+
+        /* Retrieve total # of chunks in dataset */
+        H5_CHECKED_ASSIGN(dataset_num_chunks, size_t, fm->layout->u.chunk.nchunks, hsize_t);
+
+        /* Allocate chunking information */
+        if (NULL == (total_chunk_addr_array = (haddr_t *) H5MM_malloc(sizeof(haddr_t) * dataset_num_chunks)))
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate total chunk address arraybuffer")
+
+        /* Retrieve chunk address map */
+        if (H5D__chunk_addrmap(io_info, total_chunk_addr_array) < 0)
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get chunk address")
+
+        /* Get chunk with lowest address */
+        /* NOTE(review): the base address found here is replaced with 0
+         * right after this branch, so only the assertion has any effect. */
+        ctg_store.contig.dset_addr = HADDR_MAX;
+        for (i = 0; i < dataset_num_chunks; i++)
+            if (total_chunk_addr_array[i] < ctg_store.contig.dset_addr)
+                ctg_store.contig.dset_addr = total_chunk_addr_array[i];
+        HDassert(ctg_store.contig.dset_addr != HADDR_MAX);
+
+        /* Set the MPI datatype */
+        file_type = MPI_BYTE;
+        mem_type = MPI_BYTE;
+
+        /* No chunks selected for this process */
+        mpi_buf_count = (hsize_t) 0;
+    } /* end else */
+
+    /* Set up the base storage address for this operation */
+    ctg_store.contig.dset_addr = 0;
+    io_info->store = &ctg_store;
+
+    /* Perform I/O */
+    if (H5D__final_collective_io(io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0)
+        HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
+
+    /* Collectively insert each chunk into the chunk index if this
+     * is a filtered collective write */
+    if (io_info->op_type == H5D_IO_OP_WRITE) {
+        H5D_chunk_ud_t udata;
+
+        /* Set up chunk information for insertion to chunk index */
+        udata.common.layout = index_info.layout;
+        udata.common.storage = index_info.storage;
+        udata.filter_mask = 0;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+        HDfprintf(debug_file, "Reinserting chunks into chunk index.\n");
+        HDfprintf(debug_file, "---------------------------------------\n");
+#endif
+
+        /* total_array_num_entries is 0 on ranks that gathered nothing, so
+         * this loop is a no-op there */
+        for (i = 0; i < total_array_num_entries; i++) {
+            udata.chunk_block = total_array[i].new_chunk;
+            udata.common.scaled = total_array[i].chunk_info.scaled;
+
+            if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+            HDfprintf(debug_file, "| Successfully inserted chunk at address %a into the chunk index.\n", udata.chunk_block.offset);
+#endif
+        } /* end for */
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+        HDfprintf(debug_file, "---------------------------------------\n");
+#endif
+    } /* end if */
+
+done:
+    /* Release the selection iterator if an error left it initialized */
+    if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+        HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+
+    /* Free resources used by a process which had no selection at all */
+    if (total_chunk_addr_array)
+        H5MM_free(total_chunk_addr_array);
+
+    /* Free resources used by a process which had some selection */
+    if (chunk_list) {
+        for (i = 0; i < chunk_list_num_entries; i++)
+            if (chunk_list[i].buf)
+                H5MM_free(chunk_list[i].buf);
+
+        H5MM_free(chunk_list);
+    }
+
+    if (num_chunks_selected_array)
+        H5MM_free(num_chunks_selected_array);
+    if (total_array)
+        H5MM_free(total_array);
+    if (mem_iter)
+        H5MM_free(mem_iter);
+
+    /* Free the MPI buf and file types, if they were derived */
+    if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
+        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+    if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
+        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+
+    FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__link_chunk_filtered_collective_io() */
+
+
+/*-------------------------------------------------------------------------
* Function: H5D__multi_chunk_collective_io
*
* Purpose: To do IO per chunk according to IO mode(collective/independent/none)
@@ -1308,6 +1861,543 @@ done:
/*-------------------------------------------------------------------------
+ * Function: H5D__multi_chunk_filtered_collective_io
+ *
+ * Purpose: To do filtered collective IO per chunk to save on memory,
+ * as opposed to collective IO of every chunk at once
+ *
+ * XXX: Add read operation description
+ *
+ * 1. Construct a list of selected chunks in the collective IO
+ * operation
+ * A. If any chunk is being written to by more than 1
+ * process, the process writing the most data to the
+ * chunk will take ownership of the chunk (ties are
+ * broken randomly)
+ * 2. Loop through each chunk in the operation
+ * A. If the operation is a write operation
+ * I. Determine if this is a full overwrite of the chunk
+ * a) If it is not, read the chunk from file and
+ * pass the chunk through the filter pipeline in
+ * reverse order (Unfilter the chunk)
+ * b) Else copy the owning process' modification
+ * data into a buffer of size large enough to
+ * completely overwrite the chunk
+ * II. Receive any modification data from other
+ * processes and update the chunk data with these
+ * modifications
+ * III. Filter the chunk
+ * IV. Contribute the chunk to an array gathered by
+ * all processes which contains every chunk
+ * modified in this iteration (up to one chunk
+ * per process, some processes may not have a
+ * selection/may have less chunks to work on than
+ * other processes)
+ * II. All processes collectively re-allocate each
+ * chunk from the gathered array with their new
+ * sizes after the filter operation
+ * IV. Proceed with the collective write operation
+ * V. All processes collectively re-insert each
+ * chunk from the gathered array into the chunk
+ * index
+ * B. If the operation is a read operation
+ * I.
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Jordan Henderson
+ * Friday, Dec. 2nd, 2016
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
+ H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist)
+{
+ H5D_filtered_collective_io_info_t *chunk_list = NULL;
+ H5D_filtered_collective_io_info_t *gathered_array = NULL;
+ H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_MULTI_CHUNK; /* actual chunk optimization mode */
+ H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_NO_COLLECTIVE; /* Local variable for tracking the I/O mode used. */
+ H5FD_mpio_collective_opt_t last_coll_opt_mode = H5FD_MPIO_COLLECTIVE_IO; /* Last parallel transfer with independent IO or collective IO with this mode */
+ H5FD_mpio_xfer_t last_xfer_mode = H5FD_MPIO_COLLECTIVE; /* Last parallel transfer for this request (H5D_XFER_IO_XFER_MODE_NAME) */
+ H5D_chk_idx_info_t index_info;
+ H5S_sel_iter_t *mem_iter = NULL;
+ H5D_storage_t store; /* union of EFL and chunk pointer in file space */
+ H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */
+ H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */
+ MPI_Datatype file_type;
+ MPI_Datatype mem_type;
+ uint8_t *chunk_io_option = NULL;
+ haddr_t *chunk_addr = NULL;
+ hbool_t file_type_is_derived = FALSE;
+ hbool_t mem_type_is_derived = FALSE;
+ hbool_t mem_iter_init = FALSE;
+ hsize_t mpi_buf_count;
+ size_t total_chunk; /* Total # of chunks in dataset */
+ size_t chunk_list_num_entries;
+ size_t gathered_array_num_entries;
+ size_t *num_chunks_selected_array = NULL;
+ size_t i, j; /* Local index variable */
+ int mpi_rank, mpi_size, mpi_code;
+ herr_t ret_value = SUCCEED;
+
+ FUNC_ENTER_STATIC
+
+ /* Obtain the current rank of the process and the number of processes */
+ if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
+ if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
+
+ /* Set the actual chunk opt mode property */
+ if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property")
+
+ /* Construct chunked index info */
+ index_info.f = io_info->dset->oloc.file;
+ index_info.dxpl_id = io_info->md_dxpl_id;
+ index_info.pline = &(io_info->dset->shared->dcpl_cache.pline);
+ index_info.layout = &(io_info->dset->shared->layout.u.chunk);
+ index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk);
+
+
+ /* Build a list of selected chunks in the collective IO operation */
+ if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries, &num_chunks_selected_array) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't construct filtered I/O info list")
+
+ /* Allocate memories */
+ /* chunk_io_option = (uint8_t *) H5MM_calloc(total_chunk);
+ chunk_addr = (haddr_t *) H5MM_calloc(total_chunk * sizeof(haddr_t)); */
+#ifdef H5D_DEBUG
+if(H5DEBUG(D))
+ HDfprintf(H5DEBUG(D), "total_chunk %Zu\n", total_chunk);
+#endif
+
+ /* Obtain IO option for each chunk */
+ /* if (H5D__obtain_mpio_mode(io_info, fm, dx_plist, chunk_io_option, chunk_addr) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "unable to obtain MPIO mode") */
+
+ /* Set up contiguous I/O info object */
+ HDmemcpy(&ctg_io_info, io_info, sizeof(ctg_io_info));
+ ctg_io_info.store = &ctg_store;
+ ctg_io_info.layout_ops = *H5D_LOPS_CONTIG;
+
+ /* Initialize temporary contiguous storage info */
+ ctg_store.contig.dset_size = (hsize_t) io_info->dset->shared->layout.u.chunk.size;
+
+ /* Set dataset storage for I/O info */
+ io_info->store = &store;
+
+ if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
+
+ /* Loop over all the chunks in the collective IO operation */
+ /* XXX: Multi-chunk needs to loop over all chunks and check for write/read inside
+ * loop, not the other way around */
+ if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */
+ H5D_chunk_ud_t udata;
+
+ /* Set up chunk information for insertion to chunk index */
+ udata.common.layout = index_info.layout;
+ udata.common.storage = index_info.storage;
+ udata.filter_mask = 0;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Incoming messages from other processes:\n");
+ HDfprintf(debug_file, "-----------------------------------------\n");
+ for (size_t k = 0; k < chunk_list_num_entries; k++) {
+ HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n",
+ chunk_list[k].old_chunk.offset, chunk_list[k].num_writers);
+ }
+ HDfprintf(debug_file, "-----------------------------------------\n\n");
+
+ HDfprintf(debug_file, "Processing chunks:\n");
+ HDfprintf(debug_file, "---------------------------------------------------\n");
+#endif
+
+ for (i = 0; i < chunk_list_num_entries; i++) {
+ unsigned filter_mask = 0;
+ hbool_t full_overwrite = TRUE;
+ size_t buf_size;
+ hssize_t iter_nelmts;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_list[i].old_chunk.offset);
+#endif
+
+ /* XXX: Determine if a chunk is being fully overwritten by looking at the total selection
+ * in the dataspace */
+
+ /* If this is a full overwrite of this chunk, enough memory must be allocated for
+ * the size of the unfiltered chunk. Otherwise, enough memory must be allocated
+ * to read the filtered chunk from the file. */
+ /* XXX: Return value of macro should be checked instead */
+ buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_list[i].chunk_info.fspace) * type_info->src_type_size
+ : chunk_list[i].old_chunk.length;
+ chunk_list[i].new_chunk.length = buf_size;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size,
+ full_overwrite ? "full" : "non-full");
+#endif
+
+ if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
+
+ /* Initialize iterator for memory selection */
+ if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+ mem_iter_init = TRUE;
+
+ /* Owner of this chunk, receive modification data from other processes */
+
+ if (!full_overwrite) {
+ /* Read the chunk from the file */
+ if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset,
+ buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read raw data chunk")
+
+ /* Unfilter the chunk before modifying it */
+ if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
+ io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+ (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
+ HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_list[i].new_chunk.length, buf_size);
+
+ HDfprintf(debug_file, "| - Read buf:\n| - [");
+ for (size_t k = 0; k < chunk_list[i].new_chunk.length / type_info->src_type_size; k++) {
+ if (k > 0) HDfprintf(debug_file, ", ");
+ HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[k]);
+ }
+ HDfprintf(debug_file, "]\n|\n");
+#endif
+ } /* end if */
+
+ /* Update the chunk data with the modifications from the current (owning) process */
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ if (!H5D__gather_mem(io_info->u.wbuf, chunk_list[i].chunk_info.mspace, mem_iter,
+ (size_t) iter_nelmts, io_info->dxpl_cache, chunk_list[i].buf))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather from write buffer")
+
+ /* Update the chunk data with any modifications from other processes */
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "| - Write Buffer:\n");
+ HDfprintf(debug_file, "| - [");
+ for (size_t k = 0; k < chunk_list[i].new_chunk.length / type_info->src_type_size; k++) {
+ if (k > 0) HDfprintf(debug_file, ", ");
+ HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[k]);
+ }
+ HDfprintf(debug_file, "]\n|\n");
+
+ HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_list[i].buf);
+
+ HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_list[i].new_chunk.length, buf_size);
+#endif
+
+ /* Filter the chunk */
+ if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
+ io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+ (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
+ HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
+
+#if H5_SIZEOF_SIZE_T > 4
+ /* Check for the chunk expanding too much to encode in a 32-bit value */
+ if (chunk_list[i].new_chunk.length > ((size_t) 0xffffffff))
+ HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
+#endif
+
+ /* Gather the new chunk sizes to all processes for a collective reallocation
+ * of the chunks in the file */
+ if (H5D__mpio_array_gather(io_info, &chunk_list[i], 1, sizeof(*chunk_list),
+ (void **) &gathered_array, &gathered_array_num_entries, NULL) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Reallocing chunks:\n");
+ HDfprintf(debug_file, "------------------------------\n");
+#endif
+
+ /* Collectively re-allocate the modified chunks (from each process) in the file */
+ for (j = 0; j < gathered_array_num_entries; j++) {
+ hbool_t insert = FALSE;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "| Re-allocing chunk at address %a with new length of %llu bytes.\n",
+ gathered_array[j].new_chunk.offset, gathered_array[j].new_chunk.length);
+#endif
+
+ /* Collectively re-allocate the chunk in the file */
+ if (H5D__chunk_file_alloc(&index_info, &gathered_array[j].old_chunk, &gathered_array[j].new_chunk,
+ &insert, chunk_list[j].chunk_info.scaled) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "| Chunk now at address %a.\n|\n", gathered_array[j].new_chunk);
+#endif
+ } /* end for */
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "------------------------------\n\n");
+#endif
+
+ /* Collect the new chunk info back to the local copy */
+ /* XXX: This may encounter a problem if there is a process with no selection */
+ HDmemcpy(&chunk_list[i].new_chunk, &gathered_array[mpi_rank].new_chunk, sizeof(chunk_list[i].new_chunk));
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "New chunk record after memcpy back to local:\n");
+ HDfprintf(debug_file, " - Chunk offset: %a, Chunk length: %lld\n", chunk_list[i].new_chunk.offset, chunk_list[i].new_chunk.length);
+#endif
+
+ {
+ int count;
+
+ H5_CHECKED_ASSIGN(count, int, chunk_list[i].new_chunk.length, hsize_t);
+
+ /* Create MPI memory type for writing to chunk */
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(count, MPI_BYTE, &mem_type)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code)
+ if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+ mem_type_is_derived = TRUE;
+
+ /* Create MPI file type for writing to chunk */
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(count, MPI_BYTE, &file_type)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code)
+ if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+ file_type_is_derived = TRUE;
+
+ mpi_buf_count = 1;
+ }
+
+
+ /* Set up the base storage address for this operation */
+ ctg_store.contig.dset_addr = chunk_list[i].new_chunk.offset;
+
+ /* Override the write buffer to point to the address of the
+ * chunk data buffer */
+ ctg_io_info.u.wbuf = chunk_list[i].buf;
+
+ /* Perform the I/O */
+ if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Reinserting chunks into chunk index.\n");
+ HDfprintf(debug_file, "---------------------------------------\n");
+#endif
+
+ /* Re-insert the modified chunks (from each process) into the chunk index */
+ for (j = 0; j < gathered_array_num_entries; j++) {
+ udata.chunk_block = gathered_array[j].new_chunk;
+ udata.common.scaled = gathered_array[j].chunk_info.scaled;
+
+ if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "| Successfully inserted chunk at address %a into the chunk index.\n", udata.chunk_block.offset);
+#endif
+ } /* end for */
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "---------------------------------------\n");
+#endif
+
+ /* Free the MPI memory and file type, if they were derived */
+ /* XXX: For performance, collect each type into an array and free at end */
+ if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+ if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+
+ mem_type_is_derived = FALSE;
+ file_type_is_derived = FALSE;
+
+ if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+
+ if (gathered_array)
+ gathered_array = (H5D_filtered_collective_io_info_t *) H5MM_free(gathered_array);
+ } /* end for */
+ } /* end if */
+ else { /* Filtered collective read */
+ unsigned filter_mask = 0;
+ size_t buf_size;
+
+ for (i = 0; i < chunk_list_num_entries; i++) {
+ buf_size = chunk_list[i].old_chunk.length;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Allocing %zd bytes for chunk read buffer.\n", buf_size);
+#endif
+
+ if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk read buffer")
+
+ /* Read the chunk from the file */
+ if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset,
+ buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read raw data chunk")
+
+ /* Unfilter the chunk before modifying it */
+ if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
+ io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+ (size_t *) &chunk_list[i].old_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
+ HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ for (size_t k = 0; k < chunk_list[i].old_chunk.length / type_info->src_type_size; k++)
+ HDfprintf(debug_file, "Read buf entry %d is %lld.\n", k, ((long *) chunk_list[i].buf)[k]);
+#endif
+
+ /* Scatter the unfiltered chunk data into the user's read buffer */
+
+ } /* end for */
+ } /* end else */
+
+#if 0
+ /* Loop over _all_ the chunks */
+ for (u = 0; u < total_chunk; u++) {
+ H5D_chunk_info_t *chunk_info; /* Chunk info for current chunk */
+ H5S_t *fspace; /* Dataspace describing chunk & selection in it */
+ H5S_t *mspace; /* Dataspace describing selection in memory corresponding to this chunk */
+ hbool_t insert = FALSE;
+
+ /* Initialize temporary contiguous storage address */
+ ctg_store.contig.dset_addr = chunk_addr[u];
+
+#ifdef H5D_DEBUG
+if(H5DEBUG(D))
+ HDfprintf(H5DEBUG(D),"mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u);
+#endif
+ /* Get the chunk info for this chunk, if there are elements selected */
+ chunk_info = &filtered_io_info_array[u].chunk_info;
+
+ /* Set the storage information for chunks with selections */
+ if (chunk_info) {
+ /* HDassert(chunk_info->index == u); */
+
+ /* Pass in chunk's coordinates in a union. */
+ store.chunk.scaled = chunk_info->scaled;
+ } /* end if */
+
+ /* Collective IO for this chunk,
+ * Note: even there is no selection for this process, the process still
+ * needs to contribute MPI NONE TYPE.
+ */
+ if (chunk_io_option[u] == H5D_CHUNK_IO_MODE_COL) {
+#ifdef H5D_DEBUG
+if(H5DEBUG(D))
+ HDfprintf(H5DEBUG(D),"inside collective chunk IO mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u);
+#endif
+
+ /* Set the file & memory dataspaces */
+ if (chunk_info) {
+ fspace = chunk_info->fspace;
+ mspace = chunk_info->mspace;
+
+ /* Update the local variable tracking the dxpl's actual io mode property.
+ *
+ * Note: H5D_MPIO_COLLECTIVE_MULTI | H5D_MPIO_INDEPENDENT = H5D_MPIO_MIXED
+ * to ease switching between to mixed I/O without checking the current
+ * value of the property. You can see the definition in H5Ppublic.h
+ */
+ actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_COLLECTIVE;
+
+ } /* end if */
+ else {
+ fspace = mspace = NULL;
+ } /* end else */
+
+ /* Switch back to collective I/O */
+ if (last_xfer_mode != H5FD_MPIO_COLLECTIVE) {
+ if (H5D__ioinfo_xfer_mode(io_info, dx_plist, H5FD_MPIO_COLLECTIVE) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't switch to collective I/O")
+ last_xfer_mode = H5FD_MPIO_COLLECTIVE;
+ } /* end if */
+ if (last_coll_opt_mode != H5FD_MPIO_COLLECTIVE_IO) {
+ if (H5D__ioinfo_coll_opt_mode(io_info, dx_plist, H5FD_MPIO_COLLECTIVE_IO) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't switch to collective I/O")
+ last_coll_opt_mode = H5FD_MPIO_COLLECTIVE_IO;
+ } /* end if */
+ } /* end if */
+ else { /* possible independent IO for this chunk */
+#ifdef H5D_DEBUG
+if(H5DEBUG(D))
+ HDfprintf(H5DEBUG(D),"inside independent IO mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u);
+#endif
+
+ HDassert(chunk_io_option[u] == 0);
+
+ /* Set the file & memory dataspaces */
+ if (chunk_info) {
+ fspace = chunk_info->fspace;
+ mspace = chunk_info->mspace;
+
+ /* Update the local variable tracking the dxpl's actual io mode. */
+ actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_INDEPENDENT;
+ } /* end if */
+ else {
+ fspace = mspace = NULL;
+ } /* end else */
+
+ /* Using independent I/O with file setview.*/
+ if (last_coll_opt_mode != H5FD_MPIO_INDIVIDUAL_IO) {
+ if (H5D__ioinfo_coll_opt_mode(io_info, dx_plist, H5FD_MPIO_INDIVIDUAL_IO) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't switch to individual I/O")
+ last_coll_opt_mode = H5FD_MPIO_INDIVIDUAL_IO;
+ } /* end if */
+ } /* end else */
+
+#ifdef H5D_DEBUG
+ if(H5DEBUG(D))
+ HDfprintf(H5DEBUG(D),"after inter collective IO\n");
+#endif
+ } /* end for */
+#endif
+
+ /* Write the local value of actual io mode to the DXPL. */
+ if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property")
+
+done:
+ if (chunk_io_option)
+ H5MM_free(chunk_io_option);
+ if (chunk_addr)
+ H5MM_free(chunk_addr);
+
+ if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+ if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+
+ if (chunk_list) {
+ for (i = 0; i < chunk_list_num_entries; i++)
+ if (chunk_list[i].buf)
+ H5MM_free(chunk_list[i].buf);
+
+ H5MM_free(chunk_list);
+ }
+
+ if (num_chunks_selected_array)
+ H5MM_free(num_chunks_selected_array);
+ if (gathered_array)
+ H5MM_free(gathered_array);
+ if (mem_iter)
+ H5MM_free(mem_iter);
+
+ FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__multi_chunk_filtered_collective_io() */
+
+
+/*-------------------------------------------------------------------------
* Function: H5D__inter_collective_io
*
* Purpose: Routine for the shared part of collective IO between multiple chunk
@@ -1486,6 +2576,36 @@ H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2)
/*-------------------------------------------------------------------------
+ * Function: H5D__cmp_filtered_collective_io_entry
+ *
+ * Purpose: Routine to compare filtered collective chunk io info
+ * entries
+ *
+ * Description: Callback for qsort() to compare filtered collective chunk
+ * io info entries
+ *
+ * Return: -1, 0, 1
+ *
+ * Programmer: Jordan Henderson
+ * Wednesday, Nov. 30th, 2016
+ *
+ *-------------------------------------------------------------------------
+ */
+static int
+H5D__cmp_filtered_collective_io_entry(const void *filtered_collective_io_entry1, const void *filtered_collective_io_entry2)
+{
+ haddr_t addr1, addr2;
+
+ FUNC_ENTER_STATIC_NOERR
+
+ addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_entry1)->new_chunk.offset;
+ addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_entry2)->new_chunk.offset;
+
+ FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2))
+} /* end H5D__cmp_filtered_collective_io_entry() */
+
+
+/*-------------------------------------------------------------------------
* Function: H5D__sort_chunk
*
* Purpose: Routine to sort chunks in increasing order of chunk address
@@ -1837,5 +2957,249 @@ done:
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__obtain_mpio_mode() */
+
+
+/*-------------------------------------------------------------------------
+ * Function:    H5D__construct_filtered_io_info_list
+ *
+ * XXX: Revise description
+ * Purpose:     Constructs a list of entries which contain the necessary
+ *              information for inter-process communication when performing
+ *              collective io on filtered chunks. This list is used by
+ *              every process in operations that must be collectively done
+ *              on every chunk, such as chunk re-allocation, insertion of
+ *              chunks into the chunk index, etc.
+ *
+ *              One entry is built for every chunk in fm->sel_chunks: the
+ *              chunk's current file block is looked up and stored as both
+ *              the "old" and the "new" chunk block, the chunk info is
+ *              copied by value and the data buffer pointer is set to NULL.
+ *              The number of chunks selected by each process is then
+ *              all-gathered, so this routine is collective over
+ *              io_info->comm.
+ *
+ * Parameters:  io_info    - I/O info for the operation
+ *              type_info  - datatype info (currently only checked for
+ *                           being non-NULL)
+ *              fm         - chunk map holding the selected chunks
+ *              chunk_list - OUT: newly allocated array of local entries
+ *              num_entries - OUT: number of entries in *chunk_list
+ *              _num_chunks_selected_array - OUT: newly allocated array,
+ *                           one element per process, of selected chunk
+ *                           counts
+ *
+ *              The output parameters are only written on success, in which
+ *              case the caller owns both arrays and must free them with
+ *              H5MM_free(). On failure everything allocated here is freed
+ *              before returning.
+ *
+ * Return:      Non-negative on success/Negative on failure
+ *
+ * Programmer:  Jordan Henderson
+ *              Tuesday, January 10th, 2017
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
+    const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries,
+    size_t **_num_chunks_selected_array)
+{
+    H5D_filtered_collective_io_info_t *local_info_array = NULL;   /* Entries for the chunks selected by this process */
+    H5SL_node_t *chunk_node;
+    hbool_t no_overlap = FALSE;
+    size_t i;
+    size_t num_chunks_selected;
+    size_t *num_chunks_selected_array = NULL;                     /* Selected chunk count of every process */
+    int mpi_rank, mpi_size, mpi_code;
+    herr_t ret_value = SUCCEED;
+
+    FUNC_ENTER_STATIC
+
+    HDassert(io_info);
+    HDassert(type_info);
+    HDassert(fm);
+    HDassert(chunk_list);
+    HDassert(num_entries);
+    HDassert(_num_chunks_selected_array);
+
+    if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
+        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
+    if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
+        HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
+
+    /* Get the no overlap property */
+
+
+    num_chunks_selected = H5SL_count(fm->sel_chunks);
+
+    if (!no_overlap) {
+        /* Redistribute chunks so that no more than 1 process is writing to a given chunk */
+    }
+
+    /* NOTE(review): if H5MM_malloc() returns NULL for a zero-byte request, a
+     * process with no selected chunks fails here and never reaches the
+     * collective MPI_Allgather below - confirm and special-case if needed */
+    if (NULL == (local_info_array = (H5D_filtered_collective_io_info_t *) H5MM_malloc(num_chunks_selected * sizeof(*local_info_array))))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer")
+
+    /* Build one entry per selected chunk, in skip list order */
+    chunk_node = H5SL_first(fm->sel_chunks);
+    for (i = 0; chunk_node; i++) {
+        H5D_chunk_info_t *chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node);
+        H5D_chunk_ud_t udata;
+
+        /* Obtain this chunk's address */
+        if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0)
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+        local_info_array[i].owner = mpi_rank;
+        local_info_array[i].num_writers = 0;
+#endif
+
+        /* Until the chunk is re-allocated, its new block equals its old block */
+        local_info_array[i].old_chunk = local_info_array[i].new_chunk = udata.chunk_block;
+        local_info_array[i].chunk_info = *chunk_info;
+        local_info_array[i].buf = NULL;
+
+        chunk_node = H5SL_next(chunk_node);
+    } /* end for */
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+    HDfprintf(debug_file, " Contents of local info array\n");
+    HDfprintf(debug_file, "------------------------------\n");
+    for (size_t j = 0; j < (size_t) num_chunks_selected; j++) {
+        HDfprintf(debug_file, "| Chunk Entry %zd:\n", j);
+        HDfprintf(debug_file, "| - Chunk Address: %a\n", local_info_array[j].old_chunk.offset);
+        HDfprintf(debug_file, "| - Chunk Length: %zd\n", local_info_array[j].old_chunk.length);
+        HDfprintf(debug_file, "| - Chunk Owner: %d\n", local_info_array[j].owner);
+        HDfprintf(debug_file, "| - Address of mspace: %x\n", local_info_array[j].chunk_info.mspace);
+        HDfprintf(debug_file, "| - Chunk Selection Type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.mspace));
+        HDfprintf(debug_file, "| - Chunk Num Elmts Sel.: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.mspace));
+    }
+    HDfprintf(debug_file, "------------------------------\n\n");
+#endif
+
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+    HDfprintf(debug_file, "Testing mem/file space addresses:\n");
+    HDfprintf(debug_file, "-----------------------------------\n");
+
+    for (size_t j = 0; j < num_chunks_selected; j++) {
+        HDfprintf(debug_file, "| Testing chunk at address %a.\n", local_info_array[j].old_chunk.offset);
+        HDfprintf(debug_file, "| Mem Space:\n");
+        HDfprintf(debug_file, "| - Extent Num elements: %zd\n", H5S_GET_EXTENT_NPOINTS(local_info_array[j].chunk_info.mspace));
+        HDfprintf(debug_file, "| - Extent type: %d\n", H5S_GET_EXTENT_TYPE(local_info_array[j].chunk_info.mspace));
+        HDfprintf(debug_file, "| - Selection Num elements: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.mspace));
+        HDfprintf(debug_file, "| - Selection type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.mspace));
+        HDfprintf(debug_file, "| File Space:\n");
+        HDfprintf(debug_file, "| - Extent Num elements: %zd\n", H5S_GET_EXTENT_NPOINTS(local_info_array[j].chunk_info.fspace));
+        HDfprintf(debug_file, "| - Extent type: %d\n", H5S_GET_EXTENT_TYPE(local_info_array[j].chunk_info.fspace));
+        HDfprintf(debug_file, "| - Selection Num elements: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.fspace));
+        HDfprintf(debug_file, "| - Selection type: %d\n|\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.fspace));
+    }
+
+    HDfprintf(debug_file, "-----------------------------------\n\n");
+#endif
+
+    /* Gather the number of chunks each process is writing to all processes */
+    if (NULL == (num_chunks_selected_array = (size_t *) H5MM_malloc((size_t) mpi_size * sizeof(*num_chunks_selected_array))))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")
+
+    /* The counts are size_t values, so they are exchanged as raw bytes of
+     * exactly sizeof(size_t) each. Describing them as MPI_UNSIGNED_LONG_LONG
+     * would read and write past the buffers wherever size_t is narrower
+     * than unsigned long long. */
+    if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&num_chunks_selected, (int) sizeof(num_chunks_selected), MPI_BYTE,
+            num_chunks_selected_array, (int) sizeof(*num_chunks_selected_array), MPI_BYTE, io_info->comm)))
+        HMPI_GOTO_ERROR(FAIL, "MPI_Allgather of num chunks selected array failed", mpi_code)
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+    HDfprintf(debug_file, " Num Chunks Selected Array\n");
+    HDfprintf(debug_file, "------------------------------------\n");
+    for (size_t j = 0; j < (size_t) mpi_size; j++) {
+        HDfprintf(debug_file, "| Process %d has %zd chunks selected.\n", j, num_chunks_selected_array[j]);
+    }
+    HDfprintf(debug_file, "------------------------------------\n\n");
+#endif
+
+    /* Success: hand ownership of both arrays to the caller */
+    *chunk_list = local_info_array;
+    *num_entries = num_chunks_selected;
+    *_num_chunks_selected_array = num_chunks_selected_array;
+
+done:
+    /* On failure the output parameters were never written, so the arrays
+     * allocated above are still owned by this routine and must be freed */
+    if (ret_value < 0) {
+        if (local_info_array)
+            H5MM_free(local_info_array);
+        if (num_chunks_selected_array)
+            H5MM_free(num_chunks_selected_array);
+    } /* end if */
+
+    FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__construct_filtered_io_info_list() */
+
+
+/*-------------------------------------------------------------------------
+ * Function:    H5D__mpio_filtered_collective_write_type
+ *
+ * Purpose:     Constructs a MPI derived datatype for both the memory and
+ *              the file for a collective write of filtered chunks. The
+ *              datatype contains the offsets in the file and the locations
+ *              of the filtered chunk data buffers
+ *
+ *              Both types are hindexed types of MPI_BYTE with one block
+ *              per chunk, each block being new_chunk.length bytes long.
+ *              The file type's displacements are the chunks' new_chunk
+ *              file offsets; the memory type's displacements are the
+ *              chunk buffer addresses relative to the buffer of the first
+ *              chunk after sorting.
+ *
+ *              XXX: Same type may be reusable for filtered collective read
+ *
+ * Note:        chunk_list is sorted in place by new_chunk.offset, so the
+ *              caller's array is reordered by this call.
+ *
+ *              When num_entries is 0 nothing is created and none of the
+ *              output parameters are written. Each "*_created" flag is
+ *              set to TRUE as soon as the corresponding type exists, even
+ *              if committing it subsequently fails, so the caller can
+ *              always tell which types it has to free.
+ *
+ * Return:      Non-negative on success/Negative on failure
+ *
+ * Programmer:  Jordan Henderson
+ *              Tuesday, November 22, 2016
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chunk_list,
+    size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_created,
+    MPI_Datatype *new_file_type, hbool_t *file_type_created)
+{
+    MPI_Aint *write_buf_array = NULL;   /* Relative displacements of filtered chunk data buffers */
+    MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */
+    int *length_array = NULL;           /* Filtered Chunk lengths */
+    herr_t ret_value = SUCCEED;
+
+    FUNC_ENTER_STATIC
+
+    HDassert(chunk_list);
+    HDassert(new_mem_type);
+    HDassert(mem_type_created);
+    HDassert(new_file_type);
+    HDassert(file_type_created);
+
+    if (num_entries > 0) {
+        size_t i;
+        int mpi_code;
+        void *base_buf;
+
+        /* The entry count is passed to MPI as an int further down */
+        H5_CHECK_OVERFLOW(num_entries, size_t, int);
+
+        /* Allocate arrays */
+        if (NULL == (length_array = (int *) H5MM_malloc((size_t) num_entries * sizeof(int))))
+            HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write length array")
+        if (NULL == (write_buf_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint))))
+            HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write buf displacement array")
+        if (NULL == (file_offset_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint))))
+            HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for collective write offset array")
+
+        /* Ensure the list is sorted in ascending order of offset in the file */
+        HDqsort(chunk_list, num_entries, sizeof(*chunk_list), H5D__cmp_filtered_collective_io_entry);
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+        HDfprintf(debug_file, "MPI Write type entries:\n");
+        HDfprintf(debug_file, "---------------------------------\n");
+#endif
+
+        /* Memory displacements are taken relative to the first chunk's buffer
+         * NOTE(review): the displacements are computed by casting pointers to
+         * MPI_Aint rather than with MPI_Get_address() - confirm this is
+         * acceptable on all supported platforms */
+        base_buf = chunk_list[0].buf;
+        for (i = 0; i < num_entries; i++) {
+            /* XXX: Revise description */
+            /* Set up array position */
+            file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset;
+
+            /* MPI block lengths are ints; make sure the chunk length fits
+             * instead of silently truncating it */
+            H5_CHECKED_ASSIGN(length_array[i], int, chunk_list[i].new_chunk.length, hsize_t);
+
+            write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+            HDfprintf(debug_file, "| Type Entry %zd:\n", i);
+            HDfprintf(debug_file, "| - Offset: %a; Length: %zd\n", file_offset_array[i], length_array[i]);
+            HDfprintf(debug_file, "| - Write buffer:\n| [");
+            for (size_t j = 0; j < (size_t) length_array[i]; j++) {
+                HDfprintf(debug_file, "%c, ", ((char *) chunk_list[i].buf)[j]);
+            }
+            HDfprintf(debug_file, "]\n|\n");
+#endif
+        } /* end for */
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+        HDfprintf(debug_file, "---------------------------------\n\n");
+#endif
+
+        /* Create memory MPI type */
+        if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, write_buf_array, MPI_BYTE, new_mem_type)))
+            HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
+        *mem_type_created = TRUE;
+        if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type)))
+            HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+
+        /* Create file MPI type */
+        if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, file_offset_array, MPI_BYTE, new_file_type)))
+            HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
+        *file_type_created = TRUE;
+        if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type)))
+            HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+    } /* end if */
+
+done:
+    /* The arrays are only needed while the types are being constructed;
+     * they are still NULL here when num_entries is 0 */
+    length_array = (int *) H5MM_free(length_array);
+    write_buf_array = (MPI_Aint *) H5MM_free(write_buf_array);
+    file_offset_array = (MPI_Aint *) H5MM_free(file_offset_array);
+
+    FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__mpio_filtered_collective_write_type() */
#endif /* H5_HAVE_PARALLEL */
diff --git a/src/H5Dpkg.h b/src/H5Dpkg.h
index f54a9f2..bd58d38 100644
--- a/src/H5Dpkg.h
+++ b/src/H5Dpkg.h
@@ -618,6 +618,9 @@ H5_DLL herr_t H5D__select_write(const H5D_io_info_t *io_info,
H5_DLL herr_t H5D__scatter_mem(const void *_tscat_buf,
const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts,
const H5D_dxpl_cache_t *dxpl_cache, void *_buf);
+H5_DLL size_t H5D__gather_mem(const void *_buf,
+ const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts,
+ const H5D_dxpl_cache_t *dxpl_cache, void *_tgath_buf/*out*/);
H5_DLL herr_t H5D__scatgath_read(const H5D_io_info_t *io_info,
const H5D_type_info_t *type_info,
hsize_t nelmts, const H5S_t *file_space, const H5S_t *mem_space);
@@ -667,6 +670,8 @@ H5_DLL herr_t H5D__chunk_lookup(const H5D_t *dset, hid_t dxpl_id,
const hsize_t *scaled, H5D_chunk_ud_t *udata);
H5_DLL herr_t H5D__chunk_allocated(H5D_t *dset, hid_t dxpl_id, hsize_t *nbytes);
H5_DLL herr_t H5D__chunk_allocate(const H5D_io_info_t *io_info, hbool_t full_overwrite, hsize_t old_dim[]);
+H5_DLL herr_t H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info, const H5F_block_t *old_chunk,
+ H5F_block_t *new_chunk, hbool_t *need_insert, hsize_t scaled[]);
H5_DLL herr_t H5D__chunk_update_old_edge_chunks(H5D_t *dset, hid_t dxpl_id,
hsize_t old_dim[]);
H5_DLL herr_t H5D__chunk_prune_by_extent(H5D_t *dset, hid_t dxpl_id,
diff --git a/src/H5Dscatgath.c b/src/H5Dscatgath.c
index 55111f0..929feb7 100644
--- a/src/H5Dscatgath.c
+++ b/src/H5Dscatgath.c
@@ -49,9 +49,6 @@ static herr_t H5D__scatter_file(const H5D_io_info_t *io_info,
static size_t H5D__gather_file(const H5D_io_info_t *io_info,
const H5S_t *file_space, H5S_sel_iter_t *file_iter, size_t nelmts,
void *buf);
-static size_t H5D__gather_mem(const void *_buf,
- const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts,
- const H5D_dxpl_cache_t *dxpl_cache, void *_tgath_buf/*out*/);
static herr_t H5D__compound_opt_read(size_t nelmts, const H5S_t *mem_space,
H5S_sel_iter_t *iter, const H5D_dxpl_cache_t *dxpl_cache,
const H5D_type_info_t *type_info, void *user_buf/*out*/);
@@ -305,6 +302,7 @@ H5D__scatter_mem (const void *_tscat_buf, const H5S_t *space,
HDassert(space);
HDassert(iter);
HDassert(nelmts > 0);
+ HDassert(dxpl_cache);
HDassert(buf);
/* Allocate the vector I/O arrays */
@@ -366,7 +364,7 @@ done:
*
*-------------------------------------------------------------------------
*/
-static size_t
+size_t
H5D__gather_mem(const void *_buf, const H5S_t *space,
H5S_sel_iter_t *iter, size_t nelmts, const H5D_dxpl_cache_t *dxpl_cache,
void *_tgath_buf/*out*/)
@@ -389,6 +387,7 @@ H5D__gather_mem(const void *_buf, const H5S_t *space,
HDassert(space);
HDassert(iter);
HDassert(nelmts > 0);
+ HDassert(dxpl_cache);
HDassert(tgath_buf);
/* Allocate the vector I/O arrays */