summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJordan Henderson <jhenderson@hdfgroup.org>2017-09-05 13:38:47 (GMT)
committerJordan Henderson <jhenderson@hdfgroup.org>2017-09-05 13:38:47 (GMT)
commitbf570b1a7ca3b9cbd4a59f0933a19ce1bcc99103 (patch)
tree5b00d8e9f10a03c42f2c994563cbc86fb53c7a24
parentabcf30f232914c1ec9b8c5f2a3bb3a621af7bfe4 (diff)
parentb95986e50d1d1043952b00cad6595e8ecbff414f (diff)
downloadhdf5-bf570b1a7ca3b9cbd4a59f0933a19ce1bcc99103.zip
hdf5-bf570b1a7ca3b9cbd4a59f0933a19ce1bcc99103.tar.gz
hdf5-bf570b1a7ca3b9cbd4a59f0933a19ce1bcc99103.tar.bz2
Merge pull request #592 in HDFFV/hdf5 from ~JHENDERSON/hdf5:develop to develop
* commit 'b95986e50d1d1043952b00cad6595e8ecbff414f': (71 commits) Amend tests to explicitly use H5Dcreate2 and H5Dopen2 Revert malloc's back to using hard-coded type for sizeof Minor comment refactoring Update documentation Finish up Parallel Compression test scaling work Partial update for scaling parallel filters tests Modify t_dset.c in lieu of Parallel Compression changes Updated H5C__flush_single_entry() in H5C.c to correct duplicate metadata write bug observed in 1.10.1. Amend MANIFEST Add test for write parallel; read serial case Fix uninitialized array issue in test Test updates Fix bug where incorrect amount of data was being read from the file Add data verification for first half of tests Start adding data verification Switch tests over to use testing macros Updates to parallel filters tests Move test files to testpar directory Add test file to build process Suggested changes from code review ...
-rw-r--r--MANIFEST2
-rw-r--r--src/H5C.c26
-rw-r--r--src/H5Dchunk.c5
-rw-r--r--src/H5Dint.c4
-rw-r--r--src/H5Dio.c68
-rw-r--r--src/H5Dmpio.c1665
-rw-r--r--src/H5Dpkg.h8
-rw-r--r--src/H5Dscatgath.c7
-rw-r--r--src/H5Ppublic.h2
-rw-r--r--src/H5err.txt2
-rw-r--r--src/H5trace.c4
-rw-r--r--testpar/Makefile.am2
-rw-r--r--testpar/t_dset.c7
-rw-r--r--testpar/t_filters_parallel.c2475
-rw-r--r--testpar/t_filters_parallel.h212
15 files changed, 4401 insertions, 88 deletions
diff --git a/MANIFEST b/MANIFEST
index 934ffaa..18a3eb5 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -1236,6 +1236,8 @@
./testpar/t_file.c
./testpar/t_file_image.c
./testpar/t_filter_read.c
+./testpar/t_filters_parallel.c
+./testpar/t_filters_parallel.h
./testpar/t_mdset.c
./testpar/t_mpi.c
./testpar/t_ph5basic.c
diff --git a/src/H5C.c b/src/H5C.c
index e6770ec..371fc90 100644
--- a/src/H5C.c
+++ b/src/H5C.c
@@ -6273,17 +6273,27 @@ H5C__flush_single_entry(H5F_t *f, hid_t dxpl_id, H5C_cache_entry_t *entry_ptr,
HGOTO_ERROR(H5E_CACHE, H5E_CANTINSERT, FAIL, "unable to insert skip list item")
} /* end if */
else
+ {
#endif /* H5_HAVE_PARALLEL */
- if(entry_ptr->prefetched) {
- HDassert(entry_ptr->type->id == H5AC_PREFETCHED_ENTRY_ID);
- mem_type = cache_ptr->class_table_ptr[entry_ptr->prefetch_type_id]->mem_type;
- } /* end if */
- else
- mem_type = entry_ptr->type->mem_type;
+ if(entry_ptr->prefetched) {
+ HDassert(entry_ptr->type->id == H5AC_PREFETCHED_ENTRY_ID);
+ mem_type = cache_ptr->
+ class_table_ptr[entry_ptr->prefetch_type_id]->
+ mem_type;
+ } /* end if */
+ else
+ mem_type = entry_ptr->type->mem_type;
- if(H5F_block_write(f, mem_type, entry_ptr->addr, entry_ptr->size, dxpl_id, entry_ptr->image_ptr) < 0)
- HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "Can't write image to file")
+ if(H5F_block_write(f, mem_type, entry_ptr->addr,
+ entry_ptr->size, dxpl_id,
+ entry_ptr->image_ptr) < 0)
+
+ HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, \
+ "Can't write image to file")
+#ifdef H5_HAVE_PARALLEL
+ }
+#endif /* H5_HAVE_PARALLEL */
} /* end if */
/* if the entry has a notify callback, notify it that we have
diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c
index b7b8b03..af6599a 100644
--- a/src/H5Dchunk.c
+++ b/src/H5Dchunk.c
@@ -299,9 +299,6 @@ static herr_t H5D__chunk_unlock(const H5D_io_info_t *io_info,
static herr_t H5D__chunk_cache_prune(const H5D_t *dset, hid_t dxpl_id,
const H5D_dxpl_cache_t *dxpl_cache, size_t size);
static herr_t H5D__chunk_prune_fill(H5D_chunk_it_ud1_t *udata, hbool_t new_unfilt_chunk);
-static herr_t H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info,
- const H5F_block_t *old_chunk, H5F_block_t *new_chunk, hbool_t *need_insert,
- hsize_t scaled[]);
#ifdef H5_HAVE_PARALLEL
static herr_t H5D__chunk_collective_fill(const H5D_t *dset, hid_t dxpl_id,
H5D_chunk_coll_info_t *chunk_info, size_t chunk_size, const void *fill_buf);
@@ -6558,7 +6555,7 @@ done:
*
*-------------------------------------------------------------------------
*/
-static herr_t
+herr_t
H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info, const H5F_block_t *old_chunk,
H5F_block_t *new_chunk, hbool_t *need_insert, hsize_t scaled[])
{
diff --git a/src/H5Dint.c b/src/H5Dint.c
index 3b938e2..bdedd1e 100644
--- a/src/H5Dint.c
+++ b/src/H5Dint.c
@@ -1213,10 +1213,6 @@ H5D__create(H5F_t *file, hid_t type_id, const H5S_t *space, hid_t dcpl_id,
/* Don't allow compact datasets to allocate space later */
if(layout->type == H5D_COMPACT && fill->alloc_time != H5D_ALLOC_TIME_EARLY)
HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, NULL, "compact dataset must have early space allocation")
-
- /* If MPI VFD is used, no filter support yet. */
- if(H5F_HAS_FEATURE(file, H5FD_FEAT_HAS_MPI) && pline->nused > 0)
- HGOTO_ERROR(H5E_DATASET, H5E_UNSUPPORTED, NULL, "Parallel I/O does not support filters yet")
} /* end if */
/* Set the latest version of the layout, pline & fill messages, if requested */
diff --git a/src/H5Dio.c b/src/H5Dio.c
index 1766422..104a632 100644
--- a/src/H5Dio.c
+++ b/src/H5Dio.c
@@ -714,11 +714,6 @@ H5D__write(H5D_t *dataset, hid_t mem_type_id, const H5S_t *mem_space,
if(H5T_get_class(type_info.mem_type, TRUE) == H5T_REFERENCE &&
H5T_get_ref_type(type_info.mem_type) == H5R_DATASET_REGION)
HGOTO_ERROR(H5E_DATASET, H5E_UNSUPPORTED, FAIL, "Parallel IO does not support writing region reference datatypes yet")
-
- /* Can't write to chunked datasets with filters, in parallel */
- if(dataset->shared->layout.type == H5D_CHUNKED &&
- dataset->shared->dcpl_cache.pline.nused > 0)
- HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "cannot write to chunked storage with filters in parallel")
} /* end if */
else {
/* Collective access is not permissible without a MPI based VFD */
@@ -1195,7 +1190,7 @@ H5D__ioinfo_adjust(H5D_io_info_t *io_info, const H5D_t *dset, hid_t dxpl_id,
HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "can't retrieve MPI communicator")
/* Check if we can set direct MPI-IO read/write functions */
- if((opt = H5D__mpio_opt_possible(io_info, file_space, mem_space, type_info, fm, dx_plist)) < 0)
+ if((opt = H5D__mpio_opt_possible(io_info, file_space, mem_space, type_info, dx_plist)) < 0)
HGOTO_ERROR(H5E_DATASPACE, H5E_BADRANGE, FAIL, "invalid check for direct IO dataspace ")
/* Check if we can use the optimized parallel I/O routines */
@@ -1207,6 +1202,67 @@ H5D__ioinfo_adjust(H5D_io_info_t *io_info, const H5D_t *dset, hid_t dxpl_id,
io_info->io_ops.single_write = H5D__mpio_select_write;
} /* end if */
else {
+ /* Check if there are any filters in the pipeline. If there are,
+ * we cannot break to independent I/O if this is a write operation;
+ * otherwise there will be metadata inconsistencies in the file.
+ */
+ if (io_info->op_type == H5D_IO_OP_WRITE && io_info->dset->shared->dcpl_cache.pline.nused > 0) {
+ H5D_mpio_no_collective_cause_t cause;
+ uint32_t local_no_collective_cause;
+ uint32_t global_no_collective_cause;
+ hbool_t local_error_message_previously_written = FALSE;
+ hbool_t global_error_message_previously_written = FALSE;
+ size_t index;
+ char local_no_collective_cause_string[256] = "";
+ char global_no_collective_cause_string[256] = "";
+ const char *cause_strings[] = { "independent I/O was requested",
+ "datatype conversions were required",
+ "data transforms needed to be applied",
+ "optimized MPI types flag wasn't set",
+ "one of the dataspaces was neither simple nor scalar",
+ "dataset was not contiguous or chunked" };
+
+ if (H5P_get(dx_plist, H5D_MPIO_LOCAL_NO_COLLECTIVE_CAUSE_NAME, &local_no_collective_cause) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "unable to get local no collective cause value")
+ if (H5P_get(dx_plist, H5D_MPIO_GLOBAL_NO_COLLECTIVE_CAUSE_NAME, &global_no_collective_cause) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "unable to get global no collective cause value")
+
+ /* Append each of the "reason for breaking collective I/O" error messages to the
+ * local and global no collective cause strings */
+ for (cause = 1, index = 0; cause < H5D_MPIO_NO_COLLECTIVE_MAX_CAUSE; cause <<= 1, index++) {
+ size_t cause_strlen = strlen(cause_strings[index]);
+
+ if (cause & local_no_collective_cause) {
+ /* Check if there were any previous error messages included. If so, prepend a semicolon
+ * to separate the messages.
+ */
+ if (local_error_message_previously_written) strncat(local_no_collective_cause_string, "; ", 2);
+
+ strncat(local_no_collective_cause_string, cause_strings[index], cause_strlen);
+
+ local_error_message_previously_written = TRUE;
+ } /* end if */
+
+ if (cause & global_no_collective_cause) {
+ /* Check if there were any previous error messages included. If so, prepend a semicolon
+ * to separate the messages.
+ */
+ if (global_error_message_previously_written) strncat(global_no_collective_cause_string, "; ", 2);
+
+ strncat(global_no_collective_cause_string, cause_strings[index], cause_strlen);
+
+ global_error_message_previously_written = TRUE;
+ } /* end if */
+ } /* end for */
+
+ HGOTO_ERROR(H5E_IO, H5E_NO_INDEPENDENT, FAIL, "Can't perform independent write with filters in pipeline.\n"
+ " The following caused a break from collective I/O:\n"
+ " Local causes: %s\n"
+ " Global causes: %s",
+ local_no_collective_cause_string,
+ global_no_collective_cause_string);
+ } /* end if */
+
/* If we won't be doing collective I/O, but the user asked for
* collective I/O, change the request to use independent I/O, but
* mark it so that we remember to revert the change.
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 0389c72..79572c0 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -84,7 +84,6 @@
#define H5D_CHUNK_SELECT_IRREG 2
#define H5D_CHUNK_SELECT_NONE 0
-
/******************/
/* Local Typedefs */
/******************/
@@ -94,6 +93,113 @@ typedef struct H5D_chunk_addr_info_t {
H5D_chunk_info_t chunk_info;
} H5D_chunk_addr_info_t;
+/*
+ * Information about a single chunk when performing collective filtered I/O. All
+ * of the fields of one of these structs are initialized at the start of collective
+ * filtered I/O in the function H5D__construct_filtered_io_info_list().
+ *
+ * This struct's fields are as follows:
+ *
+ * index - The "Index" of the chunk in the dataset. The index of a chunk is used during
+ * the collective re-insertion of chunks into the chunk index after the collective
+ * I/O has been performed.
+ *
+ * scaled - The scaled coordinates of the chunk in the dataset's file dataspace. The
+ * coordinates are used in both the collective re-allocation of space in the file
+ * and the collective re-insertion of chunks into the chunk index after the collective
+ * I/O has been performed.
+ *
+ * full_overwrite - A flag which determines whether or not a chunk needs to be read from the
+ * file when being updated. If a chunk is being fully overwritten (the entire
+ * extent is selected in its file dataspace), then it is not necessary to
+ * read the chunk from the file. However, if the chunk is not being fully
+ * overwritten, it has to be read from the file in order to update the chunk
+ * without trashing the parts of the chunk that are not selected.
+ *
+ * num_writers - The total number of processors writing to this chunk. This field is used
+ * when the new owner of a chunk is receiving messages, which contain selections in
+ * the chunk and data to update the chunk with, from other processors which have this
+ * chunk selected in the I/O operation. The new owner must know how many processors it
+ * should expect messages from so that it can post an equal number of receive calls.
+ *
+ * io_size - The total size of I/O to this chunk. This field is an accumulation of the size of
+ * I/O to the chunk from each processor which has the chunk selected and is used to
+ * determine the value for the previous full_overwrite flag.
+ *
+ * buf - A pointer which serves the dual purpose of holding either the chunk data which is to be
+ * written to the file or the chunk data which has been read from the file.
+ *
+ * chunk_states - In the case of dataset writes only, this struct is used to track a chunk's size and
+ * address in the file before and after the filtering operation has occurred.
+ *
+ * Its fields are as follows:
+ *
+ * chunk_current - The address in the file and size of this chunk before the filtering
+ * operation. When reading a chunk from the file, this field is used to
+ * read the correct amount of bytes. It is also used when redistributing
+ * shared chunks among processors and as a parameter to the chunk file
+ * space reallocation function.
+ *
+ * new_chunk - The address in the file and size of this chunk after the filtering
+ * operation. This field is relevant when collectively re-allocating space
+ * in the file for all of the chunks written to in the I/O operation, as
+ * their sizes may have changed after their data has been filtered.
+ *
+ * owners - In the case of dataset writes only, this struct is used to manage which single processor
+ * will ultimately write data out to the chunk. It allows the other processors to act according
+ * to the decision and send their selection in the chunk, as well as the data they wish
+ * to update the chunk with, to the processor which is writing to the chunk.
+ *
+ * Its fields are as follows:
+ *
+ * original_owner - The processor which originally had this chunk selected at the beginning of
+ * the collective filtered I/O operation. This field is currently used when
+ * redistributing shared chunks among processors.
+ *
+ * new_owner - The processor which has been selected to perform the write to this chunk.
+ *
+ * async_info - In the case of dataset writes only, this struct is used by the owning processor of the
+ * chunk in order to manage the MPI send and receive calls made between it and all of
+ * the other processors which have this chunk selected in the I/O operation.
+ *
+ * Its fields are as follows:
+ *
+ * receive_requests_array - An array containing one MPI_Request for each of the
+ * asynchronous MPI receive calls the owning processor of this
+ * chunk makes to another processor in order to receive that
+ * processor's chunk modification data and selection in the chunk.
+ *
+ * receive_buffer_array - An array of buffers into which the owning processor of this chunk
+ * will store chunk modification data and the selection in the chunk
+ * received from another processor.
+ *
+ * num_receive_requests - The number of entries in the receive_request_array and
+ * receive_buffer_array fields.
+ */
+typedef struct H5D_filtered_collective_io_info_t {
+ hsize_t index;
+ hsize_t scaled[H5O_LAYOUT_NDIMS];
+ hbool_t full_overwrite;
+ size_t num_writers;
+ size_t io_size;
+ void *buf;
+
+ struct {
+ H5F_block_t chunk_current;
+ H5F_block_t new_chunk;
+ } chunk_states;
+
+ struct {
+ int original_owner;
+ int new_owner;
+ } owners;
+
+ struct {
+ MPI_Request *receive_requests_array;
+ unsigned char **receive_buffer_array;
+ int num_receive_requests;
+ } async_info;
+} H5D_filtered_collective_io_info_t;
/********************/
/* Local Prototypes */
@@ -103,9 +209,15 @@ static herr_t H5D__chunk_collective_io(H5D_io_info_t *io_info,
static herr_t H5D__multi_chunk_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, H5D_chunk_map_t *fm,
H5P_genplist_t *dx_plist);
+static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info,
+ const H5D_type_info_t *type_info, H5D_chunk_map_t *fm,
+ H5P_genplist_t *dx_plist);
static herr_t H5D__link_chunk_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, int sum_chunk,
H5P_genplist_t *dx_plist);
+static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info,
+ const H5D_type_info_t *type_info, H5D_chunk_map_t *fm,
+ H5P_genplist_t *dx_plist);
static herr_t H5D__inter_collective_io(H5D_io_info_t *io_info,
const H5D_type_info_t *type_info, const H5S_t *file_space,
const H5S_t *mem_space);
@@ -124,6 +236,26 @@ static herr_t H5D__mpio_get_min_chunk(const H5D_io_info_t *io_info,
const H5D_chunk_map_t *fm, int *min_chunkf);
static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info,
const H5D_chunk_map_t *fm, int *sum_chunkf);
+static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info,
+ const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
+ H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries);
+static herr_t H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info,
+ const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
+ H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries);
+static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
+ size_t array_entry_size, void **gathered_array, size_t *gathered_array_num_entries,
+ int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *));
+static herr_t H5D__mpio_filtered_collective_write_type(
+ H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries,
+ MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
+ MPI_Datatype *new_file_type, hbool_t *file_type_derived);
+static herr_t H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry,
+ const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm);
+static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2);
+static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1,
+ const void *filtered_collective_io_info_entry2);
+static int H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective_io_info_entry1,
+ const void *filtered_collective_io_info_entry2);
/*********************/
@@ -142,7 +274,7 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info,
* Purpose: Checks if an direct I/O transfer is possible between memory and
* the file.
*
- * Return: Sauccess: Non-negative: TRUE or FALSE
+ * Return: Success: Non-negative: TRUE or FALSE
* Failure: Negative
*
* Programmer: Quincey Koziol
@@ -152,12 +284,11 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info,
*/
htri_t
H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space,
- const H5S_t *mem_space, const H5D_type_info_t *type_info,
- const H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist)
+ const H5S_t *mem_space, const H5D_type_info_t *type_info, H5P_genplist_t *dx_plist)
{
int local_cause = 0; /* Local reason(s) for breaking collective mode */
int global_cause = 0; /* Global reason(s) for breaking collective mode */
- htri_t ret_value; /* Return value */
+ htri_t ret_value = SUCCEED; /* Return value */
FUNC_ENTER_PACKAGE
@@ -206,11 +337,6 @@ H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space,
* use collective IO will defer until each chunk IO is reached.
*/
- /* Don't allow collective operations if filters need to be applied */
- if(io_info->dset->shared->layout.type == H5D_CHUNKED &&
- io_info->dset->shared->dcpl_cache.pline.nused > 0)
- local_cause |= H5D_MPIO_FILTERS;
-
/* Check for independent I/O */
if(local_cause & H5D_MPIO_SET_INDEPENDENT)
global_cause = local_cause;
@@ -300,6 +426,113 @@ done:
/*-------------------------------------------------------------------------
+ * Function: H5D__mpio_array_gatherv
+ *
+ * Purpose: Given an array, specified in local_array, by each processor
+ * calling this function, gathers each array into a single
+ * array which is then either gathered to the processor
+ * specified by root, when allgather is false, or is
+ * distributed back to all processors when allgather is true.
+ *
+ * The size of each entry and number of entries in the array
+ * contributed by an individual processor should be specified
+ * in array_entry_size and local_array_num_entries,
+ * respectively.
+ *
+ * The number of processors participating in the gather
+ * operation should be specified for nprocs.
+ *
+ * The MPI communicator to use should be specified for comm.
+ *
+ * If the sort_func argument is supplied, the array is sorted
+ * before the function returns.
+ *
+ * Note: if allgather is specified as true, root is ignored.
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Jordan Henderson
+ * Sunday, April 9th, 2017
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
+ size_t array_entry_size, void **_gathered_array, size_t *_gathered_array_num_entries,
+ int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *))
+{
+ size_t gathered_array_num_entries = 0; /* The size of the newly-constructed array */
+ size_t i;
+ void *gathered_array = NULL; /* The newly-constructed array returned to the caller */
+ int *receive_counts_array = NULL; /* Array containing number of entries each process is contributing */
+ int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */
+ int mpi_code;
+ int sendcount;
+ herr_t ret_value = SUCCEED;
+
+ FUNC_ENTER_STATIC
+
+ HDassert(_gathered_array);
+ HDassert(_gathered_array_num_entries);
+
+ /* Determine the size of the end result array */
+ if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
+
+ /* If 0 entries resulted from the collective operation, no one is writing anything */
+ if (gathered_array_num_entries > 0) {
+ if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array")
+
+ if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(int))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array")
+
+ if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(int))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array")
+
+ /* Inform each process of how many entries each other process is contributing to the resulting array */
+ if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
+
+ /* Multiply each receive count by the size of the array entry, since the data is sent as bytes */
+ for (i = 0; i < (size_t) nprocs; i++)
+ H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t);
+
+ /* Set receive buffer offsets for MPI_Allgatherv */
+ displacements_array[0] = 0;
+ for (i = 1; i < (size_t) nprocs; i++)
+ displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1];
+
+ H5_CHECKED_ASSIGN(sendcount, int, local_array_num_entries * array_entry_size, size_t);
+
+ if (allgather) {
+ if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE,
+ gathered_array, receive_counts_array, displacements_array, MPI_BYTE, comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
+ } /* end if */
+ else {
+ if (MPI_SUCCESS != (mpi_code = MPI_Gatherv(local_array, sendcount, MPI_BYTE,
+ gathered_array, receive_counts_array, displacements_array, MPI_BYTE, root, comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code)
+ } /* end else */
+
+ if (sort_func) HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func);
+ } /* end if */
+
+ *_gathered_array = gathered_array;
+ *_gathered_array_num_entries = gathered_array_num_entries;
+
+done:
+ if (receive_counts_array)
+ H5MM_free(receive_counts_array);
+ if (displacements_array)
+ H5MM_free(displacements_array);
+
+ FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__mpio_array_gatherv() */
+
+
+/*-------------------------------------------------------------------------
* Function: H5D__ioinfo_xfer_mode
*
* Purpose: Switch to between collective & independent MPI I/O
@@ -398,7 +631,7 @@ H5D__mpio_get_min_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm,
FUNC_ENTER_STATIC
/* Get the number of chunks to perform I/O on */
- num_chunkf = H5SL_count(fm->sel_chunks);
+ H5_CHECKED_ASSIGN(num_chunkf, int, H5SL_count(fm->sel_chunks), size_t)
/* Determine the minimum # of chunks for all processes */
if(MPI_SUCCESS != (mpi_code = MPI_Allreduce(&num_chunkf, min_chunkf, 1, MPI_INT, MPI_MIN, io_info->comm)))
@@ -480,7 +713,7 @@ H5D__contig_collective_read(H5D_io_info_t *io_info, const H5D_type_info_t *type_
HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish shared collective MPI-IO")
/* Obtain the data transfer properties */
- if(NULL == (dx_plist = H5I_object(io_info->raw_dxpl_id)))
+ if(NULL == (dx_plist = (H5P_genplist_t *)H5I_object(io_info->raw_dxpl_id)))
HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list")
/* Set the actual I/O mode property. internal_collective_io will not break to
@@ -527,7 +760,7 @@ H5D__contig_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't finish shared collective MPI-IO")
/* Obtain the data transfer properties */
- if(NULL == (dx_plist = H5I_object(io_info->raw_dxpl_id)))
+ if(NULL == (dx_plist = (H5P_genplist_t *)H5I_object(io_info->raw_dxpl_id)))
HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list")
/* Set the actual I/O mode property. internal_collective_io will not break to
@@ -599,12 +832,13 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
HDassert(fm);
/* Obtain the data transfer properties */
- if(NULL == (dx_plist = H5I_object(io_info->raw_dxpl_id)))
+ if(NULL == (dx_plist = (H5P_genplist_t *)H5I_object(io_info->raw_dxpl_id)))
HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a file access property list")
- /* Check the optional property list on what to do with collective chunk IO. */
+ /* Check the optional property list for the collective chunk IO optimization option */
if(H5P_get(dx_plist, H5D_XFER_MPIO_CHUNK_OPT_HARD_NAME, &chunk_opt_mode) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get chunk optimization option")
+
if(H5FD_MPIO_CHUNK_ONE_IO == chunk_opt_mode)
io_option = H5D_ONE_LINK_CHUNK_IO; /*no opt*/
/* direct request to multi-chunk-io */
@@ -620,13 +854,13 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
if((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
- /* Get the chunk optimization option */
+ /* Get the chunk optimization option threshold */
if(H5P_get(dx_plist, H5D_XFER_MPIO_CHUNK_OPT_NUM_NAME, &one_link_chunk_io_threshold) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get chunk optimization option")
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get chunk optimization option threshold value")
/* step 1: choose an IO option */
/* If the average number of chunk per process is greater than a threshold, we will do one link chunked IO. */
- if((unsigned)sum_chunk / mpi_size >= one_link_chunk_io_threshold)
+ if((unsigned)sum_chunk / (unsigned)mpi_size >= one_link_chunk_io_threshold)
io_option = H5D_ONE_LINK_CHUNK_IO_MORE_OPT;
#ifdef H5_HAVE_INSTRUMENTED_LIBRARY
else
@@ -681,19 +915,46 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
#endif
/* step 2: Go ahead to do IO.*/
- if(H5D_ONE_LINK_CHUNK_IO == io_option || H5D_ONE_LINK_CHUNK_IO_MORE_OPT == io_option) {
- if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO")
- } /* end if */
- /* direct request to multi-chunk-io */
- else if(H5D_MULTI_CHUNK_IO == io_option) {
- if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO")
- } /* end if */
- else { /* multiple chunk IO via threshold */
- if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO")
- } /* end else */
+ switch (io_option) {
+ case H5D_ONE_LINK_CHUNK_IO:
+ case H5D_ONE_LINK_CHUNK_IO_MORE_OPT:
+ /* Check if there are any filters in the pipeline */
+ if(io_info->dset->shared->dcpl_cache.pline.nused > 0) {
+ /* For now, Multi-chunk IO must be forced for parallel filtered read,
+ * so that data can be unfiltered as it is received. There is significant
+ * complexity in unfiltering the data when it is read all at once into a
+ * single buffer.
+ */
+ if (io_info->op_type == H5D_IO_OP_READ) {
+ if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO")
+ } /* end if */
+ else {
+ if(H5D__link_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish filtered linked chunk MPI-IO")
+ } /* end else */
+ } /* end if */
+ else {
+ /* Perform unfiltered link chunk collective IO */
+ if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO")
+ } /* end else */
+ break;
+
+ case H5D_MULTI_CHUNK_IO: /* direct request to do multi-chunk IO */
+ default: /* multiple chunk IO via threshold */
+ /* Check if there are any filters in the pipeline */
+ if(io_info->dset->shared->dcpl_cache.pline.nused > 0) {
+ if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO")
+ } /* end if */
+ else {
+ /* Perform unfiltered multi chunk collective IO */
+ if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO")
+ } /* end else */
+ break;
+ } /* end switch */
done:
FUNC_LEAVE_NOAPI(ret_value)
@@ -1093,6 +1354,232 @@ if(H5DEBUG(D))
/*-------------------------------------------------------------------------
+ * Function: H5D__link_chunk_filtered_collective_io
+ *
+ * Purpose: Routine for one collective IO with one MPI derived datatype
+ * to link with all filtered chunks
+ *
+ * 1. Construct a list of selected chunks in the collective IO
+ * operation
+ * A. If any chunk is being written to by more than 1
+ * process, the process writing to the chunk which
+ * currently has the least amount of chunks assigned
+ * to it becomes the new owner (in the case of ties,
+ * the lowest MPI rank becomes the new owner)
+ * 2. If the operation is a write operation
+ * A. Loop through each chunk in the operation
+ * I. If this is not a full overwrite of the chunk
+ * a) Read the chunk from file and pass the chunk
+ * through the filter pipeline in reverse order
+ * (Unfilter the chunk)
+ * II. Update the chunk data with the modifications from
+ * the owning process
+ * III. Receive any modification data from other
+ * processes and update the chunk data with these
+ * modifications
+ * IV. Filter the chunk
+ * B. Contribute the modified chunks to an array gathered
+ * by all processes which contains the new sizes of
+ * every chunk modified in the collective IO operation
+ * C. All processes collectively re-allocate each chunk
+ * from the gathered array with their new sizes after
+ * the filter operation
+ * D. If this process has any chunks selected in the IO
+ * operation, create an MPI derived type for memory and
+ * file to write out the process' selected chunks to the
+ * file
+ * E. Perform the collective write
+ * F. All processes collectively re-insert each modified
+ * chunk from the gathered array into the chunk index
+ *
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Jordan Henderson
+ * Friday, Nov. 4th, 2016
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
+ H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist)
+{
+ H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */
+ H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; /* The list of chunks used during collective operations */
+ H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK; /* The actual chunk IO optimization mode */
+ H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* The chunk IO mode used (Independent vs Collective) */
+ H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */
+ MPI_Datatype mem_type = MPI_BYTE;
+ MPI_Datatype file_type = MPI_BYTE;
+ hbool_t mem_type_is_derived = FALSE;
+ hbool_t file_type_is_derived = FALSE;
+ size_t chunk_list_num_entries;
+ size_t collective_chunk_list_num_entries;
+ size_t *num_chunks_selected_array = NULL; /* Array of number of chunks selected on each process */
+ size_t i; /* Local index variable */
+ int mpi_rank, mpi_size, mpi_code;
+ herr_t ret_value = SUCCEED;
+
+ FUNC_ENTER_STATIC
+
+ HDassert(io_info);
+ HDassert(type_info);
+ HDassert(fm);
+ HDassert(dx_plist);
+
+ /* Obtain the current rank of the process and the number of processes */
+ if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
+ if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
+
+ /* Set the actual-chunk-opt-mode property. */
+ if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property")
+
+ /* Set the actual-io-mode property.
+ * Link chunk filtered I/O does not break to independent, so can set right away
+ */
+ if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property")
+
+ /* Build a list of selected chunks in the collective io operation */
+ if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list")
+
+ if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */
+ H5D_chk_idx_info_t index_info;
+ H5D_chunk_ud_t udata;
+ hsize_t mpi_buf_count;
+
+ /* Construct chunked index info */
+ index_info.f = io_info->dset->oloc.file;
+ index_info.dxpl_id = io_info->md_dxpl_id;
+ index_info.pline = &(io_info->dset->shared->dcpl_cache.pline);
+ index_info.layout = &(io_info->dset->shared->layout.u.chunk);
+ index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk);
+
+ /* Set up chunk information for insertion to chunk index */
+ udata.common.layout = index_info.layout;
+ udata.common.storage = index_info.storage;
+ udata.filter_mask = 0;
+
+ /* Iterate through all the chunks in the collective write operation,
+ * updating each chunk with the data modifications from other processes,
+ * then re-filtering the chunk.
+ */
+ for (i = 0; i < chunk_list_num_entries; i++)
+ if (mpi_rank == chunk_list[i].owners.new_owner)
+ if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry")
+
+ /* Gather the new chunk sizes to all processes for a collective reallocation
+ * of the chunks in the file.
+ */
+ if (H5D__mpio_array_gatherv(chunk_list, chunk_list_num_entries, sizeof(H5D_filtered_collective_io_info_t),
+ (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size,
+ true, 0, io_info->comm, NULL) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")
+
+ /* Collectively re-allocate the modified chunks (from each process) in the file */
+ for (i = 0; i < collective_chunk_list_num_entries; i++) {
+ hbool_t insert;
+
+ if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[i].chunk_states.chunk_current,
+ &collective_chunk_list[i].chunk_states.new_chunk, &insert, collective_chunk_list[i].scaled) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
+ } /* end for */
+
+ if (NULL == (num_chunks_selected_array = (size_t *) H5MM_malloc((size_t) mpi_size * sizeof(size_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&chunk_list_num_entries, 1, MPI_UNSIGNED_LONG_LONG, num_chunks_selected_array,
+ 1, MPI_UNSIGNED_LONG_LONG, io_info->comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
+
+ /* If this process has any chunks selected, create a MPI type for collectively
+ * writing out the chunks to file. Otherwise, the process contributes to the
+ * collective write with a none type.
+ */
+ if (chunk_list_num_entries) {
+ size_t offset;
+
+ /* During the collective re-allocation of chunks in the file, the record for each
+ * chunk is only updated in the collective array, not in the local copy of chunks on each
+ * process. However, each process needs the updated chunk records so that they can create
+ * a MPI type for the collective write that will write to the chunk's possible new locations
+ * in the file instead of the old ones. This ugly hack seems to be the best solution to
+ * copy the information back to the local array and avoid having to modify the collective
+ * write type function in an ugly way so that it will accept the collective array instead
+ * of the local array. This works correctly because the array gather function guarantees
+ * that the chunk data in the collective array is ordered in blocks by rank.
+ */
+ for (i = 0, offset = 0; i < (size_t) mpi_rank; i++)
+ offset += num_chunks_selected_array[i];
+
+ HDmemcpy(chunk_list, &collective_chunk_list[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t));
+
+ /* Create single MPI type encompassing each selection in the dataspace */
+ if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries,
+ &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_BADTYPE, FAIL, "couldn't create MPI link chunk I/O type")
+
+ /* Override the write buffer to point to the address of the first
+ * chunk data buffer
+ */
+ io_info->u.wbuf = chunk_list[0].buf;
+ } /* end if */
+
+ /* We have a single, complicated MPI datatype for both memory & file */
+ mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? (hsize_t) 1 : (hsize_t) 0;
+
+ /* Set up the base storage address for this operation */
+ ctg_store.contig.dset_addr = 0; /* Write address must be set to address 0 */
+ io_info->store = &ctg_store;
+
+ /* Perform I/O */
+ if (H5D__final_collective_io(io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
+
+ /* Participate in the collective re-insertion of all chunks modified
+ * in this iteration into the chunk index
+ */
+ for (i = 0; i < collective_chunk_list_num_entries; i++) {
+ udata.chunk_block = collective_chunk_list[i].chunk_states.new_chunk;
+ udata.common.scaled = collective_chunk_list[i].scaled;
+ udata.chunk_idx = collective_chunk_list[i].index;
+
+ if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
+ } /* end for */
+ } /* end if */
+
+done:
+ /* Free resources used by a process which had some selection */
+ if (chunk_list) {
+ for (i = 0; i < chunk_list_num_entries; i++)
+ if (chunk_list[i].buf)
+ H5MM_free(chunk_list[i].buf);
+
+ H5MM_free(chunk_list);
+ } /* end if */
+
+ if (num_chunks_selected_array)
+ H5MM_free(num_chunks_selected_array);
+ if (collective_chunk_list)
+ H5MM_free(collective_chunk_list);
+
+ /* Free the MPI buf and file types, if they were derived */
+ if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+ if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+
+ FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__link_chunk_filtered_collective_io() */
+
+
+/*-------------------------------------------------------------------------
* Function: H5D__multi_chunk_collective_io
*
* Purpose: To do IO per chunk according to IO mode(collective/independent/none)
@@ -1225,7 +1712,7 @@ if(H5DEBUG(D))
* to ease switching between to mixed I/O without checking the current
* value of the property. You can see the definition in H5Ppublic.h
*/
- actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_COLLECTIVE;
+ actual_io_mode = (H5D_mpio_actual_io_mode_t) (actual_io_mode | H5D_MPIO_CHUNK_COLLECTIVE);
} /* end if */
else {
@@ -1265,7 +1752,7 @@ if(H5DEBUG(D))
mspace = chunk_info->mspace;
/* Update the local variable tracking the dxpl's actual io mode. */
- actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_INDEPENDENT;
+ actual_io_mode = (H5D_mpio_actual_io_mode_t) (actual_io_mode | H5D_MPIO_CHUNK_INDEPENDENT);
} /* end if */
else {
fspace = mspace = NULL;
@@ -1306,6 +1793,314 @@ done:
/*-------------------------------------------------------------------------
+ * Function: H5D__multi_chunk_filtered_collective_io
+ *
+ * Purpose: To do filtered collective IO iteratively to save on memory.
+ * While link_chunk_filtered_collective_io will construct and
+ * work on a list of all of the chunks selected in the IO
+ * operation at once, this function works iteratively on a set
+ * of chunks at a time; at most one chunk per rank per
+ * iteration.
+ *
+ * 1. Construct a list of selected chunks in the collective IO
+ * operation
+ * A. If any chunk is being written to by more than 1
+ * process, the process writing to the chunk which
+ * currently has the least amount of chunks assigned
+ * to it becomes the new owner (in the case of ties,
+ * the lowest MPI rank becomes the new owner)
+ * 2. If the operation is a read operation
+ * A. Loop through each chunk in the operation
+ * I. Read the chunk from the file
+ * II. Unfilter the chunk
+ * III. Scatter the read chunk data to the user's buffer
+ * 3. If the operation is a write operation
+ * A. Loop through each chunk in the operation
+ * I. If this is not a full overwrite of the chunk
+ * a) Read the chunk from file and pass the chunk
+ * through the filter pipeline in reverse order
+ * (Unfilter the chunk)
+ * II. Update the chunk data with the modifications from
+ * the owning process
+ * III. Receive any modification data from other
+ * processes and update the chunk data with these
+ * modifications
+ * IV. Filter the chunk
+ * V. Contribute the chunk to an array gathered by
+ * all processes which contains every chunk
+ * modified in this iteration (up to one chunk
+ * per process, some processes may not have a
+ * selection/may have less chunks to work on than
+ * other processes)
+ * VI. All processes collectively re-allocate each
+ * chunk from the gathered array with their new
+ * sizes after the filter operation
+ * VII. Proceed with the collective write operation
+ * for the chunks modified on this iteration
+ * VIII. All processes collectively re-insert each
+ * chunk from the gathered array into the chunk
+ * index
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Jordan Henderson
+ * Friday, Dec. 2nd, 2016
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
+ H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist)
+{
+ H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */
+ H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; /* The list of chunks used during collective operations */
+ H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_MULTI_CHUNK; /* The actual chunk IO optimization mode */
+ H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* The chunk IO mode used (Independent vs Collective) */
+ H5D_storage_t store; /* union of EFL and chunk pointer in file space */
+ H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */
+ H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */
+ MPI_Datatype *file_type_array = NULL;
+ MPI_Datatype *mem_type_array = NULL;
+ hbool_t *file_type_is_derived_array = NULL;
+ hbool_t *mem_type_is_derived_array = NULL;
+ hbool_t *has_chunk_selected_array = NULL; /* Array of whether or not each process is contributing a chunk to each iteration */
+ size_t chunk_list_num_entries;
+ size_t collective_chunk_list_num_entries;
+ size_t i, j; /* Local index variable */
+ int mpi_rank, mpi_size, mpi_code;
+ herr_t ret_value = SUCCEED;
+
+ FUNC_ENTER_STATIC
+
+ HDassert(io_info);
+ HDassert(type_info);
+ HDassert(fm);
+ HDassert(dx_plist);
+
+ /* Obtain the current rank of the process and the number of processes */
+ if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
+ if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
+
+ /* Set the actual chunk opt mode property */
+ if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property")
+
+ /* Set the actual_io_mode property.
+ * Multi chunk I/O does not break to independent, so can set right away
+ */
+ if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk io mode property")
+
+ /* Build a list of selected chunks in the collective IO operation */
+ if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list")
+
+ /* Set up contiguous I/O info object */
+ HDmemcpy(&ctg_io_info, io_info, sizeof(ctg_io_info));
+ ctg_io_info.store = &ctg_store;
+ ctg_io_info.layout_ops = *H5D_LOPS_CONTIG;
+
+ /* Initialize temporary contiguous storage info */
+ ctg_store.contig.dset_size = (hsize_t) io_info->dset->shared->layout.u.chunk.size;
+ ctg_store.contig.dset_addr = 0;
+
+ /* Set dataset storage for I/O info */
+ io_info->store = &store;
+
+ if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */
+ for (i = 0; i < chunk_list_num_entries; i++)
+ if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't process chunk entry")
+ } /* end if */
+ else { /* Filtered collective write */
+ H5D_chk_idx_info_t index_info;
+ H5D_chunk_ud_t udata;
+ size_t max_num_chunks;
+ hsize_t mpi_buf_count;
+
+ /* Construct chunked index info */
+ index_info.f = io_info->dset->oloc.file;
+ index_info.dxpl_id = io_info->md_dxpl_id;
+ index_info.pline = &(io_info->dset->shared->dcpl_cache.pline);
+ index_info.layout = &(io_info->dset->shared->layout.u.chunk);
+ index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk);
+
+ /* Set up chunk information for insertion to chunk index */
+ udata.common.layout = index_info.layout;
+ udata.common.storage = index_info.storage;
+ udata.filter_mask = 0;
+
+ /* Retrieve the maximum number of chunks being written among all processes */
+ if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list_num_entries, &max_num_chunks,
+ 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
+
+ /* If no one is writing anything at all, end the operation */
+ if (!(max_num_chunks > 0)) HGOTO_DONE(SUCCEED);
+
+ /* Allocate arrays for storing MPI file and mem types and whether or not the
+ * types were derived.
+ */
+ if (NULL == (file_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(MPI_Datatype))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type array")
+
+ if (NULL == (file_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(hbool_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type is derived array")
+
+ if (NULL == (mem_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(MPI_Datatype))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type array")
+
+ if (NULL == (mem_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(hbool_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type is derived array")
+
+ /* Iterate over the max number of chunks among all processes, as this process could
+ * have no chunks left to work on, but it still needs to participate in the collective
+ * re-allocation and re-insertion of chunks modified by other processes.
+ */
+ for (i = 0; i < max_num_chunks; i++) {
+ /* Check if this process has a chunk to work on for this iteration */
+ hbool_t have_chunk_to_process = (i < chunk_list_num_entries) && (mpi_rank == chunk_list[i].owners.new_owner);
+
+ if (have_chunk_to_process)
+ if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry")
+
+ /* Gather the new chunk sizes to all processes for a collective re-allocation
+ * of the chunks in the file
+ */
+ if (H5D__mpio_array_gatherv(&chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(H5D_filtered_collective_io_info_t),
+ (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size,
+ true, 0, io_info->comm, NULL) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")
+
+ /* Participate in the collective re-allocation of all chunks modified
+ * in this iteration.
+ */
+ for (j = 0; j < collective_chunk_list_num_entries; j++) {
+ hbool_t insert = FALSE;
+
+ if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].chunk_states.chunk_current,
+ &collective_chunk_list[j].chunk_states.new_chunk, &insert, chunk_list[j].scaled) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
+ } /* end for */
+
+ if (NULL == (has_chunk_selected_array = (hbool_t *) H5MM_malloc((size_t) mpi_size * sizeof(hbool_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&have_chunk_to_process, 1, MPI_C_BOOL, has_chunk_selected_array,
+ 1, MPI_C_BOOL, io_info->comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
+
+ /* If this process has a chunk to work on, create a MPI type for the
+ * memory and file for writing out the chunk
+ */
+ if (have_chunk_to_process) {
+ size_t offset;
+ int mpi_type_count;
+
+ for (j = 0, offset = 0; j < (size_t) mpi_rank; j++)
+ offset += has_chunk_selected_array[j];
+
+ /* Collect the new chunk info back to the local copy, since only the record in the
+ * collective array gets updated by the chunk re-allocation */
+ HDmemcpy(&chunk_list[i].chunk_states.new_chunk, &collective_chunk_list[offset].chunk_states.new_chunk, sizeof(chunk_list[i].chunk_states.new_chunk));
+
+ H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].chunk_states.new_chunk.length, hsize_t);
+
+ /* Create MPI memory type for writing to chunk */
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type_array[i])))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code)
+ if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type_array[i])))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+ mem_type_is_derived_array[i] = TRUE;
+
+ /* Create MPI file type for writing to chunk */
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &file_type_array[i])))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code)
+ if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type_array[i])))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+ file_type_is_derived_array[i] = TRUE;
+
+ mpi_buf_count = 1;
+
+ /* Set up the base storage address for this operation */
+ ctg_store.contig.dset_addr = chunk_list[i].chunk_states.new_chunk.offset;
+
+ /* Override the write buffer to point to the address of the
+ * chunk data buffer
+ */
+ ctg_io_info.u.wbuf = chunk_list[i].buf;
+ } /* end if */
+ else {
+ mem_type_array[i] = file_type_array[i] = MPI_BYTE;
+ mpi_buf_count = 0;
+ } /* end else */
+
+ /* Perform the I/O */
+ if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type_array[i], &mem_type_array[i]) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
+
+ /* Participate in the collective re-insertion of all chunks modified
+ * in this iteration into the chunk index
+ */
+ for (j = 0; j < collective_chunk_list_num_entries; j++) {
+ udata.chunk_block = collective_chunk_list[j].chunk_states.new_chunk;
+ udata.common.scaled = collective_chunk_list[j].scaled;
+ udata.chunk_idx = collective_chunk_list[j].index;
+
+ if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
+ } /* end for */
+
+ if (collective_chunk_list){
+ H5MM_free(collective_chunk_list);
+ collective_chunk_list = NULL;
+ } /* end if */
+ if (has_chunk_selected_array){
+ H5MM_free(has_chunk_selected_array);
+ has_chunk_selected_array = NULL;
+ } /* end if */
+ } /* end for */
+
+ /* Free the MPI file and memory types, if they were derived */
+ for (i = 0; i < max_num_chunks; i++) {
+ if (file_type_is_derived_array[i])
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type_array[i])))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+
+ if (mem_type_is_derived_array[i])
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type_array[i])))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+ } /* end for */
+ } /* end else */
+
+done:
+ if (chunk_list) {
+ for (i = 0; i < chunk_list_num_entries; i++)
+ if (chunk_list[i].buf)
+ H5MM_free(chunk_list[i].buf);
+
+ H5MM_free(chunk_list);
+ } /* end if */
+
+ if (collective_chunk_list)
+ H5MM_free(collective_chunk_list);
+ if (file_type_array)
+ H5MM_free(file_type_array);
+ if (mem_type_array)
+ H5MM_free(mem_type_array);
+ if (file_type_is_derived_array)
+ H5MM_free(file_type_is_derived_array);
+ if (mem_type_is_derived_array)
+ H5MM_free(mem_type_is_derived_array);
+
+ FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__multi_chunk_filtered_collective_io() */
+
+
+/*-------------------------------------------------------------------------
* Function: H5D__inter_collective_io
*
* Purpose: Routine for the shared part of collective IO between multiple chunk
@@ -1472,7 +2267,7 @@ if(H5DEBUG(D))
static int
H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2)
{
- haddr_t addr1, addr2;
+ haddr_t addr1 = HADDR_UNDEF, addr2 = HADDR_UNDEF;
FUNC_ENTER_STATIC_NOERR
@@ -1484,6 +2279,67 @@ H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2)
/*-------------------------------------------------------------------------
+ * Function: H5D__cmp_filtered_collective_io_info_entry
+ *
+ * Purpose: Routine to compare filtered collective chunk io info
+ * entries
+ *
+ * Description: Callback for qsort() to compare filtered collective chunk
+ * io info entries
+ *
+ * Return: -1, 0, 1
+ *
+ * Programmer: Jordan Henderson
+ * Wednesday, Nov. 30th, 2016
+ *
+ *-------------------------------------------------------------------------
+ */
+static int
+H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2)
+{
+ haddr_t addr1 = HADDR_UNDEF, addr2 = HADDR_UNDEF;
+
+ FUNC_ENTER_STATIC_NOERR
+
+ addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->chunk_states.new_chunk.offset;
+ addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->chunk_states.new_chunk.offset;
+
+ FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2))
+} /* end H5D__cmp_filtered_collective_io_info_entry() */
+
+
+/*-------------------------------------------------------------------------
+ * Function: H5D__cmp_filtered_collective_io_info_entry_owner
+ *
+ * Purpose: Routine to compare filtered collective chunk io info
+ * entries's original owner fields
+ *
+ * Description: Callback for qsort() to compare filtered collective chunk
+ * io info entries's original owner fields
+ *
+ * Return: The difference between the two
+ * H5D_filtered_collective_io_info_t's original owner fields
+ *
+ * Programmer: Jordan Henderson
+ * Monday, Apr. 10th, 2017
+ *
+ *-------------------------------------------------------------------------
+ */
+static int
+H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2)
+{
+ int owner1 = -1, owner2 = -1;
+
+ FUNC_ENTER_STATIC_NOERR
+
+ owner1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->owners.original_owner;
+ owner2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->owners.original_owner;
+
+ FUNC_LEAVE_NOAPI(owner1 - owner2)
+} /* end H5D__cmp_filtered_collective_io_info_entry_owner() */
+
+
+/*-------------------------------------------------------------------------
* Function: H5D__sort_chunk
*
* Purpose: Routine to sort chunks in increasing order of chunk address
@@ -1557,7 +2413,7 @@ if(H5DEBUG(D))
HDfprintf(H5DEBUG(D), "Coming inside H5D_OBTAIN_ALL_CHUNK_ADDR_COL\n");
#endif
/* Allocate array for chunk addresses */
- if(NULL == (total_chunk_addr_array = H5MM_malloc(sizeof(haddr_t) * (size_t)fm->layout->u.chunk.nchunks)))
+ if(NULL == (total_chunk_addr_array = (haddr_t *)H5MM_malloc(sizeof(haddr_t) * (size_t)fm->layout->u.chunk.nchunks)))
HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, FAIL, "unable to allocate memory chunk address array")
/* Retrieve all the chunk addresses with process 0 */
@@ -1581,7 +2437,7 @@ if(H5DEBUG(D))
/* Iterate over all chunks for this process */
while(chunk_node) {
- if(NULL == (chunk_info = H5SL_item(chunk_node)))
+ if(NULL == (chunk_info = (H5D_chunk_info_t *)H5SL_item(chunk_node)))
HGOTO_ERROR(H5E_STORAGE, H5E_CANTGET, FAIL,"couldn't get chunk info from skipped list")
if(many_chunk_opt == H5D_OBTAIN_ONE_CHUNK_ADDR_IND) {
@@ -1666,7 +2522,7 @@ static herr_t
H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm,
H5P_genplist_t *dx_plist, uint8_t assign_io_mode[], haddr_t chunk_addr[])
{
- int total_chunks;
+ size_t total_chunks;
unsigned percent_nproc_per_chunk, threshold_nproc_per_chunk;
uint8_t* io_mode_info = NULL;
uint8_t* recv_io_mode_info = NULL;
@@ -1676,7 +2532,8 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm,
H5D_chunk_info_t* chunk_info;
int mpi_size, mpi_rank;
MPI_Comm comm;
- int ic, root;
+ int root;
+ size_t ic;
int mpi_code;
#ifdef H5_HAVE_INSTRUMENTED_LIBRARY
int new_value;
@@ -1697,7 +2554,7 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm,
HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
/* Setup parameters */
- H5_CHECKED_ASSIGN(total_chunks, int, fm->layout->u.chunk.nchunks, hsize_t);
+ H5_CHECKED_ASSIGN(total_chunks, size_t, fm->layout->u.chunk.nchunks, hsize_t);
if(H5P_get(dx_plist, H5D_XFER_MPIO_CHUNK_OPT_RATIO_NAME, &percent_nproc_per_chunk) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get percent nproc per chunk")
/* if ratio is 0, perform collective io */
@@ -1709,39 +2566,42 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm,
HGOTO_DONE(SUCCEED)
} /* end if */
- threshold_nproc_per_chunk = mpi_size * percent_nproc_per_chunk/100;
+
+ threshold_nproc_per_chunk = (unsigned)mpi_size * percent_nproc_per_chunk/100;
/* Allocate memory */
if(NULL == (io_mode_info = (uint8_t *)H5MM_calloc(total_chunks)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate I/O mode info buffer")
- if(NULL == (mergebuf = H5MM_malloc((sizeof(haddr_t) + 1) * total_chunks)))
+ if(NULL == (mergebuf = (uint8_t *)H5MM_malloc((sizeof(haddr_t) + 1) * total_chunks)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mergebuf buffer")
tempbuf = mergebuf + total_chunks;
if(mpi_rank == root)
- if(NULL == (recv_io_mode_info = (uint8_t *)H5MM_malloc(total_chunks * mpi_size)))
+ if(NULL == (recv_io_mode_info = (uint8_t *)H5MM_malloc(total_chunks * (size_t)mpi_size)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate recv I/O mode info buffer")
/* Obtain the regularity and selection information for all chunks in this process. */
chunk_node = H5SL_first(fm->sel_chunks);
while(chunk_node) {
- chunk_info = H5SL_item(chunk_node);
+ chunk_info = (H5D_chunk_info_t *)H5SL_item(chunk_node);
- io_mode_info[chunk_info->index] = H5D_CHUNK_SELECT_REG; /* this chunk is selected and is "regular" */
+ io_mode_info[chunk_info->index] = H5D_CHUNK_SELECT_REG; /* this chunk is selected and is "regular" */
chunk_node = H5SL_next(chunk_node);
} /* end while */
/* Gather all the information */
- if(MPI_SUCCESS != (mpi_code = MPI_Gather(io_mode_info, total_chunks, MPI_BYTE, recv_io_mode_info, total_chunks, MPI_BYTE, root, comm)))
+ H5_CHECK_OVERFLOW(total_chunks, size_t, int)
+ if(MPI_SUCCESS != (mpi_code = MPI_Gather(io_mode_info, (int)total_chunks, MPI_BYTE,
+ recv_io_mode_info, (int)total_chunks, MPI_BYTE, root, comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Gather failed", mpi_code)
/* Calculate the mode for IO(collective, independent or none) at root process */
if(mpi_rank == root) {
- int nproc;
- int* nproc_per_chunk;
+ size_t nproc;
+ unsigned* nproc_per_chunk;
/* pre-computing: calculate number of processes and
regularity of the selection occupied in each chunk */
- if(NULL == (nproc_per_chunk = (int*)H5MM_calloc(total_chunks * sizeof(int))))
+ if(NULL == (nproc_per_chunk = (unsigned*)H5MM_calloc(total_chunks * sizeof(unsigned))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate nproc_per_chunk buffer")
/* calculating the chunk address */
@@ -1751,7 +2611,7 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm,
} /* end if */
/* checking for number of process per chunk and regularity of the selection*/
- for(nproc = 0; nproc < mpi_size; nproc++) {
+ for(nproc = 0; nproc < (size_t)mpi_size; nproc++) {
uint8_t *tmp_recv_io_mode_info = recv_io_mode_info + (nproc * total_chunks);
/* Calculate the number of process per chunk and adding irregular selection option */
@@ -1835,5 +2695,712 @@ done:
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__obtain_mpio_mode() */
+
+
+/*-------------------------------------------------------------------------
+ * Function: H5D__construct_filtered_io_info_list
+ *
+ * Purpose: Constructs a list of entries which contain the necessary
+ * information for inter-process communication when performing
+ * collective io on filtered chunks. This list is used by
+ * each process when performing I/O on locally selected chunks
+ * and also in operations that must be collectively done
+ * on every chunk, such as chunk re-allocation, insertion of
+ * chunks into the chunk index, etc.
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Jordan Henderson
+ * Tuesday, January 10th, 2017
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
+ const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries)
+{
+ H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially selected chunks for this process */
+ size_t num_chunks_selected;
+ size_t i;
+ int mpi_rank;
+ herr_t ret_value = SUCCEED;
+
+ FUNC_ENTER_STATIC
+
+ HDassert(io_info);
+ HDassert(type_info);
+ HDassert(fm);
+ HDassert(chunk_list);
+ HDassert(num_entries);
+ HDassert(TRUE == H5P_isa_class(io_info->raw_dxpl_id, H5P_DATASET_XFER));
+
+ if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
+
+ /* Each process builds a local list of the chunks they have selected */
+ if ((num_chunks_selected = H5SL_count(fm->sel_chunks))) {
+ H5D_chunk_info_t *chunk_info;
+ H5D_chunk_ud_t udata;
+ H5SL_node_t *chunk_node;
+ hssize_t select_npoints;
+ hssize_t chunk_npoints;
+
+ if (NULL == (local_info_array = (H5D_filtered_collective_io_info_t *) H5MM_malloc(num_chunks_selected * sizeof(H5D_filtered_collective_io_info_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer")
+
+ chunk_node = H5SL_first(fm->sel_chunks);
+ for (i = 0; chunk_node; i++) {
+ chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node);
+
+ /* Obtain this chunk's address */
+ if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address")
+
+ local_info_array[i].index = chunk_info->index;
+ local_info_array[i].chunk_states.chunk_current = local_info_array[i].chunk_states.new_chunk = udata.chunk_block;
+ local_info_array[i].num_writers = 0;
+ local_info_array[i].owners.original_owner = local_info_array[i].owners.new_owner = mpi_rank;
+ local_info_array[i].buf = NULL;
+
+ local_info_array[i].async_info.num_receive_requests = 0;
+ local_info_array[i].async_info.receive_buffer_array = NULL;
+ local_info_array[i].async_info.receive_requests_array = NULL;
+
+ HDmemcpy(local_info_array[i].scaled, chunk_info->scaled, sizeof(chunk_info->scaled));
+
+ if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+ local_info_array[i].io_size = (size_t) select_npoints * type_info->src_type_size;
+
+ /* Currently the full overwrite status of a chunk is only obtained on a per-process
+ * basis. This means that if the total selection in the chunk, as determined by the combination
+ * of selections of all of the processes interested in the chunk, covers the entire chunk,
+ * the performance optimization of not reading the chunk from the file is still valid, but
+ * is not applied in the current implementation. Something like an appropriately placed
+ * MPI_Allreduce or a running total of the number of chunk points selected during chunk
+ * redistribution should suffice for implementing this case - JTH.
+ */
+ if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+ local_info_array[i].full_overwrite =
+ (local_info_array[i].io_size >= (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE;
+
+ chunk_node = H5SL_next(chunk_node);
+ } /* end for */
+ } /* end if */
+
+ /* Redistribute shared chunks to new owners as necessary */
+ if (io_info->op_type == H5D_IO_OP_WRITE)
+ if (H5D__chunk_redistribute_shared_chunks(io_info, type_info, fm, local_info_array, &num_chunks_selected) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks")
+
+ *chunk_list = local_info_array;
+ *num_entries = num_chunks_selected;
+
+done:
+ FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__construct_filtered_io_info_list() */
+
+
+/*-------------------------------------------------------------------------
+ * Function: H5D__chunk_redistribute_shared_chunks
+ *
+ * Purpose: When performing a collective write on a Dataset with
+ * filters applied, this function is used to redistribute any
+ * chunks which are selected by more than one process, so as
+ * to preserve file integrity after the write by ensuring
+ * that any shared chunks are only modified by one process.
+ *
+ * The current implementation follows this 3-phase process:
+ *
+ * - Collect everyone's list of chunks into one large list,
+ * sort the list in increasing order of chunk offset in the
+ * file and hand the list off to rank 0
+ *
+ * - Rank 0 scans the list looking for matching runs of chunk
+ * offset in the file (corresponding to a shared chunk which
+ * has been selected by more than one rank in the I/O
+ * operation) and for each shared chunk, it redistributes
+ * the chunk to the process writing to the chunk which
+ * currently has the least amount of chunks assigned to it
+ * by modifying the "new_owner" field in each of the list
+ * entries corresponding to that chunk
+ *
+ * - After the chunks have been redistributed, rank 0 re-sorts
+ * the list in order of previous owner so that each rank
+ * will get back exactly the array that they contributed to
+ * the redistribution operation, with the "new_owner" field
+ * of each chunk they are modifying having possibly been
+ * modified. Rank 0 then scatters each segment of the list
+ * back to its corresponding rank
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Jordan Henderson
+ * Monday, May 1, 2017
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
+ const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries)
+{
+ H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
+ H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
+ unsigned char **mod_data = NULL; /* Array of chunk modification data buffers sent by a process to new chunk owners */
+ MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */
+ MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */
+ hbool_t mem_iter_init = FALSE;
+ size_t shared_chunks_info_array_num_entries = 0;
+ size_t num_send_requests = 0;
+ size_t *num_assigned_chunks_array = NULL;
+ size_t i, last_assigned_idx;
+ int *send_counts = NULL;
+ int *send_displacements = NULL;
+ int scatter_recvcount_int;
+ int mpi_rank, mpi_size, mpi_code;
+ herr_t ret_value = SUCCEED;
+
+ FUNC_ENTER_STATIC
+
+ HDassert(io_info);
+ HDassert(type_info);
+ HDassert(fm);
+ HDassert(local_chunk_array_num_entries);
+
+ if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank")
+ if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size")
+
+ if (*local_chunk_array_num_entries)
+ if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(*local_chunk_array_num_entries * sizeof(MPI_Request))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
+
+ if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
+
+ /* Gather every rank's list of chunks to rank 0 to allow it to perform the redistribution operation. After this
+ * call, the gathered list will initially be sorted in increasing order of chunk offset in the file.
+ */
+ if (H5D__mpio_array_gatherv(local_chunk_array, *local_chunk_array_num_entries, sizeof(H5D_filtered_collective_io_info_t),
+ (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size,
+ false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")
+
+ /* Rank 0 redistributes any shared chunks to new owners as necessary */
+ if (mpi_rank == 0) {
+ if (NULL == (send_counts = (int *) H5MM_calloc((size_t) mpi_size * sizeof(int))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send counts buffer")
+
+ if (NULL == (send_displacements = (int *) H5MM_malloc((size_t) mpi_size * sizeof(int))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send displacements buffer")
+
+ if (NULL == (num_assigned_chunks_array = (size_t *) H5MM_calloc((size_t) mpi_size * sizeof(size_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate number of assigned chunks array")
+
+ for (i = 0; i < shared_chunks_info_array_num_entries;) {
+ H5D_filtered_collective_io_info_t chunk_entry;
+ haddr_t last_seen_addr = shared_chunks_info_array[i].chunk_states.chunk_current.offset;
+ size_t set_begin_index = i;
+ size_t num_writers = 0;
+ int new_chunk_owner = shared_chunks_info_array[i].owners.original_owner;
+
+ /* Process each set of duplicate entries caused by another process writing to the same chunk */
+ do {
+ chunk_entry = shared_chunks_info_array[i];
+
+ send_counts[chunk_entry.owners.original_owner] += (int) sizeof(chunk_entry);
+
+ /* The new owner of the chunk is determined by the process
+ * writing to the chunk which currently has the least amount
+ * of chunks assigned to it
+ */
+ if (num_assigned_chunks_array[chunk_entry.owners.original_owner] < num_assigned_chunks_array[new_chunk_owner])
+ new_chunk_owner = chunk_entry.owners.original_owner;
+
+ num_writers++;
+ } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].chunk_states.chunk_current.offset == last_seen_addr);
+
+ /* Set all of the chunk entries' "new_owner" fields */
+ for (; set_begin_index < i; set_begin_index++) {
+ shared_chunks_info_array[set_begin_index].owners.new_owner = new_chunk_owner;
+ shared_chunks_info_array[set_begin_index].num_writers = num_writers;
+ } /* end for */
+
+ num_assigned_chunks_array[new_chunk_owner]++;
+ } /* end for */
+
+ /* Sort the new list in order of previous owner so that each original owner of a chunk
+ * entry gets that entry back, with the possibly newly-modified "new_owner" field
+ */
+ HDqsort(shared_chunks_info_array, shared_chunks_info_array_num_entries,
+ sizeof(H5D_filtered_collective_io_info_t), H5D__cmp_filtered_collective_io_info_entry_owner);
+
+ send_displacements[0] = 0;
+ for (i = 1; i < (size_t) mpi_size; i++)
+ send_displacements[i] = send_displacements[i - 1] + send_counts[i - 1];
+ } /* end if */
+
+ /* Scatter the segments of the list back to each process */
+ H5_CHECKED_ASSIGN(scatter_recvcount_int, int, *local_chunk_array_num_entries * sizeof(H5D_filtered_collective_io_info_t), size_t);
+ if (MPI_SUCCESS != (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts, send_displacements,
+ MPI_BYTE, local_chunk_array, scatter_recvcount_int, MPI_BYTE, 0, io_info->comm)))
+ HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code)
+
+ if (shared_chunks_info_array) {
+ H5MM_free(shared_chunks_info_array);
+ shared_chunks_info_array = NULL;
+ } /* end if */
+
+ /* Now that the chunks have been redistributed, each process must send its modification data
+ * to the new owners of any of the chunks it previously possessed. Accordingly, each process
+ * must also issue asynchronous receives for any messages it may receive for each of the
+ * chunks it is assigned, in order to avoid potential deadlocking issues.
+ */
+ if (*local_chunk_array_num_entries)
+ if (NULL == (mod_data = (unsigned char **) H5MM_malloc(*local_chunk_array_num_entries * sizeof(unsigned char *))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data buffer array")
+
+ for (i = 0, last_assigned_idx = 0; i < *local_chunk_array_num_entries; i++) {
+ H5D_filtered_collective_io_info_t *chunk_entry = &local_chunk_array[i];
+
+ if (mpi_rank != chunk_entry->owners.new_owner) {
+ H5D_chunk_info_t *chunk_info = NULL;
+ unsigned char *mod_data_p = NULL;
+ hssize_t iter_nelmts;
+ size_t mod_data_size;
+
+ /* Look up the chunk and get its file and memory dataspaces */
+ if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
+ HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")
+
+ /* Determine size of serialized chunk file dataspace, plus the size of
+ * the data being written
+ */
+ if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size")
+
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
+
+ if (NULL == (mod_data[num_send_requests] = (unsigned char *) H5MM_malloc(mod_data_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
+
+ /* Serialize the chunk's file dataspace into the buffer */
+ mod_data_p = mod_data[num_send_requests];
+ if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace")
+
+ /* Intialize iterator for memory selection */
+ if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+ mem_iter_init = TRUE;
+
+ /* Collect the modification data into the buffer */
+ if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter,
+ (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
+ HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer")
+
+ /* Send modification data to new owner */
+ H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
+ H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int)
+ if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data[num_send_requests], (int) mod_data_size, MPI_BYTE,
+ chunk_entry->owners.new_owner, (int) chunk_entry->index, io_info->comm, &send_requests[num_send_requests])))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
+
+ if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator")
+ mem_iter_init = FALSE;
+
+ num_send_requests++;
+ } /* end if */
+ else {
+ /* Allocate all necessary buffers for an asynchronous receive operation */
+ if (chunk_entry->num_writers > 1) {
+ MPI_Message message;
+ MPI_Status status;
+ size_t j;
+
+ chunk_entry->async_info.num_receive_requests = (int) chunk_entry->num_writers - 1;
+ if (NULL == (chunk_entry->async_info.receive_requests_array = (MPI_Request *) H5MM_malloc((size_t) chunk_entry->async_info.num_receive_requests * sizeof(MPI_Request))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async requests array")
+
+ if (NULL == (chunk_entry->async_info.receive_buffer_array = (unsigned char **) H5MM_malloc((size_t) chunk_entry->async_info.num_receive_requests * sizeof(unsigned char *))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async receive buffers")
+
+ for (j = 0; j < chunk_entry->num_writers - 1; j++) {
+ int count = 0;
+
+ /* Probe for a particular message from any process, removing that message
+ * from the receive queue in the process and allocating that much memory
+ * for the asynchronous receive
+ */
+ if (MPI_SUCCESS != (mpi_code = MPI_Mprobe(MPI_ANY_SOURCE, (int) chunk_entry->index, io_info->comm, &message, &status)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Mprobe failed", mpi_code)
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
+
+ HDassert(count >= 0);
+ if (NULL == (chunk_entry->async_info.receive_buffer_array[j] = (unsigned char *) H5MM_malloc((size_t) count * sizeof(char *))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data receive buffer")
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Imrecv(chunk_entry->async_info.receive_buffer_array[j], count, MPI_BYTE,
+ &message, &chunk_entry->async_info.receive_requests_array[j])))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Imrecv failed", mpi_code)
+ } /* end for */
+ } /* end if */
+
+ local_chunk_array[last_assigned_idx++] = local_chunk_array[i];
+ } /* end else */
+ } /* end for */
+
+ *local_chunk_array_num_entries = last_assigned_idx;
+
+ /* Wait for all async send requests to complete before returning */
+ if (num_send_requests) {
+ if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(MPI_Status))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
+
+ H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
+ if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
+ } /* end if */
+
+done:
+ /* Now that all async send requests have completed, free up the send
+ * buffers used in the async operations
+ */
+ for (i = 0; i < num_send_requests; i++) {
+ if (mod_data[i])
+ H5MM_free(mod_data[i]);
+ } /* end for */
+
+ if (send_requests)
+ H5MM_free(send_requests);
+ if (send_statuses)
+ H5MM_free(send_statuses);
+ if (send_counts)
+ H5MM_free(send_counts);
+ if (send_displacements)
+ H5MM_free(send_displacements);
+ if (mod_data)
+ H5MM_free(mod_data);
+ if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+ if (mem_iter)
+ H5MM_free(mem_iter);
+ if (num_assigned_chunks_array)
+ H5MM_free(num_assigned_chunks_array);
+ if (shared_chunks_info_array)
+ H5MM_free(shared_chunks_info_array);
+
+ FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__chunk_redistribute_shared_chunks() */
+
+
+/*-------------------------------------------------------------------------
+ * Function: H5D__mpio_filtered_collective_write_type
+ *
+ * Purpose: Constructs a MPI derived datatype for both the memory and
+ * the file for a collective write of filtered chunks. The
+ * datatype contains the offsets in the file and the locations
+ * of the filtered chunk data buffers.
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Jordan Henderson
+ * Tuesday, November 22, 2016
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chunk_list,
+ size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
+ MPI_Datatype *new_file_type, hbool_t *file_type_derived)
+{
+ MPI_Aint *write_buf_array = NULL; /* Relative displacements of filtered chunk data buffers */
+ MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */
+ int *length_array = NULL; /* Filtered Chunk lengths */
+ herr_t ret_value = SUCCEED;
+
+ FUNC_ENTER_STATIC
+
+ HDassert(chunk_list);
+ HDassert(new_mem_type);
+ HDassert(mem_type_derived);
+ HDassert(new_file_type);
+ HDassert(file_type_derived);
+
+ if (num_entries > 0) {
+ size_t i;
+ int mpi_code;
+ void *base_buf;
+
+ H5_CHECK_OVERFLOW(num_entries, size_t, int);
+
+ /* Allocate arrays */
+ if (NULL == (length_array = (int *) H5MM_malloc((size_t) num_entries * sizeof(int))))
+ HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write length array")
+ if (NULL == (write_buf_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint))))
+ HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write buf length array")
+ if (NULL == (file_offset_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint))))
+ HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for collective write offset array")
+
+ /* Ensure the list is sorted in ascending order of offset in the file */
+ HDqsort(chunk_list, num_entries, sizeof(H5D_filtered_collective_io_info_t), H5D__cmp_filtered_collective_io_info_entry);
+
+ base_buf = chunk_list[0].buf;
+ for (i = 0; i < num_entries; i++) {
+ /* Set up the offset in the file, the length of the chunk data, and the relative
+ * displacement of the chunk data write buffer
+ */
+ file_offset_array[i] = (MPI_Aint) chunk_list[i].chunk_states.new_chunk.offset;
+ length_array[i] = (int) chunk_list[i].chunk_states.new_chunk.length;
+ write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf;
+ } /* end for */
+
+ /* Create memory MPI type */
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, write_buf_array, MPI_BYTE, new_mem_type)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
+ *mem_type_derived = TRUE;
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+
+ /* Create file MPI type */
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, file_offset_array, MPI_BYTE, new_file_type)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
+ *file_type_derived = TRUE;
+ if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+ } /* end if */
+
+done:
+ if (write_buf_array)
+ H5MM_free(write_buf_array);
+ if (file_offset_array)
+ H5MM_free(file_offset_array);
+ if (length_array)
+ H5MM_free(length_array);
+
+ FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__mpio_filtered_collective_write_type() */
+
+
+/*-------------------------------------------------------------------------
+ * Function: H5D__filtered_collective_chunk_entry_io
+ *
+ * Purpose: Given an entry for a filtered chunk, performs the necessary
+ * steps for updating the chunk data during a collective
+ * write, or for reading the chunk from file during a
+ * collective read.
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Jordan Henderson
+ * Wednesday, January 18, 2017
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry,
+ const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm)
+{
+ H5D_chunk_info_t *chunk_info = NULL;
+ H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__scatter_mem/H5D__gather_mem */
+ unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
+ unsigned filter_mask = 0;
+ hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
+ hssize_t extent_npoints;
+ hsize_t true_chunk_size;
+ hbool_t mem_iter_init = FALSE;
+ size_t buf_size;
+ size_t i;
+ H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */
+ void *tmp_gath_buf = NULL; /* Temporary gather buffer for owner of the chunk to gather into from
+ application write buffer before scattering out to the chunk data buffer */
+ int mpi_code;
+ herr_t ret_value = SUCCEED;
+
+ FUNC_ENTER_STATIC
+
+ HDassert(chunk_entry);
+ HDassert(io_info);
+ HDassert(type_info);
+ HDassert(fm);
+
+ /* Look up the chunk and get its file and memory dataspaces */
+ if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
+ HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")
+
+ if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+ true_chunk_size = (hsize_t) extent_npoints * type_info->src_type_size;
+
+ /* If the size of the filtered chunk is larger than the number of points in the
+ * chunk file space extent times the datatype size, allocate enough space to hold the
+ * whole filtered chunk. Otherwise, allocate a buffer equal to the size of the
+ * chunk so that the unfiltering operation doesn't have to grow the buffer.
+ */
+ buf_size = MAX(chunk_entry->chunk_states.chunk_current.length, true_chunk_size);
+
+ if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer")
+
+ /* If this is not a full chunk overwrite or this is a read operation, the chunk must be
+ * read from the file and unfiltered.
+ */
+ if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) {
+ chunk_entry->chunk_states.new_chunk.length = chunk_entry->chunk_states.chunk_current.length;
+
+ /* Currently, these chunk reads are done independently and will likely
+ * cause issues with collective metadata reads enabled. In the future,
+ * this should be refactored to use collective chunk reads - JTH */
+ if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->chunk_states.chunk_current.offset,
+ chunk_entry->chunk_states.new_chunk.length, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "unable to read raw data chunk")
+
+ if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
+ io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+ (size_t *) &chunk_entry->chunk_states.new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
+ HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying")
+ } /* end if */
+ else {
+ chunk_entry->chunk_states.new_chunk.length = true_chunk_size;
+ } /* end else */
+
+ /* Initialize iterator for memory selection */
+ if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
+
+ if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+ mem_iter_init = TRUE;
+
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ /* If this is a read operation, scatter the read chunk data to the user's buffer.
+ *
+ * If this is a write operation, update the chunk data buffer with the modifications
+ * from the current process, then apply any modifications from other processes. Finally,
+ * filter the newly-updated chunk.
+ */
+ switch (io_info->op_type) {
+ case H5D_IO_OP_READ:
+ if (H5D__scatter_mem(chunk_entry->buf, chunk_info->mspace, mem_iter,
+ (size_t) iter_nelmts, io_info->dxpl_cache, io_info->u.rbuf) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to read buffer")
+ break;
+
+ case H5D_IO_OP_WRITE:
+ if (NULL == (tmp_gath_buf = H5MM_malloc((hsize_t) iter_nelmts * type_info->src_type_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate temporary gather buffer")
+
+ /* Gather modification data from the application write buffer into a temporary buffer */
+ if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter,
+ (size_t) iter_nelmts, io_info->dxpl_cache, tmp_gath_buf))
+ HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
+
+ if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+ mem_iter_init = FALSE;
+
+ /* Initialize iterator for file selection */
+ if (H5S_select_iter_init(mem_iter, chunk_info->fspace, type_info->dst_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize file selection information")
+ mem_iter_init = TRUE;
+
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ /* Scatter the owner's modification data into the chunk data buffer according to
+ * the file space.
+ */
+ if (H5D__scatter_mem(tmp_gath_buf, chunk_info->fspace, mem_iter,
+ (size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to chunk data buffer")
+
+ if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+ mem_iter_init = FALSE;
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Waitall(chunk_entry->async_info.num_receive_requests,
+ chunk_entry->async_info.receive_requests_array, MPI_STATUSES_IGNORE)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
+
+ /* For each asynchronous receive call previously posted, receive the chunk modification
+ * buffer from another rank and update the chunk data
+ */
+ for (i = 0; i < (size_t) chunk_entry->async_info.num_receive_requests; i++) {
+ const unsigned char *mod_data_p;
+
+ /* Decode the process' chunk file dataspace */
+ mod_data_p = chunk_entry->async_info.receive_buffer_array[i];
+ if (NULL == (dataspace = H5S_decode(&mod_data_p)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace")
+
+ if (H5S_select_iter_init(mem_iter, dataspace, type_info->dst_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+ mem_iter_init = TRUE;
+
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(dataspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ /* Update the chunk data with the received modification data */
+ if (H5D__scatter_mem(mod_data_p, dataspace, mem_iter, (size_t) iter_nelmts,
+ io_info->dxpl_cache, chunk_entry->buf) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't scatter to write buffer")
+
+ if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+ mem_iter_init = FALSE;
+ if (dataspace) {
+ if (H5S_close(dataspace) < 0)
+ HGOTO_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace")
+ dataspace = NULL;
+ }
+ H5MM_free(chunk_entry->async_info.receive_buffer_array[i]);
+ } /* end for */
+
+ /* Filter the chunk */
+ if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
+ io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+ (size_t *) &chunk_entry->chunk_states.new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
+ HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
+
+#if H5_SIZEOF_SIZE_T > 4
+ /* Check for the chunk expanding too much to encode in a 32-bit value */
+ if (chunk_entry->chunk_states.new_chunk.length > ((size_t) 0xffffffff))
+ HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
+#endif
+
+ break;
+ default:
+ HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "invalid I/O operation")
+ } /* end switch */
+
+done:
+ if (chunk_entry->async_info.receive_buffer_array)
+ H5MM_free(chunk_entry->async_info.receive_buffer_array);
+ if (chunk_entry->async_info.receive_requests_array)
+ H5MM_free(chunk_entry->async_info.receive_requests_array);
+ if (mod_data)
+ H5MM_free(mod_data);
+ if (tmp_gath_buf)
+ H5MM_free(tmp_gath_buf);
+ if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+ if (mem_iter)
+ H5MM_free(mem_iter);
+ if (dataspace)
+ if (H5S_close(dataspace) < 0)
+ HDONE_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace")
+
+ FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__filtered_collective_chunk_entry_io() */
#endif /* H5_HAVE_PARALLEL */
diff --git a/src/H5Dpkg.h b/src/H5Dpkg.h
index a6857b9..097fab7 100644
--- a/src/H5Dpkg.h
+++ b/src/H5Dpkg.h
@@ -617,6 +617,9 @@ H5_DLL herr_t H5D__select_write(const H5D_io_info_t *io_info,
H5_DLL herr_t H5D__scatter_mem(const void *_tscat_buf,
const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts,
const H5D_dxpl_cache_t *dxpl_cache, void *_buf);
+H5_DLL size_t H5D__gather_mem(const void *_buf,
+ const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts,
+ const H5D_dxpl_cache_t *dxpl_cache, void *_tgath_buf/*out*/);
H5_DLL herr_t H5D__scatgath_read(const H5D_io_info_t *io_info,
const H5D_type_info_t *type_info,
hsize_t nelmts, const H5S_t *file_space, const H5S_t *mem_space);
@@ -666,6 +669,8 @@ H5_DLL herr_t H5D__chunk_lookup(const H5D_t *dset, hid_t dxpl_id,
const hsize_t *scaled, H5D_chunk_ud_t *udata);
H5_DLL herr_t H5D__chunk_allocated(H5D_t *dset, hid_t dxpl_id, hsize_t *nbytes);
H5_DLL herr_t H5D__chunk_allocate(const H5D_io_info_t *io_info, hbool_t full_overwrite, hsize_t old_dim[]);
+H5_DLL herr_t H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info, const H5F_block_t *old_chunk,
+ H5F_block_t *new_chunk, hbool_t *need_insert, hsize_t scaled[]);
H5_DLL herr_t H5D__chunk_update_old_edge_chunks(H5D_t *dset, hid_t dxpl_id,
hsize_t old_dim[]);
H5_DLL herr_t H5D__chunk_prune_by_extent(H5D_t *dset, hid_t dxpl_id,
@@ -768,8 +773,7 @@ H5_DLL herr_t H5D__chunk_collective_write(H5D_io_info_t *io_info,
* memory and the file */
H5_DLL htri_t H5D__mpio_opt_possible(const H5D_io_info_t *io_info,
const H5S_t *file_space, const H5S_t *mem_space,
- const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm,
- H5P_genplist_t *dx_plist);
+ const H5D_type_info_t *type_info, H5P_genplist_t *dx_plist);
#endif /* H5_HAVE_PARALLEL */
diff --git a/src/H5Dscatgath.c b/src/H5Dscatgath.c
index 4625c7a..0ae69ee 100644
--- a/src/H5Dscatgath.c
+++ b/src/H5Dscatgath.c
@@ -47,9 +47,6 @@ static herr_t H5D__scatter_file(const H5D_io_info_t *io_info,
static size_t H5D__gather_file(const H5D_io_info_t *io_info,
const H5S_t *file_space, H5S_sel_iter_t *file_iter, size_t nelmts,
void *buf);
-static size_t H5D__gather_mem(const void *_buf,
- const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts,
- const H5D_dxpl_cache_t *dxpl_cache, void *_tgath_buf/*out*/);
static herr_t H5D__compound_opt_read(size_t nelmts, const H5S_t *mem_space,
H5S_sel_iter_t *iter, const H5D_dxpl_cache_t *dxpl_cache,
const H5D_type_info_t *type_info, void *user_buf/*out*/);
@@ -303,6 +300,7 @@ H5D__scatter_mem (const void *_tscat_buf, const H5S_t *space,
HDassert(space);
HDassert(iter);
HDassert(nelmts > 0);
+ HDassert(dxpl_cache);
HDassert(buf);
/* Allocate the vector I/O arrays */
@@ -364,7 +362,7 @@ done:
*
*-------------------------------------------------------------------------
*/
-static size_t
+size_t
H5D__gather_mem(const void *_buf, const H5S_t *space,
H5S_sel_iter_t *iter, size_t nelmts, const H5D_dxpl_cache_t *dxpl_cache,
void *_tgath_buf/*out*/)
@@ -387,6 +385,7 @@ H5D__gather_mem(const void *_buf, const H5S_t *space,
HDassert(space);
HDassert(iter);
HDassert(nelmts > 0);
+ HDassert(dxpl_cache);
HDassert(tgath_buf);
/* Allocate the vector I/O arrays */
diff --git a/src/H5Ppublic.h b/src/H5Ppublic.h
index 55b3877..854b1ef 100644
--- a/src/H5Ppublic.h
+++ b/src/H5Ppublic.h
@@ -166,7 +166,7 @@ typedef enum H5D_mpio_no_collective_cause_t {
H5D_MPIO_MPI_OPT_TYPES_ENV_VAR_DISABLED = 0x08,
H5D_MPIO_NOT_SIMPLE_OR_SCALAR_DATASPACES = 0x10,
H5D_MPIO_NOT_CONTIGUOUS_OR_CHUNKED_DATASET = 0x20,
- H5D_MPIO_FILTERS = 0x40
+ H5D_MPIO_NO_COLLECTIVE_MAX_CAUSE = 0x40
} H5D_mpio_no_collective_cause_t;
/********************/
diff --git a/src/H5err.txt b/src/H5err.txt
index 3f5801f..d771956 100644
--- a/src/H5err.txt
+++ b/src/H5err.txt
@@ -243,6 +243,8 @@ MINOR, LINK, H5E_CANTSORT, Can't sort objects
MINOR, MPI, H5E_MPI, Some MPI function failed
MINOR, MPI, H5E_MPIERRSTR, MPI Error String
MINOR, MPI, H5E_CANTRECV, Can't receive data
+MINOR, MPI, H5E_CANTGATHER, Can't gather data
+MINOR, MPI, H5E_NO_INDEPENDENT, Can't perform independent IO
# Heap errors
MINOR, HEAP, H5E_CANTRESTORE, Can't restore condition
diff --git a/src/H5trace.c b/src/H5trace.c
index 9fb8a72..930002f 100644
--- a/src/H5trace.c
+++ b/src/H5trace.c
@@ -621,10 +621,6 @@ H5_trace(const double *returning, const char *func, const char *type, ...)
fprintf(out, "%sH5D_MPIO_NOT_CONTIGUOUS_OR_CHUNKED_DATASET", flag_already_displayed ? " | " : "");
flag_already_displayed = TRUE;
} /* end if */
- if(nocol_cause_mode & H5D_MPIO_FILTERS) {
- fprintf(out, "%sH5D_MPIO_FILTERS", flag_already_displayed ? " | " : "");
- flag_already_displayed = TRUE;
- } /* end if */
/* Display '<none>' if there's no flags set */
if(!flag_already_displayed)
diff --git a/testpar/Makefile.am b/testpar/Makefile.am
index 7029bd5..b0fe0cd 100644
--- a/testpar/Makefile.am
+++ b/testpar/Makefile.am
@@ -23,7 +23,7 @@ AM_CPPFLAGS+=-I$(top_srcdir)/src -I$(top_srcdir)/test
# Test programs. These are our main targets.
#
-TEST_PROG_PARA=t_mpi t_bigio testphdf5 t_cache t_cache_image t_pflush1 t_pflush2 t_pshutdown t_prestart t_init_term t_shapesame
+TEST_PROG_PARA=t_mpi t_bigio testphdf5 t_cache t_cache_image t_pflush1 t_pflush2 t_pshutdown t_prestart t_init_term t_shapesame t_filters_parallel
check_PROGRAMS = $(TEST_PROG_PARA)
diff --git a/testpar/t_dset.c b/testpar/t_dset.c
index b952bf3..65d1bb4 100644
--- a/testpar/t_dset.c
+++ b/testpar/t_dset.c
@@ -2651,11 +2651,8 @@ compress_readAll(void)
nerrors++;
}
- /* Writing to the compressed, chunked dataset in parallel should fail */
- H5E_BEGIN_TRY {
- ret = H5Dwrite(dataset, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, xfer_plist, data_read);
- } H5E_END_TRY;
- VRFY((ret < 0), "H5Dwrite failed");
+ ret = H5Dwrite(dataset, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, xfer_plist, data_read);
+ VRFY((ret >= 0), "H5Dwrite succeeded");
ret = H5Pclose(xfer_plist);
VRFY((ret >= 0), "H5Pclose succeeded");
diff --git a/testpar/t_filters_parallel.c b/testpar/t_filters_parallel.c
new file mode 100644
index 0000000..21a5ce0
--- /dev/null
+++ b/testpar/t_filters_parallel.c
@@ -0,0 +1,2475 @@
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ * Copyright by The HDF Group. *
+ * Copyright by the Board of Trustees of the University of Illinois. *
+ * All rights reserved. *
+ * *
+ * This file is part of HDF5. The full HDF5 copyright notice, including *
+ * terms governing use, modification, and redistribution, is contained in *
+ * the files COPYING and Copyright.html. COPYING can be found at the root *
+ * of the source code distribution tree; Copyright.html can be found at the *
+ * root level of an installed copy of the electronic HDF5 document set and *
+ * is linked from the top-level documents page. It can also be found at *
+ * http://hdfgroup.org/HDF5/doc/Copyright.html. If you do not have *
+ * access to either file, you may request a copy from help@hdfgroup.org. *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+/*
+ * Programmer: Jordan Henderson
+ * 01/31/2017
+ *
+ * This file contains tests for writing to and reading from
+ * datasets in parallel with filters applied to the data.
+ */
+
+#include "t_filters_parallel.h"
+
+const char *FILENAME[] = {
+ "t_filters_parallel",
+ NULL
+};
+char filenames[1][256];
+
+int nerrors = 0;
+
+#define ARRAY_SIZE(a) sizeof(a) / sizeof(a[0])
+
+static void test_one_chunk_filtered_dataset(void);
+static void test_filtered_dataset_no_overlap(void);
+static void test_filtered_dataset_overlap(void);
+static void test_filtered_dataset_single_no_selection(void);
+static void test_filtered_dataset_all_no_selection(void);
+static void test_filtered_dataset_point_selection(void);
+static void test_filtered_dataset_interleaved_write(void);
+static void test_3d_filtered_dataset_no_overlap_separate_pages(void);
+static void test_3d_filtered_dataset_no_overlap_same_pages(void);
+static void test_3d_filtered_dataset_overlap(void);
+static void test_cmpd_filtered_dataset_no_conversion_unshared(void);
+static void test_cmpd_filtered_dataset_no_conversion_shared(void);
+static void test_cmpd_filtered_dataset_type_conversion_unshared(void);
+static void test_cmpd_filtered_dataset_type_conversion_shared(void);
+static void test_write_serial_read_parallel(void);
+static void test_write_parallel_read_serial(void);
+
+static MPI_Comm comm = MPI_COMM_WORLD;
+static MPI_Info info = MPI_INFO_NULL;
+static int mpi_rank;
+static int mpi_size;
+
+static void (*tests[])(void) = {
+ test_one_chunk_filtered_dataset,
+ test_filtered_dataset_no_overlap,
+ test_filtered_dataset_overlap,
+ test_filtered_dataset_single_no_selection,
+ test_filtered_dataset_all_no_selection,
+ test_filtered_dataset_point_selection,
+ test_filtered_dataset_interleaved_write,
+ test_3d_filtered_dataset_no_overlap_separate_pages,
+ test_3d_filtered_dataset_no_overlap_same_pages,
+ test_3d_filtered_dataset_overlap,
+ test_cmpd_filtered_dataset_no_conversion_unshared,
+ test_cmpd_filtered_dataset_no_conversion_shared,
+ test_cmpd_filtered_dataset_type_conversion_unshared,
+ test_cmpd_filtered_dataset_type_conversion_shared,
+ test_write_serial_read_parallel,
+ test_write_parallel_read_serial,
+};
+
+/*
+ * Tests parallel write of filtered data in the special
+ * case where a dataset is composed of a single chunk.
+ *
+ * Programmer: Jordan Henderson
+ * 02/01/2017
+ */
+static void
+test_one_chunk_filtered_dataset(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *read_buf = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ hsize_t dataset_dims[ONE_CHUNK_FILTERED_DATASET_DIMS];
+ hsize_t chunk_dims[ONE_CHUNK_FILTERED_DATASET_DIMS];
+ hsize_t sel_dims[ONE_CHUNK_FILTERED_DATASET_DIMS];
+ hsize_t count[ONE_CHUNK_FILTERED_DATASET_DIMS];
+ hsize_t stride[ONE_CHUNK_FILTERED_DATASET_DIMS];
+ hsize_t block[ONE_CHUNK_FILTERED_DATASET_DIMS];
+ hsize_t offset[ONE_CHUNK_FILTERED_DATASET_DIMS];
+ size_t i, data_size, correct_buf_size;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing one-chunk filtered dataset");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_NROWS;
+ dataset_dims[1] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_NCOLS;
+ chunk_dims[0] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_CH_NROWS;
+ chunk_dims[1] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_CH_NCOLS;
+ sel_dims[0] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_NROWS / (hsize_t) mpi_size;
+ sel_dims[1] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_NCOLS;
+
+ filespace = H5Screate_simple(ONE_CHUNK_FILTERED_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(ONE_CHUNK_FILTERED_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, ONE_CHUNK_FILTERED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, ONE_CHUNK_FILTERED_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = 1;
+ count[1] = 1;
+ stride[0] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_CH_NROWS;
+ stride[1] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_CH_NCOLS;
+ block[0] = sel_dims[0];
+ block[1] = sel_dims[1];
+ offset[0] = ((hsize_t) mpi_rank * sel_dims[0]);
+ offset[1] = 0;
+
+ if (VERBOSE_MED)
+ printf("Process %d: count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0),
+ "Hyperslab selection succeeded");
+
+ /* Fill data buffer */
+ data_size = (hsize_t) ONE_CHUNK_FILTERED_DATASET_CH_NROWS * (hsize_t) ONE_CHUNK_FILTERED_DATASET_NCOLS * sizeof(*data);
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++)
+ correct_buf[i] = ((C_DATATYPE) i % (ONE_CHUNK_FILTERED_DATASET_CH_NROWS / mpi_size * ONE_CHUNK_FILTERED_DATASET_CH_NCOLS))
+ + ((C_DATATYPE) i / (ONE_CHUNK_FILTERED_DATASET_CH_NROWS / mpi_size * ONE_CHUNK_FILTERED_DATASET_CH_NCOLS));
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+
+ /* Verify the correct data was written */
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" ONE_CHUNK_FILTERED_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ if (correct_buf) free(correct_buf);
+ if (read_buf) free(read_buf);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data in the case where only
+ * one process is writing to a particular chunk in the operation.
+ * In this case, the write operation can be optimized because
+ * chunks do not have to be redistributed to new owners.
+ *
+ * Programmer: Jordan Henderson
+ * 02/01/2017
+ */
+static void
+test_filtered_dataset_no_overlap(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *read_buf = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ hsize_t dataset_dims[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t chunk_dims[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t sel_dims[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t count[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t stride[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t block[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t offset[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ size_t i, data_size, correct_buf_size;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing write to unshared filtered chunks");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_NROWS;
+ dataset_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_NCOLS;
+ chunk_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NROWS;
+ chunk_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NCOLS;
+ sel_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NROWS;
+ sel_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_NCOLS;
+
+ filespace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, UNSHARED_FILTERED_CHUNKS_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, UNSHARED_FILTERED_CHUNKS_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = 1;
+ count[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_NCOLS / (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NCOLS;
+ stride[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NROWS;
+ stride[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NCOLS;
+ block[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NROWS;
+ block[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NCOLS;
+ offset[0] = ((hsize_t) mpi_rank * (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NROWS * count[0]);
+ offset[1] = 0;
+
+ if (VERBOSE_MED)
+ printf("Process %d: count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((dset_id >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ /* Fill data buffer */
+ data_size = sel_dims[0] * sel_dims[1] * sizeof(*data);
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++)
+ correct_buf[i] = (C_DATATYPE) ( (i % (dataset_dims[0] / (hsize_t) mpi_size * dataset_dims[1]))
+ + (i / (dataset_dims[0] / (hsize_t) mpi_size * dataset_dims[1])));
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+
+ /* Verify the correct data was written */
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" UNSHARED_FILTERED_CHUNKS_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ if (correct_buf) free(correct_buf);
+ if (read_buf) free(read_buf);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data in the case where
+ * more than one process is writing to a particular chunk
+ * in the operation. In this case, the chunks have to be
+ * redistributed before the operation so that only one process
+ * writes to a particular chunk.
+ *
+ * Programmer: Jordan Henderson
+ * 02/01/2017
+ */
+static void
+test_filtered_dataset_overlap(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *read_buf = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ hsize_t dataset_dims[SHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t chunk_dims[SHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t sel_dims[SHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t count[SHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t stride[SHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t block[SHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t offset[SHARED_FILTERED_CHUNKS_DATASET_DIMS];
+ size_t i, data_size, correct_buf_size;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing write to shared filtered chunks");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) SHARED_FILTERED_CHUNKS_NROWS;
+ dataset_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_NCOLS;
+ chunk_dims[0] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NROWS;
+ chunk_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NCOLS;
+ sel_dims[0] = (hsize_t) DIM0_SCALE_FACTOR;
+ sel_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NCOLS * (hsize_t) DIM1_SCALE_FACTOR;
+
+ filespace = H5Screate_simple(SHARED_FILTERED_CHUNKS_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(SHARED_FILTERED_CHUNKS_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, SHARED_FILTERED_CHUNKS_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, SHARED_FILTERED_CHUNKS_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = (hsize_t) SHARED_FILTERED_CHUNKS_NROWS / (hsize_t) SHARED_FILTERED_CHUNKS_CH_NROWS;
+ count[1] = (hsize_t) SHARED_FILTERED_CHUNKS_NCOLS / (hsize_t) SHARED_FILTERED_CHUNKS_CH_NCOLS;
+ stride[0] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NROWS;
+ stride[1] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NCOLS;
+ block[0] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NROWS / (hsize_t) mpi_size;
+ block[1] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NCOLS;
+ offset[0] = (hsize_t) mpi_rank * block[0];
+ offset[1] = 0;
+
+ if (VERBOSE_MED)
+ printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ /* Fill data buffer */
+ data_size = sel_dims[0] * sel_dims[1] * sizeof(*data);
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++)
+ correct_buf[i] = (C_DATATYPE) ((dataset_dims[1] * (i / ((hsize_t) mpi_size * dataset_dims[1])))
+ + (i % dataset_dims[1])
+ + (((i % ((hsize_t) mpi_size * dataset_dims[1])) / dataset_dims[1]) % dataset_dims[1]));
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+
+ /* Verify correct data was written */
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" SHARED_FILTERED_CHUNKS_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ if (correct_buf) free(correct_buf);
+ if (read_buf) free(read_buf);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data in the case where
+ * a single process in the write operation has no selection
+ * in the dataset's dataspace. In this case, the process with
+ * no selection still has to participate in the collective
+ * space re-allocation for the filtered chunks and also must
+ * participate in the re-insertion of the filtered chunks
+ * into the chunk index.
+ *
+ * Programmer: Jordan Henderson
+ * 02/01/2017
+ */
+static void
+test_filtered_dataset_single_no_selection(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *read_buf = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ hsize_t dataset_dims[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t chunk_dims[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t sel_dims[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t count[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t stride[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t block[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t offset[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ size_t i, data_size, correct_buf_size;
+ size_t segment_length;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing write to filtered chunks with a single process having no selection");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_NROWS;
+ dataset_dims[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_NCOLS;
+ chunk_dims[0] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS;
+ chunk_dims[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS;
+ sel_dims[0] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS;
+ sel_dims[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_NCOLS;
+
+ if (mpi_rank == SINGLE_NO_SELECTION_FILTERED_CHUNKS_NO_SELECT_PROC)
+ sel_dims[0] = sel_dims[1] = 0;
+
+ filespace = H5Screate_simple(SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = 1;
+ count[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_NCOLS / (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS;
+ stride[0] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS;
+ stride[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS;
+ block[0] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS;
+ block[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS;
+ offset[0] = (hsize_t) mpi_rank * (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS * count[0];
+ offset[1] = 0;
+
+ if (VERBOSE_MED)
+ printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ if (mpi_rank == SINGLE_NO_SELECTION_FILTERED_CHUNKS_NO_SELECT_PROC)
+ VRFY((H5Sselect_none(filespace) >= 0), "Select none succeeded");
+ else
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ /* Fill data buffer */
+ data_size = sel_dims[0] * sel_dims[1] * sizeof(*data);
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++)
+ correct_buf[i] = (C_DATATYPE) ( (i % (dataset_dims[0] / (hsize_t) mpi_size * dataset_dims[1]))
+ + (i / (dataset_dims[0] / (hsize_t) mpi_size * dataset_dims[1])));
+
+ /* Compute the correct offset into the buffer for the process having no selection and clear it */
+ segment_length = dataset_dims[0] * dataset_dims[1] / (hsize_t) mpi_size;
+ HDmemset(correct_buf + ((size_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_NO_SELECT_PROC * segment_length), 0, segment_length * sizeof(*data));
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+
+ /* Verify the correct data was written */
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ if (correct_buf) free(correct_buf);
+ if (read_buf) free(read_buf);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data in the case
+ * where no process in the write operation has a
+ * selection in the dataset's dataspace. This test is
+ * to ensure that there are no assertion failures or
+ * similar issues due to size 0 allocations and the
+ * like. In this case, the file and dataset are created
+ * but the dataset is populated with the default fill
+ * value.
+ *
+ * Programmer: Jordan Henderson
+ * 02/02/2017
+ */
+static void
+test_filtered_dataset_all_no_selection(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *read_buf = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ hsize_t dataset_dims[ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t chunk_dims[ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t sel_dims[ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ size_t i, data_size, correct_buf_size;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing write to filtered chunks with all processes having no selection");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) ALL_NO_SELECTION_FILTERED_CHUNKS_NROWS;
+ dataset_dims[1] = (hsize_t) ALL_NO_SELECTION_FILTERED_CHUNKS_NCOLS;
+ chunk_dims[0] = (hsize_t) ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS;
+ chunk_dims[1] = (hsize_t) ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS;
+ sel_dims[0] = sel_dims[1] = 0;
+
+ filespace = H5Screate_simple(ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_none(filespace) >= 0), "Select none succeeded");
+
+ /* Fill data buffer */
+ data_size = sel_dims[0] * sel_dims[1] * sizeof(*data);
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+
+ /* Verify the correct data was written */
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ if (correct_buf) free(correct_buf);
+ if (read_buf) free(read_buf);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data by using
+ * point selections instead of hyperslab selections.
+ *
+ * Programmer: Jordan Henderson
+ * 02/02/2017
+ */
+static void
+test_filtered_dataset_point_selection(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ C_DATATYPE *read_buf = NULL;
+ hsize_t *coords = NULL;
+ hsize_t dataset_dims[POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t chunk_dims[POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ hsize_t sel_dims[POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS];
+ size_t i, j, data_size, correct_buf_size;
+ size_t num_points;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing write to filtered chunks with point selection");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NROWS;
+ dataset_dims[1] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NCOLS;
+ chunk_dims[0] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_CH_NROWS;
+ chunk_dims[1] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_CH_NCOLS;
+ sel_dims[0] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NROWS / (hsize_t) mpi_size;
+ sel_dims[1] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NCOLS;
+
+ filespace = H5Screate_simple(POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, POINT_SELECTION_FILTERED_CHUNKS_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Set up point selection */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ num_points = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NROWS * (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NCOLS / (hsize_t) mpi_size;
+ coords = (hsize_t *) calloc(1, 2 * num_points * sizeof(*coords));
+ VRFY((NULL != coords), "Coords calloc succeeded");
+
+ for (i = 0; i < num_points; i++)
+ for (j = 0; j < POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS; j++)
+ coords[(i * POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS) + j] = (j > 0) ? (i % (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NCOLS)
+ : ((hsize_t) mpi_rank + ((hsize_t) mpi_size * (i / (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NCOLS)));
+
+ VRFY((H5Sselect_elements(filespace, H5S_SELECT_SET, (hsize_t) num_points, (const hsize_t *) coords) >= 0),
+ "Point selection succeeded");
+
+ /* Fill data buffer */
+ data_size = sel_dims[0] * sel_dims[1] * sizeof(*data);
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++)
+ correct_buf[i] = (C_DATATYPE) ((dataset_dims[1] * (i / ((hsize_t) mpi_size * dataset_dims[1])))
+ + (i % dataset_dims[1])
+ + (((i % ((hsize_t) mpi_size * dataset_dims[1])) / dataset_dims[1]) % dataset_dims[1]));
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+
+ /* Verify the correct data was written */
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" POINT_SELECTION_FILTERED_CHUNKS_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ if (coords) free(coords);
+ if (correct_buf) free(correct_buf);
+ if (read_buf) free(read_buf);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data in the case where
+ * each process writes an equal amount of data to each chunk
+ * in the dataset. Each chunk is distributed among the
+ * processes in round-robin fashion by blocks of size 1 until
+ * the whole chunk is selected, leading to an interleaved
+ * write pattern.
+ *
+ * Programmer: Jordan Henderson
+ * 02/02/2017
+ */
+static void
+test_filtered_dataset_interleaved_write(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *read_buf = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ hsize_t dataset_dims[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS];
+ hsize_t chunk_dims[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS];
+ hsize_t sel_dims[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS];
+ hsize_t count[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS];
+ hsize_t stride[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS];
+ hsize_t block[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS];
+ hsize_t offset[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS];
+ size_t i, data_size, correct_buf_size;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing interleaved write to filtered chunks");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NROWS;
+ dataset_dims[1] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS;
+ chunk_dims[0] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_CH_NROWS;
+ chunk_dims[1] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS;
+ sel_dims[0] = (hsize_t) (INTERLEAVED_WRITE_FILTERED_DATASET_NROWS / mpi_size);
+ sel_dims[1] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS;
+
+ filespace = H5Screate_simple(INTERLEAVED_WRITE_FILTERED_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(INTERLEAVED_WRITE_FILTERED_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, INTERLEAVED_WRITE_FILTERED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, INTERLEAVED_WRITE_FILTERED_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = (hsize_t) (INTERLEAVED_WRITE_FILTERED_DATASET_NROWS / INTERLEAVED_WRITE_FILTERED_DATASET_CH_NROWS);
+ count[1] = (hsize_t) (INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS / INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS);
+ stride[0] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_CH_NROWS;
+ stride[1] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS;
+ block[0] = 1;
+ block[1] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS;
+ offset[0] = (hsize_t) mpi_rank;
+ offset[1] = 0;
+
+ if (VERBOSE_MED)
+ printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ /* Fill data buffer */
+ data_size = sel_dims[0] * sel_dims[1] * sizeof(*data);
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++)
+ /* Add Column Index */
+ correct_buf[i] = (C_DATATYPE) ( (i % (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS)
+
+ /* Add the Row Index */
+ + ((i % (hsize_t) (mpi_size * INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS)) / (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS)
+
+ /* Add the amount that gets added when a rank moves down to its next section vertically in the dataset */
+ + ((hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS * (i / (hsize_t) (mpi_size * INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS))));
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+
+ /* Verify the correct data was written */
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" INTERLEAVED_WRITE_FILTERED_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ if (correct_buf) free(correct_buf);
+ if (read_buf) free(read_buf);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data in the case where
+ * the dataset has 3 dimensions and each process writes
+ * to its own "page" in the 3rd dimension.
+ *
+ * Programmer: Jordan Henderson
+ * 02/06/2017
+ */
+static void
+test_3d_filtered_dataset_no_overlap_separate_pages(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *read_buf = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ hsize_t dataset_dims[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS];
+ hsize_t chunk_dims[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS];
+ hsize_t sel_dims[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS];
+ hsize_t count[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS];
+ hsize_t stride[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS];
+ hsize_t block[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS];
+ hsize_t offset[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS];
+ size_t i, data_size, correct_buf_size;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing write to unshared filtered chunks on separate pages in 3D dataset");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NROWS;
+ dataset_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NCOLS;
+ dataset_dims[2] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DEPTH;
+ chunk_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NROWS;
+ chunk_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NCOLS;
+ chunk_dims[2] = 1;
+ sel_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NROWS;
+ sel_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NCOLS;
+ sel_dims[2] = 1;
+
+ filespace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NROWS / (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NROWS;
+ count[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NCOLS / (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NCOLS;
+ count[2] = 1;
+ stride[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NROWS;
+ stride[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NCOLS;
+ stride[2] = 1;
+ block[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NROWS;
+ block[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NCOLS;
+ block[2] = 1;
+ offset[0] = 0;
+ offset[1] = 0;
+ offset[2] = (hsize_t) mpi_rank;
+
+ if (VERBOSE_MED)
+ printf("Process %d is writing with count[ %llu, %llu, %llu ], stride[ %llu, %llu, %llu ], offset[ %llu, %llu, %llu ], block size[ %llu, %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], count[2], stride[0], stride[1], stride[2], offset[0], offset[1], offset[2], block[0], block[1], block[2]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ /* Fill data buffer */
+ data_size = sel_dims[0] * sel_dims[1] * sel_dims[2] * sizeof(*data);
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*correct_buf);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++)
+ correct_buf[i] = (C_DATATYPE) ((i % (hsize_t) mpi_size) + (i / (hsize_t) mpi_size));
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+
+ /* Verify the correct data was written */
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ if (correct_buf) free(correct_buf);
+ if (read_buf) free(read_buf);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data in the case where
+ * the dataset has 3 dimensions and each process writes
+ * to each "page" in the 3rd dimension. However, no chunk
+ * on a given "page" is written to by more than one process.
+ *
+ * Programmer: Jordan Henderson
+ * 02/06/2017
+ */
+static void
+test_3d_filtered_dataset_no_overlap_same_pages(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *read_buf = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ hsize_t dataset_dims[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS];
+ hsize_t chunk_dims[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS];
+ hsize_t sel_dims[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS];
+ hsize_t count[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS];
+ hsize_t stride[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS];
+ hsize_t block[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS];
+ hsize_t offset[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS];
+ size_t i, data_size, correct_buf_size;
+ hid_t file_id, dset_id, plist_id;
+ hid_t filespace, memspace;
+
+ if (MAINPROCESS) puts("Testing write to unshared filtered chunks on the same pages in 3D dataset");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NROWS;
+ dataset_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NCOLS;
+ dataset_dims[2] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DEPTH;
+ chunk_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS;
+ chunk_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NCOLS;
+ chunk_dims[2] = 1;
+ sel_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS;
+ sel_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NCOLS;
+ sel_dims[2] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DEPTH;
+
+ filespace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = 1;
+ count[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NCOLS / (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NCOLS;
+ count[2] = (hsize_t) mpi_size;
+ stride[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS;
+ stride[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NCOLS;
+ stride[2] = 1;
+ block[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS;
+ block[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NCOLS;
+ block[2] = 1;
+ offset[0] = ((hsize_t) mpi_rank * (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS * count[0]);
+ offset[1] = 0;
+ offset[2] = 0;
+
+ if (VERBOSE_MED)
+ printf("Process %d is writing with count[ %llu, %llu, %llu ], stride[ %llu, %llu, %llu ], offset[ %llu, %llu, %llu ], block size[ %llu, %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], count[2], stride[0], stride[1], stride[2], offset[0], offset[1], offset[2], block[0], block[1], block[2]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ /* Fill data buffer */
+ data_size = sel_dims[0] * sel_dims[1] * sel_dims[2] * sizeof(*data);
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*correct_buf);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++)
+ correct_buf[i] = (C_DATATYPE) ((i % (dataset_dims[0] * dataset_dims[1])) + (i / (dataset_dims[0] * dataset_dims[1])));
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+
+ /* Verify the correct data was written */
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ if (correct_buf) free(correct_buf);
+ if (read_buf) free(read_buf);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data in the case where
+ * the dataset has 3 dimensions and each process writes
+ * to each "page" in the 3rd dimension. Further, each chunk
+ * in each "page" is written to equally by all processes.
+ *
+ * Programmer: Jordan Henderson
+ * 02/06/2017
+ */
+static void
+test_3d_filtered_dataset_overlap(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *read_buf = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ hsize_t dataset_dims[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS];
+ hsize_t chunk_dims[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS];
+ hsize_t sel_dims[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS];
+ hsize_t count[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS];
+ hsize_t stride[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS];
+ hsize_t block[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS];
+ hsize_t offset[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS];
+ size_t i, data_size, correct_buf_size;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing write to shared filtered chunks in 3D dataset");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_NROWS;
+ dataset_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_NCOLS;
+ dataset_dims[2] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_DEPTH;
+ chunk_dims[0] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_CH_NROWS;
+ chunk_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_CH_NCOLS;
+ chunk_dims[2] = 1;
+ sel_dims[0] = (hsize_t) (SHARED_FILTERED_CHUNKS_3D_NROWS / mpi_size);
+ sel_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_NCOLS;
+ sel_dims[2] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_DEPTH;
+
+ filespace = H5Screate_simple(SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, SHARED_FILTERED_CHUNKS_3D_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = (hsize_t) (SHARED_FILTERED_CHUNKS_3D_NROWS / SHARED_FILTERED_CHUNKS_3D_CH_NROWS);
+ count[1] = (hsize_t) (SHARED_FILTERED_CHUNKS_3D_NCOLS / SHARED_FILTERED_CHUNKS_3D_CH_NCOLS);
+ count[2] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_DEPTH;
+ stride[0] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_CH_NROWS;
+ stride[1] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_CH_NCOLS;
+ stride[2] = 1;
+ block[0] = 1;
+ block[1] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_CH_NCOLS;
+ block[2] = 1;
+ offset[0] = (hsize_t) mpi_rank;
+ offset[1] = 0;
+ offset[2] = 0;
+
+ if (VERBOSE_MED)
+ printf("Process %d is writing with count[ %llu, %llu, %llu ], stride[ %llu, %llu, %llu ], offset[ %llu, %llu, %llu ], block size[ %llu, %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], count[2], stride[0], stride[1], stride[2], offset[0], offset[1], offset[2], block[0], block[1], block[2]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ /* Fill data buffer */
+ data_size = sel_dims[0] * sel_dims[1] * sel_dims[2] * sizeof(*data);
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*correct_buf);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++)
+ /* Add the Column Index */
+ correct_buf[i] = (C_DATATYPE) ( (i % (hsize_t) (SHARED_FILTERED_CHUNKS_3D_DEPTH * SHARED_FILTERED_CHUNKS_3D_NCOLS))
+
+ /* Add the Row Index */
+ + ((i % (hsize_t) (mpi_size * SHARED_FILTERED_CHUNKS_3D_DEPTH * SHARED_FILTERED_CHUNKS_3D_NCOLS)) / (hsize_t) (SHARED_FILTERED_CHUNKS_3D_DEPTH * SHARED_FILTERED_CHUNKS_3D_NCOLS))
+
+ /* Add the amount that gets added when a rank moves down to its next section vertically in the dataset */
+ + ((hsize_t) (SHARED_FILTERED_CHUNKS_3D_DEPTH * SHARED_FILTERED_CHUNKS_3D_NCOLS) * (i / (hsize_t) (mpi_size * SHARED_FILTERED_CHUNKS_3D_DEPTH * SHARED_FILTERED_CHUNKS_3D_NCOLS))));
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+
+ /* Verify the correct data was written */
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" SHARED_FILTERED_CHUNKS_3D_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ if (correct_buf) free(correct_buf);
+ if (read_buf) free(read_buf);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data to unshared
+ * chunks using a compound datatype which doesn't
+ * require a datatype conversion.
+ *
+ * Programmer: Jordan Henderson
+ * 02/10/2017
+ */
+/* JTH: This test currently cannot be data-verified due to the floating-point data involved */
+static void
+test_cmpd_filtered_dataset_no_conversion_unshared(void)
+{
+ cmpd_filtered_t *data = NULL;
+ hsize_t dataset_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t chunk_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t sel_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t count[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t stride[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t block[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t offset[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS];
+ size_t i;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1, memtype = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing write to unshared filtered chunks in Compound Datatype dataset without Datatype conversion");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_NROWS;
+ dataset_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_NCOLS;
+ chunk_dims[0] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NROWS;
+ chunk_dims[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NCOLS;
+ sel_dims[0] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NROWS;
+ sel_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC;
+
+ filespace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ /* Create the compound type for memory. */
+ memtype = H5Tcreate(H5T_COMPOUND, sizeof(cmpd_filtered_t));
+ VRFY((memtype >= 0), "Datatype creation succeeded");
+
+ VRFY((H5Tinsert(memtype, "ShortData", HOFFSET(cmpd_filtered_t, field1), H5T_NATIVE_SHORT) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "IntData", HOFFSET(cmpd_filtered_t, field2), H5T_NATIVE_INT) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "LongData", HOFFSET(cmpd_filtered_t, field3), H5T_NATIVE_LONG) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "DoubleData", HOFFSET(cmpd_filtered_t, field4), H5T_NATIVE_DOUBLE) >= 0), "Datatype insertion succeeded");
+
+ dset_id = H5Dcreate2(file_id, COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_NAME, memtype, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = 1;
+ count[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC;
+ stride[0] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NROWS;
+ stride[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NCOLS;
+ block[0] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NROWS;
+ block[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NCOLS;
+ offset[0] = 0;
+ offset[1] = ((hsize_t) mpi_rank * COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NCOLS);
+
+ if (VERBOSE_MED)
+ printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ data = (COMPOUND_C_DATATYPE *) calloc(1, (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC * sizeof(*data));
+ VRFY((NULL != data), "calloc succeeded");
+
+ /* Fill data buffer */
+ memset(data, 0, sizeof(cmpd_filtered_t) * (size_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC);
+ for (i = 0; i < (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC; i++) {
+ data[i].field1 = (short) GEN_DATA(i);
+ data[i].field2 = (int) GEN_DATA(i);
+ data[i].field3 = (long) GEN_DATA(i);
+ data[i].field4 = (double) GEN_DATA(i);
+ }
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, memtype, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Tclose(memtype) >= 0), "Datatype close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data to shared
+ * chunks using a compound datatype which doesn't
+ * require a datatype conversion.
+ *
+ * Programmer: Jordan Henderson
+ * 02/10/2017
+ */
+/* JTH: This test currently cannot be data-verified due to the floating-point data involved */
+static void
+test_cmpd_filtered_dataset_no_conversion_shared(void)
+{
+ cmpd_filtered_t *data = NULL;
+ hsize_t dataset_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t chunk_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t sel_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t count[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t stride[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t block[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t offset[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS];
+ size_t i;
+ hid_t file_id, dset_id, plist_id, memtype;
+ hid_t filespace, memspace;
+
+ if (MAINPROCESS) puts("Testing write to shared filtered chunks in Compound Datatype dataset without Datatype conversion");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id>= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_NROWS;
+ dataset_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_NCOLS;
+ chunk_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NROWS;
+ chunk_dims[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NCOLS;
+ sel_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NROWS / (hsize_t) mpi_size;
+ sel_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC;
+
+ filespace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ /* Create the compound type for memory. */
+ memtype = H5Tcreate(H5T_COMPOUND, sizeof(cmpd_filtered_t));
+ VRFY((memtype >= 0), "Datatype creation succeeded");
+
+ VRFY((H5Tinsert(memtype, "ShortData", HOFFSET(cmpd_filtered_t, field1), H5T_NATIVE_SHORT) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "IntData", HOFFSET(cmpd_filtered_t, field2), H5T_NATIVE_INT) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "LongData", HOFFSET(cmpd_filtered_t, field3), H5T_NATIVE_LONG) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "DoubleData", HOFFSET(cmpd_filtered_t, field4), H5T_NATIVE_DOUBLE) >= 0), "Datatype insertion succeeded");
+
+ dset_id = H5Dcreate2(file_id, COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_NAME, memtype, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = 1;
+ count[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC;
+ stride[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NROWS;
+ stride[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NCOLS;
+ block[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NROWS / (hsize_t) mpi_size;
+ block[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NCOLS;
+ offset[0] = (hsize_t) mpi_rank;
+ offset[1] = 0;
+
+ if (VERBOSE_MED)
+ printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ data = (COMPOUND_C_DATATYPE *) calloc(1, (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC * sizeof(*data));
+ VRFY((NULL != data), "calloc succeeded");
+
+ /* Fill data buffer */
+ memset(data, 0, sizeof(cmpd_filtered_t) * (size_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC);
+ for (i = 0; i < (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC; i++) {
+ data[i].field1 = (short) GEN_DATA(i);
+ data[i].field2 = (int) GEN_DATA(i);
+ data[i].field3 = (long) GEN_DATA(i);
+ data[i].field4 = (double) GEN_DATA(i);
+ }
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, memtype, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Tclose(memtype) >= 0), "Datatype close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data to unshared
+ * chunks using a compound datatype which requires a
+ * datatype conversion.
+ *
+ * This test currently should fail because the datatype
+ * conversion causes the parallel library to break
+ * to independent I/O and this isn't allowed when
+ * there are filters in the pipeline.
+ *
+ * Programmer: Jordan Henderson
+ * 02/07/2017
+ */
+/* JTH: This test currently cannot be data-verified due to the floating-point data involved */
+static void
+test_cmpd_filtered_dataset_type_conversion_unshared(void)
+{
+ cmpd_filtered_t *data = NULL;
+ hsize_t dataset_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t chunk_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t sel_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t count[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t stride[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t block[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS];
+ hsize_t offset[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS];
+ size_t i;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1, filetype = -1, memtype = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing write to unshared filtered chunks in Compound Datatype dataset with Datatype conversion");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_NROWS;
+ dataset_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_NCOLS;
+ chunk_dims[0] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NROWS;
+ chunk_dims[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NCOLS;
+ sel_dims[0] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NROWS;
+ sel_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC;
+
+ filespace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ /* Create the compound type for memory. */
+ memtype = H5Tcreate(H5T_COMPOUND, sizeof(cmpd_filtered_t));
+ VRFY((memtype >= 0), "Datatype creation succeeded");
+
+ VRFY((H5Tinsert(memtype, "ShortData", HOFFSET(cmpd_filtered_t, field1), H5T_NATIVE_SHORT) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "IntData", HOFFSET(cmpd_filtered_t, field2), H5T_NATIVE_INT) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "LongData", HOFFSET(cmpd_filtered_t, field3), H5T_NATIVE_LONG) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "DoubleData", HOFFSET(cmpd_filtered_t, field4), H5T_NATIVE_DOUBLE) >= 0), "Datatype insertion succeeded");
+
+ /* Create the compound type for file. */
+ filetype = H5Tcreate(H5T_COMPOUND, 32);
+ VRFY((filetype >= 0), "Datatype creation succeeded");
+
+ VRFY((H5Tinsert(filetype, "ShortData", 0, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(filetype, "IntData", 8, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(filetype, "LongData", 16, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(filetype, "DoubleData", 24, H5T_IEEE_F64BE) >= 0), "Datatype insertion succeeded");
+
+ dset_id = H5Dcreate2(file_id, COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_NAME, filetype, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = 1;
+ count[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC;
+ stride[0] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NROWS;
+ stride[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NCOLS;
+ block[0] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NROWS;
+ block[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NCOLS;
+ offset[0] = 0;
+ offset[1] = ((hsize_t) mpi_rank * COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NCOLS);
+
+ if (VERBOSE_MED)
+ printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ data = (COMPOUND_C_DATATYPE *) calloc(1, (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC * sizeof(*data));
+ VRFY((NULL != data), "calloc succeeded");
+
+ /* Fill data buffer */
+ memset(data, 0, sizeof(cmpd_filtered_t) * (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC);
+ for (i = 0; i < (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC; i++) {
+ data[i].field1 = (short) GEN_DATA(i);
+ data[i].field2 = (int) GEN_DATA(i);
+ data[i].field3 = (long) GEN_DATA(i);
+ data[i].field4 = (double) GEN_DATA(i);
+ }
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ /* Ensure that this test currently fails since type conversions break collective mode */
+ H5E_BEGIN_TRY {
+ VRFY((H5Dwrite(dset_id, memtype, memspace, filespace, plist_id, data) < 0), "Dataset write succeeded");
+ } H5E_END_TRY;
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Tclose(filetype) >= 0), "File datatype close succeeded");
+ VRFY((H5Tclose(memtype) >= 0), "Memory datatype close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data to shared
+ * chunks using a compound datatype which requires
+ * a datatype conversion.
+ *
+ * This test currently should fail because the datatype
+ * conversion causes the parallel library to break
+ * to independent I/O and this isn't allowed when
+ * there are filters in the pipeline.
+ *
+ * Programmer: Jordan Henderson
+ * 02/10/2017
+ */
+/* JTH: This test currently cannot be data-verified due to the floating-point data involved */
+static void
+test_cmpd_filtered_dataset_type_conversion_shared(void)
+{
+ cmpd_filtered_t *data = NULL;
+ hsize_t dataset_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t chunk_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t sel_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t count[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t stride[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t block[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS];
+ hsize_t offset[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS];
+ size_t i;
+ hid_t file_id, dset_id, plist_id, filetype, memtype;
+ hid_t filespace, memspace;
+
+ if (MAINPROCESS) puts("Testing write to shared filtered chunks in Compound Datatype dataset with Datatype conversion");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_NROWS;
+ dataset_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_NCOLS;
+ chunk_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NROWS;
+ chunk_dims[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NCOLS;
+ sel_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NROWS / (hsize_t) mpi_size;
+ sel_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC;
+
+ filespace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ /* Create the compound type for memory. */
+ memtype = H5Tcreate(H5T_COMPOUND, sizeof(cmpd_filtered_t));
+ VRFY((memtype >= 0), "Datatype creation succeeded");
+
+ VRFY((H5Tinsert(memtype, "ShortData", HOFFSET(cmpd_filtered_t, field1), H5T_NATIVE_SHORT) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "IntData", HOFFSET(cmpd_filtered_t, field2), H5T_NATIVE_INT) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "LongData", HOFFSET(cmpd_filtered_t, field3), H5T_NATIVE_LONG) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(memtype, "DoubleData", HOFFSET(cmpd_filtered_t, field4), H5T_NATIVE_DOUBLE) >= 0), "Datatype insertion succeeded");
+
+ /* Create the compound type for file. */
+ filetype = H5Tcreate(H5T_COMPOUND, 32);
+ VRFY((filetype >= 0), "Datatype creation succeeded");
+
+ VRFY((H5Tinsert(filetype, "ShortData", 0, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(filetype, "IntData", 8, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(filetype, "LongData", 16, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded");
+ VRFY((H5Tinsert(filetype, "DoubleData", 24, H5T_IEEE_F64BE) >= 0), "Datatype insertion succeeded");
+
+ dset_id = H5Dcreate2(file_id, COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_NAME, filetype, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = 1;
+ count[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC;
+ stride[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NROWS;
+ stride[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NCOLS;
+ block[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NROWS / (hsize_t) mpi_size;
+ block[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NCOLS;
+ offset[0] = (hsize_t) mpi_rank;
+ offset[1] = 0;
+
+ if (VERBOSE_MED)
+ printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ data = (COMPOUND_C_DATATYPE *) calloc(1, (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC * sizeof(*data));
+ VRFY((NULL != data), "calloc succeeded");
+
+ /* Fill data buffer */
+ memset(data, 0, sizeof(cmpd_filtered_t) * (size_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC);
+ for (i = 0; i < (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC; i++) {
+ data[i].field1 = (short) GEN_DATA(i);
+ data[i].field2 = (int) GEN_DATA(i);
+ data[i].field3 = (long) GEN_DATA(i);
+ data[i].field4 = (double) GEN_DATA(i);
+ }
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ /* Ensure that this test currently fails since type conversions break collective mode */
+ H5E_BEGIN_TRY {
+ VRFY((H5Dwrite(dset_id, memtype, memspace, filespace, plist_id, data) < 0), "Dataset write succeeded");
+ } H5E_END_TRY;
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Tclose(filetype) >= 0), "File datatype close succeeded");
+ VRFY((H5Tclose(memtype) >= 0), "Memory datatype close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests write of filtered data to a dataset
+ * by a single process. After the write has
+ * succeeded, the dataset is closed and then
+ * re-opened in parallel and read by all
+ * processes to ensure data correctness.
+ *
+ * Programmer: Jordan Henderson
+ * 08/03/2017
+ */
+static void
+test_write_serial_read_parallel(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *read_buf = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ hsize_t dataset_dims[WRITE_SERIAL_READ_PARALLEL_DATASET_DIMS];
+ hsize_t chunk_dims[WRITE_SERIAL_READ_PARALLEL_DATASET_DIMS];
+ size_t i, data_size, correct_buf_size;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1;
+ hid_t filespace = -1;
+
+ if (MAINPROCESS) puts("Testing write file serially; read file in parallel");
+
+ dataset_dims[0] = (hsize_t) WRITE_SERIAL_READ_PARALLEL_NROWS;
+ dataset_dims[1] = (hsize_t) WRITE_SERIAL_READ_PARALLEL_NCOLS;
+ dataset_dims[2] = (hsize_t) WRITE_SERIAL_READ_PARALLEL_DEPTH;
+
+ /* Write the file on the MAINPROCESS rank */
+ if (MAINPROCESS) {
+ /* Set up file access property list */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ chunk_dims[0] = (hsize_t) WRITE_SERIAL_READ_PARALLEL_CH_NROWS;
+ chunk_dims[1] = (hsize_t) WRITE_SERIAL_READ_PARALLEL_CH_NCOLS;
+ chunk_dims[2] = 1;
+
+ filespace = H5Screate_simple(WRITE_SERIAL_READ_PARALLEL_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, WRITE_SERIAL_READ_PARALLEL_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, WRITE_SERIAL_READ_PARALLEL_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ data_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*data);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, H5P_DEFAULT, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+ }
+
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*correct_buf);
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++)
+ correct_buf[i] = (long) i;
+
+ /* All ranks open the file and verify their "portion" of the dataset is correct */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" WRITE_SERIAL_READ_PARALLEL_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ if (correct_buf) free(correct_buf);
+ if (read_buf) free(read_buf);
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ return;
+}
+
+/*
+ * Tests parallel write of filtered data
+ * to a dataset. After the write has
+ * succeeded, the dataset is closed and
+ * then re-opened and read by a single
+ * process to ensure data correctness.
+ *
+ * Programmer: Jordan Henderson
+ * 08/03/2017
+ */
+static void
+test_write_parallel_read_serial(void)
+{
+ C_DATATYPE *data = NULL;
+ C_DATATYPE *read_buf = NULL;
+ C_DATATYPE *correct_buf = NULL;
+ hsize_t dataset_dims[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS];
+ hsize_t chunk_dims[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS];
+ hsize_t sel_dims[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS];
+ hsize_t count[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS];
+ hsize_t stride[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS];
+ hsize_t block[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS];
+ hsize_t offset[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS];
+ size_t i, data_size, correct_buf_size;
+ hid_t file_id = -1, dset_id = -1, plist_id = -1;
+ hid_t filespace = -1, memspace = -1;
+
+ if (MAINPROCESS) puts("Testing write file in parallel; read serially");
+
+ /* Set up file access property list with parallel I/O access */
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ /* Create the dataspace for the dataset */
+ dataset_dims[0] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_NROWS;
+ dataset_dims[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_NCOLS;
+ dataset_dims[2] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_DEPTH;
+ chunk_dims[0] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NROWS;
+ chunk_dims[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NCOLS;
+ chunk_dims[2] = 1;
+ sel_dims[0] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NROWS;
+ sel_dims[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_NCOLS;
+ sel_dims[2] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_DEPTH;
+
+ filespace = H5Screate_simple(WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS, dataset_dims, NULL);
+ VRFY((filespace >= 0), "File dataspace creation succeeded");
+
+ memspace = H5Screate_simple(WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS, sel_dims, NULL);
+ VRFY((memspace >= 0), "Memory dataspace creation succeeded");
+
+ /* Create chunked dataset */
+ plist_id = H5Pcreate(H5P_DATASET_CREATE);
+ VRFY((plist_id >= 0), "DCPL creation succeeded");
+
+ VRFY((H5Pset_chunk(plist_id, WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set");
+
+ /* Add test filter to the pipeline */
+ VRFY((SET_FILTER(plist_id) >= 0), "Filter set");
+
+ dset_id = H5Dcreate2(file_id, WRITE_PARALLEL_READ_SERIAL_DATASET_NAME, HDF5_DATATYPE_NAME, filespace,
+ H5P_DEFAULT, plist_id, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset creation succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+
+ /* Each process defines the dataset selection in memory and writes
+ * it to the hyperslab in the file
+ */
+ count[0] = 1;
+ count[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_NCOLS / (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NCOLS;
+ count[2] = (hsize_t) mpi_size;
+ stride[0] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NROWS;
+ stride[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NCOLS;
+ stride[2] = 1;
+ block[0] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NROWS;
+ block[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NCOLS;
+ block[2] = 1;
+ offset[0] = ((hsize_t) mpi_rank * (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NROWS * count[0]);
+ offset[1] = 0;
+ offset[2] = 0;
+
+ if (VERBOSE_MED)
+ printf("Process %d is writing with count[ %llu, %llu, %llu ], stride[ %llu, %llu, %llu ], offset[ %llu, %llu, %llu ], block size[ %llu, %llu, %llu ]\n",
+ mpi_rank, count[0], count[1], count[2], stride[0], stride[1], stride[2], offset[0], offset[1], offset[2], block[0], block[1], block[2]);
+
+ /* Select hyperslab in the file */
+ filespace = H5Dget_space(dset_id);
+ VRFY((filespace >= 0), "File dataspace retrieval succeeded");
+
+ VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded");
+
+ /* Fill data buffer */
+ data_size = sel_dims[0] * sel_dims[1] * sel_dims[2] * sizeof(*data);
+
+ data = (C_DATATYPE *) calloc(1, data_size);
+ VRFY((NULL != data), "calloc succeeded");
+
+ for (i = 0; i < data_size / sizeof(*data); i++)
+ data[i] = (C_DATATYPE) GEN_DATA(i);
+
+ /* Create property list for collective dataset write */
+ plist_id = H5Pcreate(H5P_DATASET_XFER);
+ VRFY((plist_id >= 0), "DXPL creation succeeded");
+
+ VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded");
+
+ VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded");
+
+ if (data) free(data);
+
+ VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded");
+ VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded");
+ VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded");
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ if (MAINPROCESS) {
+ plist_id = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((plist_id >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id);
+ VRFY((file_id >= 0), "Test file open succeeded");
+
+ VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded");
+
+ dset_id = H5Dopen2(file_id, "/" WRITE_PARALLEL_READ_SERIAL_DATASET_NAME, H5P_DEFAULT);
+ VRFY((dset_id >= 0), "Dataset open succeeded");
+
+ correct_buf_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*correct_buf);
+
+ correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != correct_buf), "calloc succeeded");
+
+ read_buf = (C_DATATYPE *) calloc(1, correct_buf_size);
+ VRFY((NULL != read_buf), "calloc succeeded");
+
+ for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++)
+ correct_buf[i] = (C_DATATYPE) ((i % (dataset_dims[0] * dataset_dims[1])) + (i / (dataset_dims[0] * dataset_dims[1])));;
+
+ VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, H5P_DEFAULT, read_buf) >= 0), "Dataset read succeeded");
+
+ VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded");
+
+ VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded");
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+ }
+
+ return;
+}
+
+int
+main(int argc, char** argv)
+{
+ size_t i;
+ hid_t file_id = -1, fapl = -1;
+ int mpi_code;
+
+ /* Initialize MPI */
+ MPI_Init(&argc, &argv);
+ MPI_Comm_size(comm, &mpi_size);
+ MPI_Comm_rank(comm, &mpi_rank);
+
+ if (mpi_size <= 0) {
+ if (MAINPROCESS) {
+ printf("The Parallel Filters tests require at least 1 rank.\n");
+ printf("Quitting...\n");
+ }
+
+ MPI_Abort(MPI_COMM_WORLD, 1);
+ }
+
+ if (H5dont_atexit() < 0) {
+ printf("Failed to turn off atexit processing. Continue.\n");
+ }
+
+ H5open();
+
+ if (MAINPROCESS) {
+ printf("==========================\n");
+ printf("Parallel Filters tests\n");
+ printf("==========================\n\n");
+ }
+
+ if (VERBOSE_MED) h5_show_hostname();
+
+ ALARM_ON;
+
+ /* Create test file */
+ fapl = H5Pcreate(H5P_FILE_ACCESS);
+ VRFY((fapl >= 0), "FAPL creation succeeded");
+
+ VRFY((H5Pset_fapl_mpio(fapl, comm, info) >= 0), "Set FAPL MPIO succeeded");
+
+ VRFY((H5Pset_libver_bounds(fapl, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded");
+
+ VRFY((h5_fixname(FILENAME[0], fapl, filenames[0], sizeof(filenames[0])) != NULL), "Test file name created");
+
+ file_id = H5Fcreate(filenames[0], H5F_ACC_TRUNC, H5P_DEFAULT, fapl);
+ VRFY((file_id >= 0), "Test file creation succeeded");
+
+ VRFY((H5Fclose(file_id) >= 0), "File close succeeded");
+
+ for (i = 0; i < ARRAY_SIZE(tests); i++) {
+ if (MPI_SUCCESS == (mpi_code = MPI_Barrier(comm))) {
+ (*tests[i])();
+ } else {
+ if (MAINPROCESS) MESG("MPI_Barrier failed");
+ nerrors++;
+ }
+ }
+
+ if (nerrors) goto exit;
+
+ if (MAINPROCESS) puts("All Parallel Filters tests passed\n");
+
+exit:
+ if (nerrors)
+ if (MAINPROCESS) printf("*** %d TEST ERROR%s OCCURRED ***\n", nerrors, nerrors > 1 ? "S" : "");
+
+ ALARM_OFF;
+
+ h5_clean_files(FILENAME, fapl);
+
+ H5close();
+
+ MPI_Finalize();
+
+ exit((nerrors ? EXIT_FAILURE : EXIT_SUCCESS));
+}
diff --git a/testpar/t_filters_parallel.h b/testpar/t_filters_parallel.h
new file mode 100644
index 0000000..cb9a1ab
--- /dev/null
+++ b/testpar/t_filters_parallel.h
@@ -0,0 +1,212 @@
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ * Copyright by The HDF Group. *
+ * Copyright by the Board of Trustees of the University of Illinois. *
+ * All rights reserved. *
+ * *
+ * This file is part of HDF5. The full HDF5 copyright notice, including *
+ * terms governing use, modification, and redistribution, is contained in *
+ * the files COPYING and Copyright.html. COPYING can be found at the root *
+ * of the source code distribution tree; Copyright.html can be found at the *
+ * root level of an installed copy of the electronic HDF5 document set and *
+ * is linked from the top-level documents page. It can also be found at *
+ * http://hdfgroup.org/HDF5/doc/Copyright.html. If you do not have *
+ * access to either file, you may request a copy from help@hdfgroup.org. *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+/*
+ * Programmer: Jordan Henderson
+ * 01/31/2017
+ *
+ * This file contains #defines for tests of the use
+ * of filters in parallel HDF5, implemented in
+ * H5Dmpio.c
+ */
+
+#ifndef TEST_PARALLEL_FILTERS_H_
+#define TEST_PARALLEL_FILTERS_H_
+
+#include <string.h>
+
+#include "stdlib.h"
+#include "testpar.h"
+
+/* Used to load other filters than GZIP */
+/* #define DYNAMIC_FILTER */ /* Uncomment and define the fields below to use a dynamically loaded filter */
+#define FILTER_NUM_CDVALUES 1
+const unsigned int cd_values[FILTER_NUM_CDVALUES] = { 0 };
+H5Z_filter_t filter_id;
+unsigned int flags = 0;
+size_t cd_nelmts = FILTER_NUM_CDVALUES;
+
+/* Utility Macros */
+#define STRINGIFY(type) #type
+
+/* Common defines for all tests */
+#define C_DATATYPE long
+#define COMPOUND_C_DATATYPE cmpd_filtered_t
+#define C_DATATYPE_STR(type) STRINGIFY(type)
+#define HDF5_DATATYPE_NAME H5T_NATIVE_LONG
+
+#define GEN_DATA(i) INCREMENTAL_DATA(i)
+#define INCREMENTAL_DATA(i) ((size_t) mpi_rank + i) /* Generates incremental test data */
+
+/* For experimental purposes only, will cause tests to fail data verification phase - JTH */
+/* #define GEN_DATA(i) RANK_DATA(i) */ /* Given an index value i, generates test data based upon selected mode */
+#define RANK_DATA(i) (mpi_rank) /* Generates test data to visibly show which rank wrote to which parts of the dataset */
+
+#ifdef DYNAMIC_FILTER
+#define SET_FILTER(dcpl) H5Pset_filter(dcpl, filter_id, flags, FILTER_NUM_CDVALUES, cd_values) /* Test other filter in parallel */
+#else
+#define SET_FILTER(dcpl) H5Pset_deflate(dcpl, 6) /* Test GZIP filter in parallel */
+#endif
+
+#define DIM0_SCALE_FACTOR 4
+#define DIM1_SCALE_FACTOR 2
+
+/* Defines for the one-chunk filtered dataset test */
+#define ONE_CHUNK_FILTERED_DATASET_NAME "one_chunk_filtered_dataset"
+#define ONE_CHUNK_FILTERED_DATASET_DIMS 2
+#define ONE_CHUNK_FILTERED_DATASET_NROWS (mpi_size * DIM0_SCALE_FACTOR) /* Must be an even multiple of the number of ranks to avoid issues */
+#define ONE_CHUNK_FILTERED_DATASET_NCOLS (mpi_size * DIM1_SCALE_FACTOR) /* Must be an even multiple of the number of ranks to avoid issues */
+#define ONE_CHUNK_FILTERED_DATASET_CH_NROWS ONE_CHUNK_FILTERED_DATASET_NROWS
+#define ONE_CHUNK_FILTERED_DATASET_CH_NCOLS ONE_CHUNK_FILTERED_DATASET_NCOLS
+
+/* Defines for the unshared filtered chunks write test */
+#define UNSHARED_FILTERED_CHUNKS_DATASET_NAME "unshared_filtered_chunks"
+#define UNSHARED_FILTERED_CHUNKS_DATASET_DIMS 2
+#define UNSHARED_FILTERED_CHUNKS_NROWS (mpi_size * DIM0_SCALE_FACTOR)
+#define UNSHARED_FILTERED_CHUNKS_NCOLS (mpi_size * DIM1_SCALE_FACTOR)
+#define UNSHARED_FILTERED_CHUNKS_CH_NROWS (UNSHARED_FILTERED_CHUNKS_NROWS / mpi_size)
+#define UNSHARED_FILTERED_CHUNKS_CH_NCOLS (UNSHARED_FILTERED_CHUNKS_NCOLS / mpi_size)
+
+/* Defines for the shared filtered chunks write test */
+#define SHARED_FILTERED_CHUNKS_DATASET_NAME "shared_filtered_chunks"
+#define SHARED_FILTERED_CHUNKS_DATASET_DIMS 2
+#define SHARED_FILTERED_CHUNKS_CH_NROWS (mpi_size)
+#define SHARED_FILTERED_CHUNKS_CH_NCOLS (mpi_size)
+#define SHARED_FILTERED_CHUNKS_NROWS (SHARED_FILTERED_CHUNKS_CH_NROWS * DIM0_SCALE_FACTOR)
+#define SHARED_FILTERED_CHUNKS_NCOLS (SHARED_FILTERED_CHUNKS_CH_NCOLS * DIM1_SCALE_FACTOR)
+
+/* Defines for the filtered chunks write test where a process has no selection */
+#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME "single_no_selection_filtered_chunks"
+#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS 2
+#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS (DIM0_SCALE_FACTOR)
+#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS (DIM1_SCALE_FACTOR)
+#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_NROWS (SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS * mpi_size)
+#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_NCOLS (SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS * mpi_size)
+#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_NO_SELECT_PROC (mpi_size - 1)
+
+/* Defines for the filtered chunks write test where no process has a selection */
+#define ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME "all_no_selection_filtered_chunks"
+#define ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS 2
+#define ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS (DIM0_SCALE_FACTOR)
+#define ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS (DIM1_SCALE_FACTOR)
+#define ALL_NO_SELECTION_FILTERED_CHUNKS_NROWS (ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS * mpi_size)
+#define ALL_NO_SELECTION_FILTERED_CHUNKS_NCOLS (ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS * mpi_size)
+
+/* Defines for the filtered chunks write test with a point selection */
+#define POINT_SELECTION_FILTERED_CHUNKS_DATASET_NAME "point_selection_filtered_chunks"
+#define POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS 2
+#define POINT_SELECTION_FILTERED_CHUNKS_CH_NROWS (DIM0_SCALE_FACTOR)
+#define POINT_SELECTION_FILTERED_CHUNKS_CH_NCOLS (DIM1_SCALE_FACTOR)
+#define POINT_SELECTION_FILTERED_CHUNKS_NROWS (POINT_SELECTION_FILTERED_CHUNKS_CH_NROWS * mpi_size)
+#define POINT_SELECTION_FILTERED_CHUNKS_NCOLS (POINT_SELECTION_FILTERED_CHUNKS_CH_NCOLS * mpi_size)
+
+/* Defines for the filtered dataset interleaved write test */
+#define INTERLEAVED_WRITE_FILTERED_DATASET_NAME "interleaved_write_filtered_dataset"
+#define INTERLEAVED_WRITE_FILTERED_DATASET_DIMS 2
+#define INTERLEAVED_WRITE_FILTERED_DATASET_CH_NROWS (mpi_size)
+#define INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS (DIM1_SCALE_FACTOR)
+#define INTERLEAVED_WRITE_FILTERED_DATASET_NROWS (INTERLEAVED_WRITE_FILTERED_DATASET_CH_NROWS * DIM0_SCALE_FACTOR)
+#define INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS (INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS * DIM1_SCALE_FACTOR)
+
+/* Defines for the 3D unshared filtered dataset separate page write test */
+#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_NAME "3D_unshared_filtered_chunks_separate_pages"
+#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS 3
+#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NROWS (mpi_size * DIM0_SCALE_FACTOR)
+#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NCOLS (mpi_size * DIM1_SCALE_FACTOR)
+#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DEPTH (mpi_size)
+#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NROWS (UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NROWS / mpi_size)
+#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NCOLS (UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NCOLS / mpi_size)
+
+/* Defines for the 3D unshared filtered dataset same page write test */
+#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_NAME "3D_unshared_filtered_chunks_same_pages"
+#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS 3
+#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NROWS (mpi_size * DIM0_SCALE_FACTOR)
+#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NCOLS (mpi_size * DIM1_SCALE_FACTOR)
+#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DEPTH (mpi_size)
+#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS (UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NROWS / mpi_size)
+#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NCOLS (UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NCOLS / mpi_size)
+
+/* Defines for the 3d shared filtered dataset write test */
+#define SHARED_FILTERED_CHUNKS_3D_DATASET_NAME "3D_shared_filtered_chunks"
+#define SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS 3
+#define SHARED_FILTERED_CHUNKS_3D_CH_NROWS (mpi_size)
+#define SHARED_FILTERED_CHUNKS_3D_CH_NCOLS (DIM1_SCALE_FACTOR)
+#define SHARED_FILTERED_CHUNKS_3D_NROWS (SHARED_FILTERED_CHUNKS_3D_CH_NROWS * DIM0_SCALE_FACTOR)
+#define SHARED_FILTERED_CHUNKS_3D_NCOLS (SHARED_FILTERED_CHUNKS_3D_CH_NCOLS * DIM1_SCALE_FACTOR)
+#define SHARED_FILTERED_CHUNKS_3D_DEPTH (mpi_size)
+
+/* Struct type for the compound datatype filtered dataset tests */
+typedef struct {
+ short field1;
+ int field2;
+ long field3;
+ double field4;
+} COMPOUND_C_DATATYPE;
+
+/* Defines for the compound datatype filtered dataset no conversion write test with unshared chunks */
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_NAME "compound_unshared_filtered_chunks_no_conversion"
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS 2
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_NROWS 1
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_NCOLS mpi_size
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NROWS 1
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NCOLS 1
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC (COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_NCOLS / mpi_size)
+
+/* Defines for the compound datatype filtered dataset no conversion write test with shared chunks */
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_NAME "compound_shared_filtered_chunks_no_conversion"
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS 2
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_NROWS mpi_size
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_NCOLS mpi_size
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NROWS mpi_size
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NCOLS 1
+#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_NCOLS
+
+/* Defines for the compound datatype filtered dataset type conversion write test with unshared chunks */
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_NAME "compound_unshared_filtered_chunks_type_conversion"
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS 2
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_NROWS 1
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_NCOLS mpi_size
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NROWS 1
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NCOLS 1
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC (COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_NCOLS / mpi_size)
+
+/* Defines for the compound datatype filtered dataset type conversion write test with shared chunks */
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_NAME "compound_shared_filtered_chunks_type_conversion"
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS 2
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_NROWS mpi_size
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_NCOLS mpi_size
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NROWS mpi_size
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NCOLS 1
+#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_NCOLS
+
+/* Defines for the write file serially/read in parallel test */
+#define WRITE_SERIAL_READ_PARALLEL_DATASET_NAME "write_serial_read_parallel"
+#define WRITE_SERIAL_READ_PARALLEL_DATASET_DIMS 3
+#define WRITE_SERIAL_READ_PARALLEL_NROWS (mpi_size * DIM0_SCALE_FACTOR)
+#define WRITE_SERIAL_READ_PARALLEL_NCOLS (mpi_size * DIM1_SCALE_FACTOR)
+#define WRITE_SERIAL_READ_PARALLEL_DEPTH (mpi_size)
+#define WRITE_SERIAL_READ_PARALLEL_CH_NROWS (WRITE_SERIAL_READ_PARALLEL_NROWS / mpi_size)
+#define WRITE_SERIAL_READ_PARALLEL_CH_NCOLS (WRITE_SERIAL_READ_PARALLEL_NCOLS / mpi_size)
+
+/* Defines for the write file in parallel/read serially test */
+#define WRITE_PARALLEL_READ_SERIAL_DATASET_NAME "write_parallel_read_serial"
+#define WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS 3
+#define WRITE_PARALLEL_READ_SERIAL_NROWS (mpi_size * DIM0_SCALE_FACTOR)
+#define WRITE_PARALLEL_READ_SERIAL_NCOLS (mpi_size * DIM1_SCALE_FACTOR)
+#define WRITE_PARALLEL_READ_SERIAL_DEPTH (mpi_size)
+#define WRITE_PARALLEL_READ_SERIAL_CH_NROWS (WRITE_PARALLEL_READ_SERIAL_NROWS / mpi_size)
+#define WRITE_PARALLEL_READ_SERIAL_CH_NCOLS (WRITE_PARALLEL_READ_SERIAL_NCOLS / mpi_size)
+
+#endif /* TEST_PARALLEL_FILTERS_H_ */