-rw-r--r-- | MANIFEST                     |    2
-rw-r--r-- | src/H5C.c                    |   26
-rw-r--r-- | src/H5Dchunk.c               |    5
-rw-r--r-- | src/H5Dint.c                 |    4
-rw-r--r-- | src/H5Dio.c                  |   68
-rw-r--r-- | src/H5Dmpio.c                | 1665
-rw-r--r-- | src/H5Dpkg.h                 |    8
-rw-r--r-- | src/H5Dscatgath.c            |    7
-rw-r--r-- | src/H5Ppublic.h              |    2
-rw-r--r-- | src/H5err.txt                |    2
-rw-r--r-- | src/H5trace.c                |    4
-rw-r--r-- | testpar/Makefile.am          |    2
-rw-r--r-- | testpar/t_dset.c             |    7
-rw-r--r-- | testpar/t_filters_parallel.c | 2475
-rw-r--r-- | testpar/t_filters_parallel.h |  212
15 files changed, 4401 insertions, 88 deletions
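
This change set enables collective I/O on chunked, filtered datasets through the MPI-IO VFD, removing the checks in H5D__create() and H5D__write() that previously rejected filters in parallel. For orientation before the raw hunks below, here is a minimal sketch of how an application might exercise the new write path; the file name, dataset name, chunk size, and deflate level are illustrative assumptions, and error checking is omitted for brevity.

#include <hdf5.h>
#include <mpi.h>

#define CHUNK 16   /* arbitrary chunk edge length, illustrative only */

int main(int argc, char **argv)
{
    int     mpi_rank, mpi_size;
    hsize_t dims[2], chunk_dims[2] = {CHUNK, CHUNK};
    hsize_t start[2], count[2] = {CHUNK, CHUNK};
    int     data[CHUNK][CHUNK];
    hid_t   fapl, file, dcpl, fspace, mspace, dset, dxpl;
    size_t  i, j;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

    /* Open the file through the MPI-IO VFD */
    fapl = H5Pcreate(H5P_FILE_ACCESS);
    H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);
    file = H5Fcreate("filtered_dset.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);

    /* Chunked, deflate-filtered dataset; one chunk row per rank */
    dims[0] = (hsize_t)mpi_size * CHUNK;
    dims[1] = CHUNK;
    fspace  = H5Screate_simple(2, dims, NULL);
    dcpl    = H5Pcreate(H5P_DATASET_CREATE);
    H5Pset_chunk(dcpl, 2, chunk_dims);
    H5Pset_deflate(dcpl, 6);
    dset = H5Dcreate2(file, "filtered_data", H5T_NATIVE_INT, fspace,
                      H5P_DEFAULT, dcpl, H5P_DEFAULT);

    /* Each rank selects and fills its own chunk */
    start[0] = (hsize_t)mpi_rank * CHUNK;
    start[1] = 0;
    H5Sselect_hyperslab(fspace, H5S_SELECT_SET, start, NULL, count, NULL);
    mspace = H5Screate_simple(2, count, NULL);
    for (i = 0; i < CHUNK; i++)
        for (j = 0; j < CHUNK; j++)
            data[i][j] = mpi_rank;

    /* The write must stay collective: with filters in the pipeline the
     * library now errors out rather than breaking to independent I/O */
    dxpl = H5Pcreate(H5P_DATASET_XFER);
    H5Pset_dxpl_mpio(dxpl, H5FD_MPIO_COLLECTIVE);
    H5Dwrite(dset, H5T_NATIVE_INT, mspace, fspace, dxpl, data);

    H5Pclose(dxpl); H5Sclose(mspace); H5Sclose(fspace);
    H5Dclose(dset); H5Pclose(dcpl); H5Pclose(fapl); H5Fclose(file);
    MPI_Finalize();
    return 0;
}

Note that the transfer property list must request collective I/O: as the H5Dio.c hunk below shows, a filtered write that would otherwise break to independent I/O now fails with H5E_NO_INDEPENDENT and reports the local and global causes for breaking collective mode.
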
@@ -1236,6 +1236,8 @@ ./testpar/t_file.c ./testpar/t_file_image.c ./testpar/t_filter_read.c +./testpar/t_filters_parallel.c +./testpar/t_filters_parallel.h ./testpar/t_mdset.c ./testpar/t_mpi.c ./testpar/t_ph5basic.c @@ -6273,17 +6273,27 @@ H5C__flush_single_entry(H5F_t *f, hid_t dxpl_id, H5C_cache_entry_t *entry_ptr, HGOTO_ERROR(H5E_CACHE, H5E_CANTINSERT, FAIL, "unable to insert skip list item") } /* end if */ else + { #endif /* H5_HAVE_PARALLEL */ - if(entry_ptr->prefetched) { - HDassert(entry_ptr->type->id == H5AC_PREFETCHED_ENTRY_ID); - mem_type = cache_ptr->class_table_ptr[entry_ptr->prefetch_type_id]->mem_type; - } /* end if */ - else - mem_type = entry_ptr->type->mem_type; + if(entry_ptr->prefetched) { + HDassert(entry_ptr->type->id == H5AC_PREFETCHED_ENTRY_ID); + mem_type = cache_ptr-> + class_table_ptr[entry_ptr->prefetch_type_id]-> + mem_type; + } /* end if */ + else + mem_type = entry_ptr->type->mem_type; - if(H5F_block_write(f, mem_type, entry_ptr->addr, entry_ptr->size, dxpl_id, entry_ptr->image_ptr) < 0) - HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "Can't write image to file") + if(H5F_block_write(f, mem_type, entry_ptr->addr, + entry_ptr->size, dxpl_id, + entry_ptr->image_ptr) < 0) + + HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, \ + "Can't write image to file") +#ifdef H5_HAVE_PARALLEL + } +#endif /* H5_HAVE_PARALLEL */ } /* end if */ /* if the entry has a notify callback, notify it that we have diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c index b7b8b03..af6599a 100644 --- a/src/H5Dchunk.c +++ b/src/H5Dchunk.c @@ -299,9 +299,6 @@ static herr_t H5D__chunk_unlock(const H5D_io_info_t *io_info, static herr_t H5D__chunk_cache_prune(const H5D_t *dset, hid_t dxpl_id, const H5D_dxpl_cache_t *dxpl_cache, size_t size); static herr_t H5D__chunk_prune_fill(H5D_chunk_it_ud1_t *udata, hbool_t new_unfilt_chunk); -static herr_t H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info, - const H5F_block_t *old_chunk, H5F_block_t *new_chunk, hbool_t *need_insert, - hsize_t scaled[]); #ifdef H5_HAVE_PARALLEL static herr_t H5D__chunk_collective_fill(const H5D_t *dset, hid_t dxpl_id, H5D_chunk_coll_info_t *chunk_info, size_t chunk_size, const void *fill_buf); @@ -6558,7 +6555,7 @@ done: * *------------------------------------------------------------------------- */ -static herr_t +herr_t H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info, const H5F_block_t *old_chunk, H5F_block_t *new_chunk, hbool_t *need_insert, hsize_t scaled[]) { diff --git a/src/H5Dint.c b/src/H5Dint.c index 3b938e2..bdedd1e 100644 --- a/src/H5Dint.c +++ b/src/H5Dint.c @@ -1213,10 +1213,6 @@ H5D__create(H5F_t *file, hid_t type_id, const H5S_t *space, hid_t dcpl_id, /* Don't allow compact datasets to allocate space later */ if(layout->type == H5D_COMPACT && fill->alloc_time != H5D_ALLOC_TIME_EARLY) HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, NULL, "compact dataset must have early space allocation") - - /* If MPI VFD is used, no filter support yet. 
*/ - if(H5F_HAS_FEATURE(file, H5FD_FEAT_HAS_MPI) && pline->nused > 0) - HGOTO_ERROR(H5E_DATASET, H5E_UNSUPPORTED, NULL, "Parallel I/O does not support filters yet") } /* end if */ /* Set the latest version of the layout, pline & fill messages, if requested */ diff --git a/src/H5Dio.c b/src/H5Dio.c index 1766422..104a632 100644 --- a/src/H5Dio.c +++ b/src/H5Dio.c @@ -714,11 +714,6 @@ H5D__write(H5D_t *dataset, hid_t mem_type_id, const H5S_t *mem_space, if(H5T_get_class(type_info.mem_type, TRUE) == H5T_REFERENCE && H5T_get_ref_type(type_info.mem_type) == H5R_DATASET_REGION) HGOTO_ERROR(H5E_DATASET, H5E_UNSUPPORTED, FAIL, "Parallel IO does not support writing region reference datatypes yet") - - /* Can't write to chunked datasets with filters, in parallel */ - if(dataset->shared->layout.type == H5D_CHUNKED && - dataset->shared->dcpl_cache.pline.nused > 0) - HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "cannot write to chunked storage with filters in parallel") } /* end if */ else { /* Collective access is not permissible without a MPI based VFD */ @@ -1195,7 +1190,7 @@ H5D__ioinfo_adjust(H5D_io_info_t *io_info, const H5D_t *dset, hid_t dxpl_id, HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "can't retrieve MPI communicator") /* Check if we can set direct MPI-IO read/write functions */ - if((opt = H5D__mpio_opt_possible(io_info, file_space, mem_space, type_info, fm, dx_plist)) < 0) + if((opt = H5D__mpio_opt_possible(io_info, file_space, mem_space, type_info, dx_plist)) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_BADRANGE, FAIL, "invalid check for direct IO dataspace ") /* Check if we can use the optimized parallel I/O routines */ @@ -1207,6 +1202,67 @@ H5D__ioinfo_adjust(H5D_io_info_t *io_info, const H5D_t *dset, hid_t dxpl_id, io_info->io_ops.single_write = H5D__mpio_select_write; } /* end if */ else { + /* Check if there are any filters in the pipeline. If there are, + * we cannot break to independent I/O if this is a write operation; + * otherwise there will be metadata inconsistencies in the file. + */ + if (io_info->op_type == H5D_IO_OP_WRITE && io_info->dset->shared->dcpl_cache.pline.nused > 0) { + H5D_mpio_no_collective_cause_t cause; + uint32_t local_no_collective_cause; + uint32_t global_no_collective_cause; + hbool_t local_error_message_previously_written = FALSE; + hbool_t global_error_message_previously_written = FALSE; + size_t index; + char local_no_collective_cause_string[256] = ""; + char global_no_collective_cause_string[256] = ""; + const char *cause_strings[] = { "independent I/O was requested", + "datatype conversions were required", + "data transforms needed to be applied", + "optimized MPI types flag wasn't set", + "one of the dataspaces was neither simple nor scalar", + "dataset was not contiguous or chunked" }; + + if (H5P_get(dx_plist, H5D_MPIO_LOCAL_NO_COLLECTIVE_CAUSE_NAME, &local_no_collective_cause) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "unable to get local no collective cause value") + if (H5P_get(dx_plist, H5D_MPIO_GLOBAL_NO_COLLECTIVE_CAUSE_NAME, &global_no_collective_cause) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "unable to get global no collective cause value") + + /* Append each of the "reason for breaking collective I/O" error messages to the + * local and global no collective cause strings */ + for (cause = 1, index = 0; cause < H5D_MPIO_NO_COLLECTIVE_MAX_CAUSE; cause <<= 1, index++) { + size_t cause_strlen = strlen(cause_strings[index]); + + if (cause & local_no_collective_cause) { + /* Check if there were any previous error messages included. 
If so, prepend a semicolon + * to separate the messages. + */ + if (local_error_message_previously_written) strncat(local_no_collective_cause_string, "; ", 2); + + strncat(local_no_collective_cause_string, cause_strings[index], cause_strlen); + + local_error_message_previously_written = TRUE; + } /* end if */ + + if (cause & global_no_collective_cause) { + /* Check if there were any previous error messages included. If so, prepend a semicolon + * to separate the messages. + */ + if (global_error_message_previously_written) strncat(global_no_collective_cause_string, "; ", 2); + + strncat(global_no_collective_cause_string, cause_strings[index], cause_strlen); + + global_error_message_previously_written = TRUE; + } /* end if */ + } /* end for */ + + HGOTO_ERROR(H5E_IO, H5E_NO_INDEPENDENT, FAIL, "Can't perform independent write with filters in pipeline.\n" + " The following caused a break from collective I/O:\n" + " Local causes: %s\n" + " Global causes: %s", + local_no_collective_cause_string, + global_no_collective_cause_string); + } /* end if */ + /* If we won't be doing collective I/O, but the user asked for * collective I/O, change the request to use independent I/O, but * mark it so that we remember to revert the change. diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index 0389c72..79572c0 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -84,7 +84,6 @@ #define H5D_CHUNK_SELECT_IRREG 2 #define H5D_CHUNK_SELECT_NONE 0 - /******************/ /* Local Typedefs */ /******************/ @@ -94,6 +93,113 @@ typedef struct H5D_chunk_addr_info_t { H5D_chunk_info_t chunk_info; } H5D_chunk_addr_info_t; +/* + * Information about a single chunk when performing collective filtered I/O. All + * of the fields of one of these structs are initialized at the start of collective + * filtered I/O in the function H5D__construct_filtered_io_info_list(). + * + * This struct's fields are as follows: + * + * index - The "Index" of the chunk in the dataset. The index of a chunk is used during + * the collective re-insertion of chunks into the chunk index after the collective + * I/O has been performed. + * + * scaled - The scaled coordinates of the chunk in the dataset's file dataspace. The + * coordinates are used in both the collective re-allocation of space in the file + * and the collective re-insertion of chunks into the chunk index after the collective + * I/O has been performed. + * + * full_overwrite - A flag which determines whether or not a chunk needs to be read from the + * file when being updated. If a chunk is being fully overwritten (the entire + * extent is selected in its file dataspace), then it is not necessary to + * read the chunk from the file. However, if the chunk is not being fully + * overwritten, it has to be read from the file in order to update the chunk + * without trashing the parts of the chunk that are not selected. + * + * num_writers - The total number of processors writing to this chunk. This field is used + * when the new owner of a chunk is receiving messages, which contain selections in + * the chunk and data to update the chunk with, from other processors which have this + * chunk selected in the I/O operation. The new owner must know how many processors it + * should expect messages from so that it can post an equal number of receive calls. + * + * io_size - The total size of I/O to this chunk. 
This field is an accumulation of the size of + * I/O to the chunk from each processor which has the chunk selected and is used to + * determine the value for the previous full_overwrite flag. + * + * buf - A pointer which serves the dual purpose of holding either the chunk data which is to be + * written to the file or the chunk data which has been read from the file. + * + * chunk_states - In the case of dataset writes only, this struct is used to track a chunk's size and + * address in the file before and after the filtering operation has occurred. + * + * Its fields are as follows: + * + * chunk_current - The address in the file and size of this chunk before the filtering + * operation. When reading a chunk from the file, this field is used to + * read the correct amount of bytes. It is also used when redistributing + * shared chunks among processors and as a parameter to the chunk file + * space reallocation function. + * + * new_chunk - The address in the file and size of this chunk after the filtering + * operation. This field is relevant when collectively re-allocating space + * in the file for all of the chunks written to in the I/O operation, as + * their sizes may have changed after their data has been filtered. + * + * owners - In the case of dataset writes only, this struct is used to manage which single processor + * will ultimately write data out to the chunk. It allows the other processors to act according + * to the decision and send their selection in the chunk, as well as the data they wish + * to update the chunk with, to the processor which is writing to the chunk. + * + * Its fields are as follows: + * + * original_owner - The processor which originally had this chunk selected at the beginning of + * the collective filtered I/O operation. This field is currently used when + * redistributing shared chunks among processors. + * + * new_owner - The processor which has been selected to perform the write to this chunk. + * + * async_info - In the case of dataset writes only, this struct is used by the owning processor of the + * chunk in order to manage the MPI send and receive calls made between it and all of + * the other processors which have this chunk selected in the I/O operation. + * + * Its fields are as follows: + * + * receive_requests_array - An array containing one MPI_Request for each of the + * asynchronous MPI receive calls the owning processor of this + * chunk makes to another processor in order to receive that + * processor's chunk modification data and selection in the chunk. + * + * receive_buffer_array - An array of buffers into which the owning processor of this chunk + * will store chunk modification data and the selection in the chunk + * received from another processor. + * + * num_receive_requests - The number of entries in the receive_request_array and + * receive_buffer_array fields. 
+ */ +typedef struct H5D_filtered_collective_io_info_t { + hsize_t index; + hsize_t scaled[H5O_LAYOUT_NDIMS]; + hbool_t full_overwrite; + size_t num_writers; + size_t io_size; + void *buf; + + struct { + H5F_block_t chunk_current; + H5F_block_t new_chunk; + } chunk_states; + + struct { + int original_owner; + int new_owner; + } owners; + + struct { + MPI_Request *receive_requests_array; + unsigned char **receive_buffer_array; + int num_receive_requests; + } async_info; +} H5D_filtered_collective_io_info_t; /********************/ /* Local Prototypes */ @@ -103,9 +209,15 @@ static herr_t H5D__chunk_collective_io(H5D_io_info_t *io_info, static herr_t H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist); +static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, + H5P_genplist_t *dx_plist); static herr_t H5D__link_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, int sum_chunk, H5P_genplist_t *dx_plist); +static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, + H5P_genplist_t *dx_plist); static herr_t H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5S_t *file_space, const H5S_t *mem_space); @@ -124,6 +236,26 @@ static herr_t H5D__mpio_get_min_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, int *min_chunkf); static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, int *sum_chunkf); +static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm, + H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries); +static herr_t H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm, + H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries); +static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries, + size_t array_entry_size, void **gathered_array, size_t *gathered_array_num_entries, + int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *)); +static herr_t H5D__mpio_filtered_collective_write_type( + H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries, + MPI_Datatype *new_mem_type, hbool_t *mem_type_derived, + MPI_Datatype *new_file_type, hbool_t *file_type_derived); +static herr_t H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry, + const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm); +static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2); +static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, + const void *filtered_collective_io_info_entry2); +static int H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective_io_info_entry1, + const void *filtered_collective_io_info_entry2); /*********************/ @@ -142,7 +274,7 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, * Purpose: Checks if an direct I/O transfer is possible between memory and * the file. 
* - * Return: Sauccess: Non-negative: TRUE or FALSE + * Return: Success: Non-negative: TRUE or FALSE * Failure: Negative * * Programmer: Quincey Koziol @@ -152,12 +284,11 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, */ htri_t H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space, - const H5S_t *mem_space, const H5D_type_info_t *type_info, - const H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist) + const H5S_t *mem_space, const H5D_type_info_t *type_info, H5P_genplist_t *dx_plist) { int local_cause = 0; /* Local reason(s) for breaking collective mode */ int global_cause = 0; /* Global reason(s) for breaking collective mode */ - htri_t ret_value; /* Return value */ + htri_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_PACKAGE @@ -206,11 +337,6 @@ H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space, * use collective IO will defer until each chunk IO is reached. */ - /* Don't allow collective operations if filters need to be applied */ - if(io_info->dset->shared->layout.type == H5D_CHUNKED && - io_info->dset->shared->dcpl_cache.pline.nused > 0) - local_cause |= H5D_MPIO_FILTERS; - /* Check for independent I/O */ if(local_cause & H5D_MPIO_SET_INDEPENDENT) global_cause = local_cause; @@ -300,6 +426,113 @@ done: /*------------------------------------------------------------------------- + * Function: H5D__mpio_array_gatherv + * + * Purpose: Given an array, specified in local_array, by each processor + * calling this function, gathers each array into a single + * array which is then either gathered to the processor + * specified by root, when allgather is false, or is + * distributed back to all processors when allgather is true. + * + * The size of each entry and number of entries in the array + * contributed by an individual processor should be specified + * in array_entry_size and local_array_num_entries, + * respectively. + * + * The number of processors participating in the gather + * operation should be specified for nprocs. + * + * The MPI communicator to use should be specified for comm. + * + * If the sort_func argument is supplied, the array is sorted + * before the function returns. + * + * Note: if allgather is specified as true, root is ignored. 
+ * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Sunday, April 9th, 2017 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries, + size_t array_entry_size, void **_gathered_array, size_t *_gathered_array_num_entries, + int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *)) +{ + size_t gathered_array_num_entries = 0; /* The size of the newly-constructed array */ + size_t i; + void *gathered_array = NULL; /* The newly-constructed array returned to the caller */ + int *receive_counts_array = NULL; /* Array containing number of entries each process is contributing */ + int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */ + int mpi_code; + int sendcount; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(_gathered_array); + HDassert(_gathered_array_num_entries); + + /* Determine the size of the end result array */ + if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) + + /* If 0 entries resulted from the collective operation, no one is writing anything */ + if (gathered_array_num_entries > 0) { + if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array") + + if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(int)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array") + + if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(int)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array") + + /* Inform each process of how many entries each other process is contributing to the resulting array */ + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + + /* Multiply each receive count by the size of the array entry, since the data is sent as bytes */ + for (i = 0; i < (size_t) nprocs; i++) + H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t); + + /* Set receive buffer offsets for MPI_Allgatherv */ + displacements_array[0] = 0; + for (i = 1; i < (size_t) nprocs; i++) + displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1]; + + H5_CHECKED_ASSIGN(sendcount, int, local_array_num_entries * array_entry_size, size_t); + + if (allgather) { + if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE, + gathered_array, receive_counts_array, displacements_array, MPI_BYTE, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code) + } /* end if */ + else { + if (MPI_SUCCESS != (mpi_code = MPI_Gatherv(local_array, sendcount, MPI_BYTE, + gathered_array, receive_counts_array, displacements_array, MPI_BYTE, root, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code) + } /* end else */ + + if (sort_func) HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func); + } /* end if */ + + *_gathered_array = gathered_array; + *_gathered_array_num_entries = 
gathered_array_num_entries; + +done: + if (receive_counts_array) + H5MM_free(receive_counts_array); + if (displacements_array) + H5MM_free(displacements_array); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_array_gatherv() */ + + +/*------------------------------------------------------------------------- * Function: H5D__ioinfo_xfer_mode * * Purpose: Switch to between collective & independent MPI I/O @@ -398,7 +631,7 @@ H5D__mpio_get_min_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, FUNC_ENTER_STATIC /* Get the number of chunks to perform I/O on */ - num_chunkf = H5SL_count(fm->sel_chunks); + H5_CHECKED_ASSIGN(num_chunkf, int, H5SL_count(fm->sel_chunks), size_t) /* Determine the minimum # of chunks for all processes */ if(MPI_SUCCESS != (mpi_code = MPI_Allreduce(&num_chunkf, min_chunkf, 1, MPI_INT, MPI_MIN, io_info->comm))) @@ -480,7 +713,7 @@ H5D__contig_collective_read(H5D_io_info_t *io_info, const H5D_type_info_t *type_ HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish shared collective MPI-IO") /* Obtain the data transfer properties */ - if(NULL == (dx_plist = H5I_object(io_info->raw_dxpl_id))) + if(NULL == (dx_plist = (H5P_genplist_t *)H5I_object(io_info->raw_dxpl_id))) HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list") /* Set the actual I/O mode property. internal_collective_io will not break to @@ -527,7 +760,7 @@ H5D__contig_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't finish shared collective MPI-IO") /* Obtain the data transfer properties */ - if(NULL == (dx_plist = H5I_object(io_info->raw_dxpl_id))) + if(NULL == (dx_plist = (H5P_genplist_t *)H5I_object(io_info->raw_dxpl_id))) HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list") /* Set the actual I/O mode property. internal_collective_io will not break to @@ -599,12 +832,13 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf HDassert(fm); /* Obtain the data transfer properties */ - if(NULL == (dx_plist = H5I_object(io_info->raw_dxpl_id))) + if(NULL == (dx_plist = (H5P_genplist_t *)H5I_object(io_info->raw_dxpl_id))) HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a file access property list") - /* Check the optional property list on what to do with collective chunk IO. */ + /* Check the optional property list for the collective chunk IO optimization option */ if(H5P_get(dx_plist, H5D_XFER_MPIO_CHUNK_OPT_HARD_NAME, &chunk_opt_mode) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get chunk optimization option") + if(H5FD_MPIO_CHUNK_ONE_IO == chunk_opt_mode) io_option = H5D_ONE_LINK_CHUNK_IO; /*no opt*/ /* direct request to multi-chunk-io */ @@ -620,13 +854,13 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf if((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") - /* Get the chunk optimization option */ + /* Get the chunk optimization option threshold */ if(H5P_get(dx_plist, H5D_XFER_MPIO_CHUNK_OPT_NUM_NAME, &one_link_chunk_io_threshold) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get chunk optimization option") + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get chunk optimization option threshold value") /* step 1: choose an IO option */ /* If the average number of chunk per process is greater than a threshold, we will do one link chunked IO. 
*/ - if((unsigned)sum_chunk / mpi_size >= one_link_chunk_io_threshold) + if((unsigned)sum_chunk / (unsigned)mpi_size >= one_link_chunk_io_threshold) io_option = H5D_ONE_LINK_CHUNK_IO_MORE_OPT; #ifdef H5_HAVE_INSTRUMENTED_LIBRARY else @@ -681,19 +915,46 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf #endif /* step 2: Go ahead to do IO.*/ - if(H5D_ONE_LINK_CHUNK_IO == io_option || H5D_ONE_LINK_CHUNK_IO_MORE_OPT == io_option) { - if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO") - } /* end if */ - /* direct request to multi-chunk-io */ - else if(H5D_MULTI_CHUNK_IO == io_option) { - if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO") - } /* end if */ - else { /* multiple chunk IO via threshold */ - if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO") - } /* end else */ + switch (io_option) { + case H5D_ONE_LINK_CHUNK_IO: + case H5D_ONE_LINK_CHUNK_IO_MORE_OPT: + /* Check if there are any filters in the pipeline */ + if(io_info->dset->shared->dcpl_cache.pline.nused > 0) { + /* For now, Multi-chunk IO must be forced for parallel filtered read, + * so that data can be unfiltered as it is received. There is significant + * complexity in unfiltering the data when it is read all at once into a + * single buffer. + */ + if (io_info->op_type == H5D_IO_OP_READ) { + if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO") + } /* end if */ + else { + if(H5D__link_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish filtered linked chunk MPI-IO") + } /* end else */ + } /* end if */ + else { + /* Perform unfiltered link chunk collective IO */ + if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO") + } /* end else */ + break; + + case H5D_MULTI_CHUNK_IO: /* direct request to do multi-chunk IO */ + default: /* multiple chunk IO via threshold */ + /* Check if there are any filters in the pipeline */ + if(io_info->dset->shared->dcpl_cache.pline.nused > 0) { + if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO") + } /* end if */ + else { + /* Perform unfiltered multi chunk collective IO */ + if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO") + } /* end else */ + break; + } /* end switch */ done: FUNC_LEAVE_NOAPI(ret_value) @@ -1093,6 +1354,232 @@ if(H5DEBUG(D)) /*------------------------------------------------------------------------- + * Function: H5D__link_chunk_filtered_collective_io + * + * Purpose: Routine for one collective IO with one MPI derived datatype + * to link with all filtered chunks + * + * 1. Construct a list of selected chunks in the collective IO + * operation + * A. 
If any chunk is being written to by more than 1 + * process, the process writing to the chunk which + * currently has the least amount of chunks assigned + * to it becomes the new owner (in the case of ties, + * the lowest MPI rank becomes the new owner) + * 2. If the operation is a write operation + * A. Loop through each chunk in the operation + * I. If this is not a full overwrite of the chunk + * a) Read the chunk from file and pass the chunk + * through the filter pipeline in reverse order + * (Unfilter the chunk) + * II. Update the chunk data with the modifications from + * the owning process + * III. Receive any modification data from other + * processes and update the chunk data with these + * modifications + * IV. Filter the chunk + * B. Contribute the modified chunks to an array gathered + * by all processes which contains the new sizes of + * every chunk modified in the collective IO operation + * C. All processes collectively re-allocate each chunk + * from the gathered array with their new sizes after + * the filter operation + * D. If this process has any chunks selected in the IO + * operation, create an MPI derived type for memory and + * file to write out the process' selected chunks to the + * file + * E. Perform the collective write + * F. All processes collectively re-insert each modified + * chunk from the gathered array into the chunk index + * + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Friday, Nov. 4th, 2016 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist) +{ + H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ + H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; /* The list of chunks used during collective operations */ + H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK; /* The actual chunk IO optimization mode */ + H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* The chunk IO mode used (Independent vs Collective) */ + H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ + MPI_Datatype mem_type = MPI_BYTE; + MPI_Datatype file_type = MPI_BYTE; + hbool_t mem_type_is_derived = FALSE; + hbool_t file_type_is_derived = FALSE; + size_t chunk_list_num_entries; + size_t collective_chunk_list_num_entries; + size_t *num_chunks_selected_array = NULL; /* Array of number of chunks selected on each process */ + size_t i; /* Local index variable */ + int mpi_rank, mpi_size, mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(io_info); + HDassert(type_info); + HDassert(fm); + HDassert(dx_plist); + + /* Obtain the current rank of the process and the number of processes */ + if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") + if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") + + /* Set the actual-chunk-opt-mode property. */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property") + + /* Set the actual-io-mode property. 
+ * Link chunk filtered I/O does not break to independent, so can set right away + */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property") + + /* Build a list of selected chunks in the collective io operation */ + if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list") + + if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */ + H5D_chk_idx_info_t index_info; + H5D_chunk_ud_t udata; + hsize_t mpi_buf_count; + + /* Construct chunked index info */ + index_info.f = io_info->dset->oloc.file; + index_info.dxpl_id = io_info->md_dxpl_id; + index_info.pline = &(io_info->dset->shared->dcpl_cache.pline); + index_info.layout = &(io_info->dset->shared->layout.u.chunk); + index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk); + + /* Set up chunk information for insertion to chunk index */ + udata.common.layout = index_info.layout; + udata.common.storage = index_info.storage; + udata.filter_mask = 0; + + /* Iterate through all the chunks in the collective write operation, + * updating each chunk with the data modifications from other processes, + * then re-filtering the chunk. + */ + for (i = 0; i < chunk_list_num_entries; i++) + if (mpi_rank == chunk_list[i].owners.new_owner) + if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry") + + /* Gather the new chunk sizes to all processes for a collective reallocation + * of the chunks in the file. + */ + if (H5D__mpio_array_gatherv(chunk_list, chunk_list_num_entries, sizeof(H5D_filtered_collective_io_info_t), + (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size, + true, 0, io_info->comm, NULL) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes") + + /* Collectively re-allocate the modified chunks (from each process) in the file */ + for (i = 0; i < collective_chunk_list_num_entries; i++) { + hbool_t insert; + + if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[i].chunk_states.chunk_current, + &collective_chunk_list[i].chunk_states.new_chunk, &insert, collective_chunk_list[i].scaled) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") + } /* end for */ + + if (NULL == (num_chunks_selected_array = (size_t *) H5MM_malloc((size_t) mpi_size * sizeof(size_t)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array") + + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&chunk_list_num_entries, 1, MPI_UNSIGNED_LONG_LONG, num_chunks_selected_array, + 1, MPI_UNSIGNED_LONG_LONG, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + + /* If this process has any chunks selected, create a MPI type for collectively + * writing out the chunks to file. Otherwise, the process contributes to the + * collective write with a none type. + */ + if (chunk_list_num_entries) { + size_t offset; + + /* During the collective re-allocation of chunks in the file, the record for each + * chunk is only updated in the collective array, not in the local copy of chunks on each + * process. 
However, each process needs the updated chunk records so that they can create + * a MPI type for the collective write that will write to the chunk's possible new locations + * in the file instead of the old ones. This ugly hack seems to be the best solution to + * copy the information back to the local array and avoid having to modify the collective + * write type function in an ugly way so that it will accept the collective array instead + * of the local array. This works correctly because the array gather function guarantees + * that the chunk data in the collective array is ordered in blocks by rank. + */ + for (i = 0, offset = 0; i < (size_t) mpi_rank; i++) + offset += num_chunks_selected_array[i]; + + HDmemcpy(chunk_list, &collective_chunk_list[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t)); + + /* Create single MPI type encompassing each selection in the dataspace */ + if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries, + &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_BADTYPE, FAIL, "couldn't create MPI link chunk I/O type") + + /* Override the write buffer to point to the address of the first + * chunk data buffer + */ + io_info->u.wbuf = chunk_list[0].buf; + } /* end if */ + + /* We have a single, complicated MPI datatype for both memory & file */ + mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? (hsize_t) 1 : (hsize_t) 0; + + /* Set up the base storage address for this operation */ + ctg_store.contig.dset_addr = 0; /* Write address must be set to address 0 */ + io_info->store = &ctg_store; + + /* Perform I/O */ + if (H5D__final_collective_io(io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") + + /* Participate in the collective re-insertion of all chunks modified + * in this iteration into the chunk index + */ + for (i = 0; i < collective_chunk_list_num_entries; i++) { + udata.chunk_block = collective_chunk_list[i].chunk_states.new_chunk; + udata.common.scaled = collective_chunk_list[i].scaled; + udata.chunk_idx = collective_chunk_list[i].index; + + if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index") + } /* end for */ + } /* end if */ + +done: + /* Free resources used by a process which had some selection */ + if (chunk_list) { + for (i = 0; i < chunk_list_num_entries; i++) + if (chunk_list[i].buf) + H5MM_free(chunk_list[i].buf); + + H5MM_free(chunk_list); + } /* end if */ + + if (num_chunks_selected_array) + H5MM_free(num_chunks_selected_array); + if (collective_chunk_list) + H5MM_free(collective_chunk_list); + + /* Free the MPI buf and file types, if they were derived */ + if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__link_chunk_filtered_collective_io() */ + + +/*------------------------------------------------------------------------- * Function: H5D__multi_chunk_collective_io * * Purpose: To do IO per chunk according to IO mode(collective/independent/none) @@ -1225,7 +1712,7 @@ if(H5DEBUG(D)) * to ease switching between to mixed I/O without checking the current 
* value of the property. You can see the definition in H5Ppublic.h */ - actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_COLLECTIVE; + actual_io_mode = (H5D_mpio_actual_io_mode_t) (actual_io_mode | H5D_MPIO_CHUNK_COLLECTIVE); } /* end if */ else { @@ -1265,7 +1752,7 @@ if(H5DEBUG(D)) mspace = chunk_info->mspace; /* Update the local variable tracking the dxpl's actual io mode. */ - actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_INDEPENDENT; + actual_io_mode = (H5D_mpio_actual_io_mode_t) (actual_io_mode | H5D_MPIO_CHUNK_INDEPENDENT); } /* end if */ else { fspace = mspace = NULL; @@ -1306,6 +1793,314 @@ done: /*------------------------------------------------------------------------- + * Function: H5D__multi_chunk_filtered_collective_io + * + * Purpose: To do filtered collective IO iteratively to save on memory. + * While link_chunk_filtered_collective_io will construct and + * work on a list of all of the chunks selected in the IO + * operation at once, this function works iteratively on a set + * of chunks at a time; at most one chunk per rank per + * iteration. + * + * 1. Construct a list of selected chunks in the collective IO + * operation + * A. If any chunk is being written to by more than 1 + * process, the process writing to the chunk which + * currently has the least amount of chunks assigned + * to it becomes the new owner (in the case of ties, + * the lowest MPI rank becomes the new owner) + * 2. If the operation is a read operation + * A. Loop through each chunk in the operation + * I. Read the chunk from the file + * II. Unfilter the chunk + * III. Scatter the read chunk data to the user's buffer + * 3. If the operation is a write operation + * A. Loop through each chunk in the operation + * I. If this is not a full overwrite of the chunk + * a) Read the chunk from file and pass the chunk + * through the filter pipeline in reverse order + * (Unfilter the chunk) + * II. Update the chunk data with the modifications from + * the owning process + * III. Receive any modification data from other + * processes and update the chunk data with these + * modifications + * IV. Filter the chunk + * V. Contribute the chunk to an array gathered by + * all processes which contains every chunk + * modified in this iteration (up to one chunk + * per process, some processes may not have a + * selection/may have less chunks to work on than + * other processes) + * VI. All processes collectively re-allocate each + * chunk from the gathered array with their new + * sizes after the filter operation + * VII. Proceed with the collective write operation + * for the chunks modified on this iteration + * VIII. All processes collectively re-insert each + * chunk from the gathered array into the chunk + * index + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Friday, Dec. 
2nd, 2016 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist) +{ + H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ + H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; /* The list of chunks used during collective operations */ + H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_MULTI_CHUNK; /* The actual chunk IO optimization mode */ + H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* The chunk IO mode used (Independent vs Collective) */ + H5D_storage_t store; /* union of EFL and chunk pointer in file space */ + H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */ + H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ + MPI_Datatype *file_type_array = NULL; + MPI_Datatype *mem_type_array = NULL; + hbool_t *file_type_is_derived_array = NULL; + hbool_t *mem_type_is_derived_array = NULL; + hbool_t *has_chunk_selected_array = NULL; /* Array of whether or not each process is contributing a chunk to each iteration */ + size_t chunk_list_num_entries; + size_t collective_chunk_list_num_entries; + size_t i, j; /* Local index variable */ + int mpi_rank, mpi_size, mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(io_info); + HDassert(type_info); + HDassert(fm); + HDassert(dx_plist); + + /* Obtain the current rank of the process and the number of processes */ + if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") + if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") + + /* Set the actual chunk opt mode property */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property") + + /* Set the actual_io_mode property. 
+ * Multi chunk I/O does not break to independent, so can set right away + */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk io mode property") + + /* Build a list of selected chunks in the collective IO operation */ + if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list") + + /* Set up contiguous I/O info object */ + HDmemcpy(&ctg_io_info, io_info, sizeof(ctg_io_info)); + ctg_io_info.store = &ctg_store; + ctg_io_info.layout_ops = *H5D_LOPS_CONTIG; + + /* Initialize temporary contiguous storage info */ + ctg_store.contig.dset_size = (hsize_t) io_info->dset->shared->layout.u.chunk.size; + ctg_store.contig.dset_addr = 0; + + /* Set dataset storage for I/O info */ + io_info->store = &store; + + if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */ + for (i = 0; i < chunk_list_num_entries; i++) + if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't process chunk entry") + } /* end if */ + else { /* Filtered collective write */ + H5D_chk_idx_info_t index_info; + H5D_chunk_ud_t udata; + size_t max_num_chunks; + hsize_t mpi_buf_count; + + /* Construct chunked index info */ + index_info.f = io_info->dset->oloc.file; + index_info.dxpl_id = io_info->md_dxpl_id; + index_info.pline = &(io_info->dset->shared->dcpl_cache.pline); + index_info.layout = &(io_info->dset->shared->layout.u.chunk); + index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk); + + /* Set up chunk information for insertion to chunk index */ + udata.common.layout = index_info.layout; + udata.common.storage = index_info.storage; + udata.filter_mask = 0; + + /* Retrieve the maximum number of chunks being written among all processes */ + if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list_num_entries, &max_num_chunks, + 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) + + /* If no one is writing anything at all, end the operation */ + if (!(max_num_chunks > 0)) HGOTO_DONE(SUCCEED); + + /* Allocate arrays for storing MPI file and mem types and whether or not the + * types were derived. + */ + if (NULL == (file_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(MPI_Datatype)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type array") + + if (NULL == (file_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(hbool_t)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type is derived array") + + if (NULL == (mem_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(MPI_Datatype)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type array") + + if (NULL == (mem_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(hbool_t)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type is derived array") + + /* Iterate over the max number of chunks among all processes, as this process could + * have no chunks left to work on, but it still needs to participate in the collective + * re-allocation and re-insertion of chunks modified by other processes. 
+ */ + for (i = 0; i < max_num_chunks; i++) { + /* Check if this process has a chunk to work on for this iteration */ + hbool_t have_chunk_to_process = (i < chunk_list_num_entries) && (mpi_rank == chunk_list[i].owners.new_owner); + + if (have_chunk_to_process) + if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry") + + /* Gather the new chunk sizes to all processes for a collective re-allocation + * of the chunks in the file + */ + if (H5D__mpio_array_gatherv(&chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(H5D_filtered_collective_io_info_t), + (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size, + true, 0, io_info->comm, NULL) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes") + + /* Participate in the collective re-allocation of all chunks modified + * in this iteration. + */ + for (j = 0; j < collective_chunk_list_num_entries; j++) { + hbool_t insert = FALSE; + + if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].chunk_states.chunk_current, + &collective_chunk_list[j].chunk_states.new_chunk, &insert, chunk_list[j].scaled) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") + } /* end for */ + + if (NULL == (has_chunk_selected_array = (hbool_t *) H5MM_malloc((size_t) mpi_size * sizeof(hbool_t)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array") + + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&have_chunk_to_process, 1, MPI_C_BOOL, has_chunk_selected_array, + 1, MPI_C_BOOL, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + + /* If this process has a chunk to work on, create a MPI type for the + * memory and file for writing out the chunk + */ + if (have_chunk_to_process) { + size_t offset; + int mpi_type_count; + + for (j = 0, offset = 0; j < (size_t) mpi_rank; j++) + offset += has_chunk_selected_array[j]; + + /* Collect the new chunk info back to the local copy, since only the record in the + * collective array gets updated by the chunk re-allocation */ + HDmemcpy(&chunk_list[i].chunk_states.new_chunk, &collective_chunk_list[offset].chunk_states.new_chunk, sizeof(chunk_list[i].chunk_states.new_chunk)); + + H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].chunk_states.new_chunk.length, hsize_t); + + /* Create MPI memory type for writing to chunk */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type_array[i]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type_array[i]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + mem_type_is_derived_array[i] = TRUE; + + /* Create MPI file type for writing to chunk */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &file_type_array[i]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type_array[i]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + file_type_is_derived_array[i] = TRUE; + + mpi_buf_count = 1; + + /* Set up the base storage address for this operation */ + ctg_store.contig.dset_addr = chunk_list[i].chunk_states.new_chunk.offset; + + /* Override the write buffer to point to the address of the + * chunk data buffer + */ + ctg_io_info.u.wbuf = chunk_list[i].buf; + } /* end if */ + else { + 
mem_type_array[i] = file_type_array[i] = MPI_BYTE; + mpi_buf_count = 0; + } /* end else */ + + /* Perform the I/O */ + if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type_array[i], &mem_type_array[i]) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") + + /* Participate in the collective re-insertion of all chunks modified + * in this iteration into the chunk index + */ + for (j = 0; j < collective_chunk_list_num_entries; j++) { + udata.chunk_block = collective_chunk_list[j].chunk_states.new_chunk; + udata.common.scaled = collective_chunk_list[j].scaled; + udata.chunk_idx = collective_chunk_list[j].index; + + if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index") + } /* end for */ + + if (collective_chunk_list){ + H5MM_free(collective_chunk_list); + collective_chunk_list = NULL; + } /* end if */ + if (has_chunk_selected_array){ + H5MM_free(has_chunk_selected_array); + has_chunk_selected_array = NULL; + } /* end if */ + } /* end for */ + + /* Free the MPI file and memory types, if they were derived */ + for (i = 0; i < max_num_chunks; i++) { + if (file_type_is_derived_array[i]) + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type_array[i]))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + + if (mem_type_is_derived_array[i]) + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type_array[i]))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } /* end for */ + } /* end else */ + +done: + if (chunk_list) { + for (i = 0; i < chunk_list_num_entries; i++) + if (chunk_list[i].buf) + H5MM_free(chunk_list[i].buf); + + H5MM_free(chunk_list); + } /* end if */ + + if (collective_chunk_list) + H5MM_free(collective_chunk_list); + if (file_type_array) + H5MM_free(file_type_array); + if (mem_type_array) + H5MM_free(mem_type_array); + if (file_type_is_derived_array) + H5MM_free(file_type_is_derived_array); + if (mem_type_is_derived_array) + H5MM_free(mem_type_is_derived_array); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__multi_chunk_filtered_collective_io() */ + + +/*------------------------------------------------------------------------- * Function: H5D__inter_collective_io * * Purpose: Routine for the shared part of collective IO between multiple chunk @@ -1472,7 +2267,7 @@ if(H5DEBUG(D)) static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2) { - haddr_t addr1, addr2; + haddr_t addr1 = HADDR_UNDEF, addr2 = HADDR_UNDEF; FUNC_ENTER_STATIC_NOERR @@ -1484,6 +2279,67 @@ H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2) /*------------------------------------------------------------------------- + * Function: H5D__cmp_filtered_collective_io_info_entry + * + * Purpose: Routine to compare filtered collective chunk io info + * entries + * + * Description: Callback for qsort() to compare filtered collective chunk + * io info entries + * + * Return: -1, 0, 1 + * + * Programmer: Jordan Henderson + * Wednesday, Nov. 
30th, 2016 + * + *------------------------------------------------------------------------- + */ +static int +H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2) +{ + haddr_t addr1 = HADDR_UNDEF, addr2 = HADDR_UNDEF; + + FUNC_ENTER_STATIC_NOERR + + addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->chunk_states.new_chunk.offset; + addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->chunk_states.new_chunk.offset; + + FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2)) +} /* end H5D__cmp_filtered_collective_io_info_entry() */ + + +/*------------------------------------------------------------------------- + * Function: H5D__cmp_filtered_collective_io_info_entry_owner + * + * Purpose: Routine to compare filtered collective chunk io info + * entries's original owner fields + * + * Description: Callback for qsort() to compare filtered collective chunk + * io info entries's original owner fields + * + * Return: The difference between the two + * H5D_filtered_collective_io_info_t's original owner fields + * + * Programmer: Jordan Henderson + * Monday, Apr. 10th, 2017 + * + *------------------------------------------------------------------------- + */ +static int +H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2) +{ + int owner1 = -1, owner2 = -1; + + FUNC_ENTER_STATIC_NOERR + + owner1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->owners.original_owner; + owner2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->owners.original_owner; + + FUNC_LEAVE_NOAPI(owner1 - owner2) +} /* end H5D__cmp_filtered_collective_io_info_entry_owner() */ + + +/*------------------------------------------------------------------------- * Function: H5D__sort_chunk * * Purpose: Routine to sort chunks in increasing order of chunk address @@ -1557,7 +2413,7 @@ if(H5DEBUG(D)) HDfprintf(H5DEBUG(D), "Coming inside H5D_OBTAIN_ALL_CHUNK_ADDR_COL\n"); #endif /* Allocate array for chunk addresses */ - if(NULL == (total_chunk_addr_array = H5MM_malloc(sizeof(haddr_t) * (size_t)fm->layout->u.chunk.nchunks))) + if(NULL == (total_chunk_addr_array = (haddr_t *)H5MM_malloc(sizeof(haddr_t) * (size_t)fm->layout->u.chunk.nchunks))) HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, FAIL, "unable to allocate memory chunk address array") /* Retrieve all the chunk addresses with process 0 */ @@ -1581,7 +2437,7 @@ if(H5DEBUG(D)) /* Iterate over all chunks for this process */ while(chunk_node) { - if(NULL == (chunk_info = H5SL_item(chunk_node))) + if(NULL == (chunk_info = (H5D_chunk_info_t *)H5SL_item(chunk_node))) HGOTO_ERROR(H5E_STORAGE, H5E_CANTGET, FAIL,"couldn't get chunk info from skipped list") if(many_chunk_opt == H5D_OBTAIN_ONE_CHUNK_ADDR_IND) { @@ -1666,7 +2522,7 @@ static herr_t H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist, uint8_t assign_io_mode[], haddr_t chunk_addr[]) { - int total_chunks; + size_t total_chunks; unsigned percent_nproc_per_chunk, threshold_nproc_per_chunk; uint8_t* io_mode_info = NULL; uint8_t* recv_io_mode_info = NULL; @@ -1676,7 +2532,8 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm, H5D_chunk_info_t* chunk_info; int mpi_size, mpi_rank; MPI_Comm comm; - int ic, root; + int root; + size_t ic; int mpi_code; #ifdef 
H5_HAVE_INSTRUMENTED_LIBRARY int new_value; @@ -1697,7 +2554,7 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm, HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") /* Setup parameters */ - H5_CHECKED_ASSIGN(total_chunks, int, fm->layout->u.chunk.nchunks, hsize_t); + H5_CHECKED_ASSIGN(total_chunks, size_t, fm->layout->u.chunk.nchunks, hsize_t); if(H5P_get(dx_plist, H5D_XFER_MPIO_CHUNK_OPT_RATIO_NAME, &percent_nproc_per_chunk) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get percent nproc per chunk") /* if ratio is 0, perform collective io */ @@ -1709,39 +2566,42 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm, HGOTO_DONE(SUCCEED) } /* end if */ - threshold_nproc_per_chunk = mpi_size * percent_nproc_per_chunk/100; + + threshold_nproc_per_chunk = (unsigned)mpi_size * percent_nproc_per_chunk/100; /* Allocate memory */ if(NULL == (io_mode_info = (uint8_t *)H5MM_calloc(total_chunks))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate I/O mode info buffer") - if(NULL == (mergebuf = H5MM_malloc((sizeof(haddr_t) + 1) * total_chunks))) + if(NULL == (mergebuf = (uint8_t *)H5MM_malloc((sizeof(haddr_t) + 1) * total_chunks))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mergebuf buffer") tempbuf = mergebuf + total_chunks; if(mpi_rank == root) - if(NULL == (recv_io_mode_info = (uint8_t *)H5MM_malloc(total_chunks * mpi_size))) + if(NULL == (recv_io_mode_info = (uint8_t *)H5MM_malloc(total_chunks * (size_t)mpi_size))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate recv I/O mode info buffer") /* Obtain the regularity and selection information for all chunks in this process. */ chunk_node = H5SL_first(fm->sel_chunks); while(chunk_node) { - chunk_info = H5SL_item(chunk_node); + chunk_info = (H5D_chunk_info_t *)H5SL_item(chunk_node); - io_mode_info[chunk_info->index] = H5D_CHUNK_SELECT_REG; /* this chunk is selected and is "regular" */ + io_mode_info[chunk_info->index] = H5D_CHUNK_SELECT_REG; /* this chunk is selected and is "regular" */ chunk_node = H5SL_next(chunk_node); } /* end while */ /* Gather all the information */ - if(MPI_SUCCESS != (mpi_code = MPI_Gather(io_mode_info, total_chunks, MPI_BYTE, recv_io_mode_info, total_chunks, MPI_BYTE, root, comm))) + H5_CHECK_OVERFLOW(total_chunks, size_t, int) + if(MPI_SUCCESS != (mpi_code = MPI_Gather(io_mode_info, (int)total_chunks, MPI_BYTE, + recv_io_mode_info, (int)total_chunks, MPI_BYTE, root, comm))) HMPI_GOTO_ERROR(FAIL, "MPI_Gather failed", mpi_code) /* Calculate the mode for IO(collective, independent or none) at root process */ if(mpi_rank == root) { - int nproc; - int* nproc_per_chunk; + size_t nproc; + unsigned* nproc_per_chunk; /* pre-computing: calculate number of processes and regularity of the selection occupied in each chunk */ - if(NULL == (nproc_per_chunk = (int*)H5MM_calloc(total_chunks * sizeof(int)))) + if(NULL == (nproc_per_chunk = (unsigned*)H5MM_calloc(total_chunks * sizeof(unsigned)))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate nproc_per_chunk buffer") /* calculating the chunk address */ @@ -1751,7 +2611,7 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm, } /* end if */ /* checking for number of process per chunk and regularity of the selection*/ - for(nproc = 0; nproc < mpi_size; nproc++) { + for(nproc = 0; nproc < (size_t)mpi_size; nproc++) { uint8_t *tmp_recv_io_mode_info = recv_io_mode_info + (nproc * total_chunks); /* Calculate the number of process per chunk and adding irregular 
selection option */ @@ -1835,5 +2695,712 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__obtain_mpio_mode() */ + + +/*------------------------------------------------------------------------- + * Function: H5D__construct_filtered_io_info_list + * + * Purpose: Constructs a list of entries which contain the necessary + * information for inter-process communication when performing + * collective io on filtered chunks. This list is used by + * each process when performing I/O on locally selected chunks + * and also in operations that must be collectively done + * on every chunk, such as chunk re-allocation, insertion of + * chunks into the chunk index, etc. + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Tuesday, January 10th, 2017 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries) +{ + H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially selected chunks for this process */ + size_t num_chunks_selected; + size_t i; + int mpi_rank; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(io_info); + HDassert(type_info); + HDassert(fm); + HDassert(chunk_list); + HDassert(num_entries); + HDassert(TRUE == H5P_isa_class(io_info->raw_dxpl_id, H5P_DATASET_XFER)); + + if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") + + /* Each process builds a local list of the chunks they have selected */ + if ((num_chunks_selected = H5SL_count(fm->sel_chunks))) { + H5D_chunk_info_t *chunk_info; + H5D_chunk_ud_t udata; + H5SL_node_t *chunk_node; + hssize_t select_npoints; + hssize_t chunk_npoints; + + if (NULL == (local_info_array = (H5D_filtered_collective_io_info_t *) H5MM_malloc(num_chunks_selected * sizeof(H5D_filtered_collective_io_info_t)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer") + + chunk_node = H5SL_first(fm->sel_chunks); + for (i = 0; chunk_node; i++) { + chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node); + + /* Obtain this chunk's address */ + if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address") + + local_info_array[i].index = chunk_info->index; + local_info_array[i].chunk_states.chunk_current = local_info_array[i].chunk_states.new_chunk = udata.chunk_block; + local_info_array[i].num_writers = 0; + local_info_array[i].owners.original_owner = local_info_array[i].owners.new_owner = mpi_rank; + local_info_array[i].buf = NULL; + + local_info_array[i].async_info.num_receive_requests = 0; + local_info_array[i].async_info.receive_buffer_array = NULL; + local_info_array[i].async_info.receive_requests_array = NULL; + + HDmemcpy(local_info_array[i].scaled, chunk_info->scaled, sizeof(chunk_info->scaled)); + + if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + local_info_array[i].io_size = (size_t) select_npoints * type_info->src_type_size; + + /* Currently the full overwrite status of a chunk is only obtained on a per-process + * basis. 
This means that if the total selection in the chunk, as determined by the combination + * of selections of all of the processes interested in the chunk, covers the entire chunk, + * the performance optimization of not reading the chunk from the file is still valid, but + * is not applied in the current implementation. Something like an appropriately placed + * MPI_Allreduce or a running total of the number of chunk points selected during chunk + * redistribution should suffice for implementing this case - JTH. + */ + if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + local_info_array[i].full_overwrite = + (local_info_array[i].io_size >= (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE; + + chunk_node = H5SL_next(chunk_node); + } /* end for */ + } /* end if */ + + /* Redistribute shared chunks to new owners as necessary */ + if (io_info->op_type == H5D_IO_OP_WRITE) + if (H5D__chunk_redistribute_shared_chunks(io_info, type_info, fm, local_info_array, &num_chunks_selected) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks") + + *chunk_list = local_info_array; + *num_entries = num_chunks_selected; + +done: + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__construct_filtered_io_info_list() */ + + +/*------------------------------------------------------------------------- + * Function: H5D__chunk_redistribute_shared_chunks + * + * Purpose: When performing a collective write on a Dataset with + * filters applied, this function is used to redistribute any + * chunks which are selected by more than one process, so as + * to preserve file integrity after the write by ensuring + * that any shared chunks are only modified by one process. + * + * The current implementation follows this 3-phase process: + * + * - Collect everyone's list of chunks into one large list, + * sort the list in increasing order of chunk offset in the + * file and hand the list off to rank 0 + * + * - Rank 0 scans the list looking for matching runs of chunk + * offset in the file (corresponding to a shared chunk which + * has been selected by more than one rank in the I/O + * operation) and for each shared chunk, it redistributes + * the chunk to the process writing to the chunk which + * currently has the least amount of chunks assigned to it + * by modifying the "new_owner" field in each of the list + * entries corresponding to that chunk + * + * - After the chunks have been redistributed, rank 0 re-sorts + * the list in order of previous owner so that each rank + * will get back exactly the array that they contributed to + * the redistribution operation, with the "new_owner" field + * of each chunk they are modifying having possibly been + * modified. 
Rank 0 then scatters each segment of the list + * back to its corresponding rank + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Monday, May 1, 2017 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t *local_chunk_array, size_t *local_chunk_array_num_entries) +{ + H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */ + H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */ + unsigned char **mod_data = NULL; /* Array of chunk modification data buffers sent by a process to new chunk owners */ + MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */ + MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */ + hbool_t mem_iter_init = FALSE; + size_t shared_chunks_info_array_num_entries = 0; + size_t num_send_requests = 0; + size_t *num_assigned_chunks_array = NULL; + size_t i, last_assigned_idx; + int *send_counts = NULL; + int *send_displacements = NULL; + int scatter_recvcount_int; + int mpi_rank, mpi_size, mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(io_info); + HDassert(type_info); + HDassert(fm); + HDassert(local_chunk_array_num_entries); + + if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") + if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") + + if (*local_chunk_array_num_entries) + if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(*local_chunk_array_num_entries * sizeof(MPI_Request)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer") + + if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") + + /* Gather every rank's list of chunks to rank 0 to allow it to perform the redistribution operation. After this + * call, the gathered list will initially be sorted in increasing order of chunk offset in the file. 
+ */ + if (H5D__mpio_array_gatherv(local_chunk_array, *local_chunk_array_num_entries, sizeof(H5D_filtered_collective_io_info_t), + (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size, + false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array") + + /* Rank 0 redistributes any shared chunks to new owners as necessary */ + if (mpi_rank == 0) { + if (NULL == (send_counts = (int *) H5MM_calloc((size_t) mpi_size * sizeof(int)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send counts buffer") + + if (NULL == (send_displacements = (int *) H5MM_malloc((size_t) mpi_size * sizeof(int)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send displacements buffer") + + if (NULL == (num_assigned_chunks_array = (size_t *) H5MM_calloc((size_t) mpi_size * sizeof(size_t)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate number of assigned chunks array") + + for (i = 0; i < shared_chunks_info_array_num_entries;) { + H5D_filtered_collective_io_info_t chunk_entry; + haddr_t last_seen_addr = shared_chunks_info_array[i].chunk_states.chunk_current.offset; + size_t set_begin_index = i; + size_t num_writers = 0; + int new_chunk_owner = shared_chunks_info_array[i].owners.original_owner; + + /* Process each set of duplicate entries caused by another process writing to the same chunk */ + do { + chunk_entry = shared_chunks_info_array[i]; + + send_counts[chunk_entry.owners.original_owner] += (int) sizeof(chunk_entry); + + /* The new owner of the chunk is determined by the process + * writing to the chunk which currently has the least amount + * of chunks assigned to it + */ + if (num_assigned_chunks_array[chunk_entry.owners.original_owner] < num_assigned_chunks_array[new_chunk_owner]) + new_chunk_owner = chunk_entry.owners.original_owner; + + num_writers++; + } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].chunk_states.chunk_current.offset == last_seen_addr); + + /* Set all of the chunk entries' "new_owner" fields */ + for (; set_begin_index < i; set_begin_index++) { + shared_chunks_info_array[set_begin_index].owners.new_owner = new_chunk_owner; + shared_chunks_info_array[set_begin_index].num_writers = num_writers; + } /* end for */ + + num_assigned_chunks_array[new_chunk_owner]++; + } /* end for */ + + /* Sort the new list in order of previous owner so that each original owner of a chunk + * entry gets that entry back, with the possibly newly-modified "new_owner" field + */ + HDqsort(shared_chunks_info_array, shared_chunks_info_array_num_entries, + sizeof(H5D_filtered_collective_io_info_t), H5D__cmp_filtered_collective_io_info_entry_owner); + + send_displacements[0] = 0; + for (i = 1; i < (size_t) mpi_size; i++) + send_displacements[i] = send_displacements[i - 1] + send_counts[i - 1]; + } /* end if */ + + /* Scatter the segments of the list back to each process */ + H5_CHECKED_ASSIGN(scatter_recvcount_int, int, *local_chunk_array_num_entries * sizeof(H5D_filtered_collective_io_info_t), size_t); + if (MPI_SUCCESS != (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts, send_displacements, + MPI_BYTE, local_chunk_array, scatter_recvcount_int, MPI_BYTE, 0, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code) + + if (shared_chunks_info_array) { + H5MM_free(shared_chunks_info_array); + shared_chunks_info_array = NULL; + } /* end if */ + + /* Now that 
the chunks have been redistributed, each process must send its modification data + * to the new owners of any of the chunks it previously possessed. Accordingly, each process + * must also issue asynchronous receives for any messages it may receive for each of the + * chunks it is assigned, in order to avoid potential deadlocking issues. + */ + if (*local_chunk_array_num_entries) + if (NULL == (mod_data = (unsigned char **) H5MM_malloc(*local_chunk_array_num_entries * sizeof(unsigned char *)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data buffer array") + + for (i = 0, last_assigned_idx = 0; i < *local_chunk_array_num_entries; i++) { + H5D_filtered_collective_io_info_t *chunk_entry = &local_chunk_array[i]; + + if (mpi_rank != chunk_entry->owners.new_owner) { + H5D_chunk_info_t *chunk_info = NULL; + unsigned char *mod_data_p = NULL; + hssize_t iter_nelmts; + size_t mod_data_size; + + /* Look up the chunk and get its file and memory dataspaces */ + if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index))) + HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list") + + /* Determine size of serialized chunk file dataspace, plus the size of + * the data being written + */ + if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size") + + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + mod_data_size += (size_t) iter_nelmts * type_info->src_type_size; + + if (NULL == (mod_data[num_send_requests] = (unsigned char *) H5MM_malloc(mod_data_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer") + + /* Serialize the chunk's file dataspace into the buffer */ + mod_data_p = mod_data[num_send_requests]; + if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace") + + /* Intialize iterator for memory selection */ + if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") + mem_iter_init = TRUE; + + /* Collect the modification data into the buffer */ + if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p)) + HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer") + + /* Send modification data to new owner */ + H5_CHECK_OVERFLOW(mod_data_size, size_t, int) + H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int) + if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data[num_send_requests], (int) mod_data_size, MPI_BYTE, + chunk_entry->owners.new_owner, (int) chunk_entry->index, io_info->comm, &send_requests[num_send_requests]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code) + + if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator") + mem_iter_init = FALSE; + + num_send_requests++; + } /* end if */ + else { + /* Allocate all necessary buffers for an asynchronous receive operation */ + if (chunk_entry->num_writers > 1) { + MPI_Message message; + MPI_Status status; + size_t j; + + chunk_entry->async_info.num_receive_requests = (int) chunk_entry->num_writers - 1; + if 
(NULL == (chunk_entry->async_info.receive_requests_array = (MPI_Request *) H5MM_malloc((size_t) chunk_entry->async_info.num_receive_requests * sizeof(MPI_Request)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async requests array") + + if (NULL == (chunk_entry->async_info.receive_buffer_array = (unsigned char **) H5MM_malloc((size_t) chunk_entry->async_info.num_receive_requests * sizeof(unsigned char *)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate async receive buffers") + + for (j = 0; j < chunk_entry->num_writers - 1; j++) { + int count = 0; + + /* Probe for a particular message from any process, removing that message + * from the receive queue in the process and allocating that much memory + * for the asynchronous receive + */ + if (MPI_SUCCESS != (mpi_code = MPI_Mprobe(MPI_ANY_SOURCE, (int) chunk_entry->index, io_info->comm, &message, &status))) + HMPI_GOTO_ERROR(FAIL, "MPI_Mprobe failed", mpi_code) + + if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count))) + HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code) + + HDassert(count >= 0); + if (NULL == (chunk_entry->async_info.receive_buffer_array[j] = (unsigned char *) H5MM_malloc((size_t) count * sizeof(char *)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate modification data receive buffer") + + if (MPI_SUCCESS != (mpi_code = MPI_Imrecv(chunk_entry->async_info.receive_buffer_array[j], count, MPI_BYTE, + &message, &chunk_entry->async_info.receive_requests_array[j]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Imrecv failed", mpi_code) + } /* end for */ + } /* end if */ + + local_chunk_array[last_assigned_idx++] = local_chunk_array[i]; + } /* end else */ + } /* end for */ + + *local_chunk_array_num_entries = last_assigned_idx; + + /* Wait for all async send requests to complete before returning */ + if (num_send_requests) { + if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(MPI_Status)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer") + + H5_CHECK_OVERFLOW(num_send_requests, size_t, int); + if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses))) + HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code) + } /* end if */ + +done: + /* Now that all async send requests have completed, free up the send + * buffers used in the async operations + */ + for (i = 0; i < num_send_requests; i++) { + if (mod_data[i]) + H5MM_free(mod_data[i]); + } /* end for */ + + if (send_requests) + H5MM_free(send_requests); + if (send_statuses) + H5MM_free(send_statuses); + if (send_counts) + H5MM_free(send_counts); + if (send_displacements) + H5MM_free(send_displacements); + if (mod_data) + H5MM_free(mod_data); + if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + if (mem_iter) + H5MM_free(mem_iter); + if (num_assigned_chunks_array) + H5MM_free(num_assigned_chunks_array); + if (shared_chunks_info_array) + H5MM_free(shared_chunks_info_array); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__chunk_redistribute_shared_chunks() */ + + +/*------------------------------------------------------------------------- + * Function: H5D__mpio_filtered_collective_write_type + * + * Purpose: Constructs a MPI derived datatype for both the memory and + * the file for a collective write of filtered chunks. 
The + * datatype contains the offsets in the file and the locations + * of the filtered chunk data buffers. + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Tuesday, November 22, 2016 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chunk_list, + size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_derived, + MPI_Datatype *new_file_type, hbool_t *file_type_derived) +{ + MPI_Aint *write_buf_array = NULL; /* Relative displacements of filtered chunk data buffers */ + MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */ + int *length_array = NULL; /* Filtered Chunk lengths */ + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(chunk_list); + HDassert(new_mem_type); + HDassert(mem_type_derived); + HDassert(new_file_type); + HDassert(file_type_derived); + + if (num_entries > 0) { + size_t i; + int mpi_code; + void *base_buf; + + H5_CHECK_OVERFLOW(num_entries, size_t, int); + + /* Allocate arrays */ + if (NULL == (length_array = (int *) H5MM_malloc((size_t) num_entries * sizeof(int)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write length array") + if (NULL == (write_buf_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write buf length array") + if (NULL == (file_offset_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for collective write offset array") + + /* Ensure the list is sorted in ascending order of offset in the file */ + HDqsort(chunk_list, num_entries, sizeof(H5D_filtered_collective_io_info_t), H5D__cmp_filtered_collective_io_info_entry); + + base_buf = chunk_list[0].buf; + for (i = 0; i < num_entries; i++) { + /* Set up the offset in the file, the length of the chunk data, and the relative + * displacement of the chunk data write buffer + */ + file_offset_array[i] = (MPI_Aint) chunk_list[i].chunk_states.new_chunk.offset; + length_array[i] = (int) chunk_list[i].chunk_states.new_chunk.length; + write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf; + } /* end for */ + + /* Create memory MPI type */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, write_buf_array, MPI_BYTE, new_mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + *mem_type_derived = TRUE; + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + + /* Create file MPI type */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, file_offset_array, MPI_BYTE, new_file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + *file_type_derived = TRUE; + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + } /* end if */ + +done: + if (write_buf_array) + H5MM_free(write_buf_array); + if (file_offset_array) + H5MM_free(file_offset_array); + if (length_array) + H5MM_free(length_array); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_filtered_collective_write_type() */ + + 
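The hindexed-datatype pattern used by H5D__mpio_filtered_collective_write_type() above can be illustrated outside the library. Below is a minimal standalone MPI sketch, not HDF5 code: the chunk byte lengths, file offsets, and the "chunks.bin" file name are made-up example values. Each filtered chunk's length is paired with its absolute offset in the file for the file type, and with its displacement relative to the first chunk buffer for the memory type, so a single collective call writes every chunk owned by a rank at once.

/* Standalone sketch of the hindexed memory/file datatype pattern.
 * Example values only; not part of the HDF5 source. */
#include <mpi.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    MPI_Datatype mem_type, file_type;
    MPI_File     fh;
    MPI_Aint     base, addr;
    MPI_Aint     mem_displs[2];
    MPI_Aint     file_offsets[2];
    char        *chunk_bufs[2];
    int          lengths[2] = { 512, 768 }; /* example filtered chunk sizes (bytes) */
    int          rank, i;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* Give each rank disjoint example chunk regions in the file */
    file_offsets[0] = 4096 + (MPI_Aint) rank * 2048;
    file_offsets[1] = file_offsets[0] + 1024;

    for (i = 0; i < 2; i++)
        if (NULL == (chunk_bufs[i] = (char *) calloc((size_t) lengths[i], 1)))
            MPI_Abort(MPI_COMM_WORLD, 1);

    /* Memory displacements are relative to the first chunk buffer,
     * which is passed as the base pointer of the collective write below */
    MPI_Get_address(chunk_bufs[0], &base);
    for (i = 0; i < 2; i++) {
        MPI_Get_address(chunk_bufs[i], &addr);
        mem_displs[i] = addr - base;
    }

    /* One hindexed type gathers the scattered chunk buffers in memory,
     * a matching one scatters them to their offsets in the file */
    MPI_Type_create_hindexed(2, lengths, mem_displs, MPI_BYTE, &mem_type);
    MPI_Type_create_hindexed(2, lengths, file_offsets, MPI_BYTE, &file_type);
    MPI_Type_commit(&mem_type);
    MPI_Type_commit(&file_type);

    /* Collective write: the file view selects the chunk regions and a
     * single write_all call moves every chunk for this rank at once */
    MPI_File_open(MPI_COMM_WORLD, "chunks.bin",
                  MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
    MPI_File_set_view(fh, 0, MPI_BYTE, file_type, "native", MPI_INFO_NULL);
    MPI_File_write_all(fh, chunk_bufs[0], 1, mem_type, MPI_STATUS_IGNORE);
    MPI_File_close(&fh);

    MPI_Type_free(&mem_type);
    MPI_Type_free(&file_type);
    for (i = 0; i < 2; i++)
        free(chunk_bufs[i]);
    MPI_Finalize();
    return 0;
}

MPI_BYTE is used as the element type because filtered chunk sizes vary per chunk and need not be multiples of the dataset's element size, which mirrors the byte-based lengths and offsets built by the function above.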
+/*------------------------------------------------------------------------- + * Function: H5D__filtered_collective_chunk_entry_io + * + * Purpose: Given an entry for a filtered chunk, performs the necessary + * steps for updating the chunk data during a collective + * write, or for reading the chunk from file during a + * collective read. + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Wednesday, January 18, 2017 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry, + const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm) +{ + H5D_chunk_info_t *chunk_info = NULL; + H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__scatter_mem/H5D__gather_mem */ + unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */ + unsigned filter_mask = 0; + hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ + hssize_t extent_npoints; + hsize_t true_chunk_size; + hbool_t mem_iter_init = FALSE; + size_t buf_size; + size_t i; + H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */ + void *tmp_gath_buf = NULL; /* Temporary gather buffer for owner of the chunk to gather into from + application write buffer before scattering out to the chunk data buffer */ + int mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(chunk_entry); + HDassert(io_info); + HDassert(type_info); + HDassert(fm); + + /* Look up the chunk and get its file and memory dataspaces */ + if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index))) + HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list") + + if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + true_chunk_size = (hsize_t) extent_npoints * type_info->src_type_size; + + /* If the size of the filtered chunk is larger than the number of points in the + * chunk file space extent times the datatype size, allocate enough space to hold the + * whole filtered chunk. Otherwise, allocate a buffer equal to the size of the + * chunk so that the unfiltering operation doesn't have to grow the buffer. + */ + buf_size = MAX(chunk_entry->chunk_states.chunk_current.length, true_chunk_size); + + if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer") + + /* If this is not a full chunk overwrite or this is a read operation, the chunk must be + * read from the file and unfiltered. + */ + if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) { + chunk_entry->chunk_states.new_chunk.length = chunk_entry->chunk_states.chunk_current.length; + + /* Currently, these chunk reads are done independently and will likely + * cause issues with collective metadata reads enabled. 
In the future, + * this should be refactored to use collective chunk reads - JTH */ + if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->chunk_states.chunk_current.offset, + chunk_entry->chunk_states.new_chunk.length, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0) + HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "unable to read raw data chunk") + + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask, + io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, + (size_t *) &chunk_entry->chunk_states.new_chunk.length, &buf_size, &chunk_entry->buf) < 0) + HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying") + } /* end if */ + else { + chunk_entry->chunk_states.new_chunk.length = true_chunk_size; + } /* end else */ + + /* Initialize iterator for memory selection */ + if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") + + if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") + mem_iter_init = TRUE; + + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + /* If this is a read operation, scatter the read chunk data to the user's buffer. + * + * If this is a write operation, update the chunk data buffer with the modifications + * from the current process, then apply any modifications from other processes. Finally, + * filter the newly-updated chunk. + */ + switch (io_info->op_type) { + case H5D_IO_OP_READ: + if (H5D__scatter_mem(chunk_entry->buf, chunk_info->mspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, io_info->u.rbuf) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to read buffer") + break; + + case H5D_IO_OP_WRITE: + if (NULL == (tmp_gath_buf = H5MM_malloc((hsize_t) iter_nelmts * type_info->src_type_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate temporary gather buffer") + + /* Gather modification data from the application write buffer into a temporary buffer */ + if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, tmp_gath_buf)) + HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer") + + if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + mem_iter_init = FALSE; + + /* Initialize iterator for file selection */ + if (H5S_select_iter_init(mem_iter, chunk_info->fspace, type_info->dst_type_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize file selection information") + mem_iter_init = TRUE; + + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + /* Scatter the owner's modification data into the chunk data buffer according to + * the file space. 
+ */ + if (H5D__scatter_mem(tmp_gath_buf, chunk_info->fspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to chunk data buffer") + + if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + mem_iter_init = FALSE; + + if (MPI_SUCCESS != (mpi_code = MPI_Waitall(chunk_entry->async_info.num_receive_requests, + chunk_entry->async_info.receive_requests_array, MPI_STATUSES_IGNORE))) + HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code) + + /* For each asynchronous receive call previously posted, receive the chunk modification + * buffer from another rank and update the chunk data + */ + for (i = 0; i < (size_t) chunk_entry->async_info.num_receive_requests; i++) { + const unsigned char *mod_data_p; + + /* Decode the process' chunk file dataspace */ + mod_data_p = chunk_entry->async_info.receive_buffer_array[i]; + if (NULL == (dataspace = H5S_decode(&mod_data_p))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace") + + if (H5S_select_iter_init(mem_iter, dataspace, type_info->dst_type_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") + mem_iter_init = TRUE; + + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(dataspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + /* Update the chunk data with the received modification data */ + if (H5D__scatter_mem(mod_data_p, dataspace, mem_iter, (size_t) iter_nelmts, + io_info->dxpl_cache, chunk_entry->buf) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't scatter to write buffer") + + if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + mem_iter_init = FALSE; + if (dataspace) { + if (H5S_close(dataspace) < 0) + HGOTO_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace") + dataspace = NULL; + } + H5MM_free(chunk_entry->async_info.receive_buffer_array[i]); + } /* end for */ + + /* Filter the chunk */ + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask, + io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, + (size_t *) &chunk_entry->chunk_states.new_chunk.length, &buf_size, &chunk_entry->buf) < 0) + HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed") + +#if H5_SIZEOF_SIZE_T > 4 + /* Check for the chunk expanding too much to encode in a 32-bit value */ + if (chunk_entry->chunk_states.new_chunk.length > ((size_t) 0xffffffff)) + HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length") +#endif + + break; + default: + HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "invalid I/O operation") + } /* end switch */ + +done: + if (chunk_entry->async_info.receive_buffer_array) + H5MM_free(chunk_entry->async_info.receive_buffer_array); + if (chunk_entry->async_info.receive_requests_array) + H5MM_free(chunk_entry->async_info.receive_requests_array); + if (mod_data) + H5MM_free(mod_data); + if (tmp_gath_buf) + H5MM_free(tmp_gath_buf); + if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + if (mem_iter) + H5MM_free(mem_iter); + if (dataspace) + if (H5S_close(dataspace) < 0) + HDONE_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace") + + FUNC_LEAVE_NOAPI(ret_value) +} /* end 
H5D__filtered_collective_chunk_entry_io() */ #endif /* H5_HAVE_PARALLEL */ diff --git a/src/H5Dpkg.h b/src/H5Dpkg.h index a6857b9..097fab7 100644 --- a/src/H5Dpkg.h +++ b/src/H5Dpkg.h @@ -617,6 +617,9 @@ H5_DLL herr_t H5D__select_write(const H5D_io_info_t *io_info, H5_DLL herr_t H5D__scatter_mem(const void *_tscat_buf, const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts, const H5D_dxpl_cache_t *dxpl_cache, void *_buf); +H5_DLL size_t H5D__gather_mem(const void *_buf, + const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts, + const H5D_dxpl_cache_t *dxpl_cache, void *_tgath_buf/*out*/); H5_DLL herr_t H5D__scatgath_read(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, hsize_t nelmts, const H5S_t *file_space, const H5S_t *mem_space); @@ -666,6 +669,8 @@ H5_DLL herr_t H5D__chunk_lookup(const H5D_t *dset, hid_t dxpl_id, const hsize_t *scaled, H5D_chunk_ud_t *udata); H5_DLL herr_t H5D__chunk_allocated(H5D_t *dset, hid_t dxpl_id, hsize_t *nbytes); H5_DLL herr_t H5D__chunk_allocate(const H5D_io_info_t *io_info, hbool_t full_overwrite, hsize_t old_dim[]); +H5_DLL herr_t H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info, const H5F_block_t *old_chunk, + H5F_block_t *new_chunk, hbool_t *need_insert, hsize_t scaled[]); H5_DLL herr_t H5D__chunk_update_old_edge_chunks(H5D_t *dset, hid_t dxpl_id, hsize_t old_dim[]); H5_DLL herr_t H5D__chunk_prune_by_extent(H5D_t *dset, hid_t dxpl_id, @@ -768,8 +773,7 @@ H5_DLL herr_t H5D__chunk_collective_write(H5D_io_info_t *io_info, * memory and the file */ H5_DLL htri_t H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space, const H5S_t *mem_space, - const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm, - H5P_genplist_t *dx_plist); + const H5D_type_info_t *type_info, H5P_genplist_t *dx_plist); #endif /* H5_HAVE_PARALLEL */ diff --git a/src/H5Dscatgath.c b/src/H5Dscatgath.c index 4625c7a..0ae69ee 100644 --- a/src/H5Dscatgath.c +++ b/src/H5Dscatgath.c @@ -47,9 +47,6 @@ static herr_t H5D__scatter_file(const H5D_io_info_t *io_info, static size_t H5D__gather_file(const H5D_io_info_t *io_info, const H5S_t *file_space, H5S_sel_iter_t *file_iter, size_t nelmts, void *buf); -static size_t H5D__gather_mem(const void *_buf, - const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts, - const H5D_dxpl_cache_t *dxpl_cache, void *_tgath_buf/*out*/); static herr_t H5D__compound_opt_read(size_t nelmts, const H5S_t *mem_space, H5S_sel_iter_t *iter, const H5D_dxpl_cache_t *dxpl_cache, const H5D_type_info_t *type_info, void *user_buf/*out*/); @@ -303,6 +300,7 @@ H5D__scatter_mem (const void *_tscat_buf, const H5S_t *space, HDassert(space); HDassert(iter); HDassert(nelmts > 0); + HDassert(dxpl_cache); HDassert(buf); /* Allocate the vector I/O arrays */ @@ -364,7 +362,7 @@ done: * *------------------------------------------------------------------------- */ -static size_t +size_t H5D__gather_mem(const void *_buf, const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts, const H5D_dxpl_cache_t *dxpl_cache, void *_tgath_buf/*out*/) @@ -387,6 +385,7 @@ H5D__gather_mem(const void *_buf, const H5S_t *space, HDassert(space); HDassert(iter); HDassert(nelmts > 0); + HDassert(dxpl_cache); HDassert(tgath_buf); /* Allocate the vector I/O arrays */ diff --git a/src/H5Ppublic.h b/src/H5Ppublic.h index 55b3877..854b1ef 100644 --- a/src/H5Ppublic.h +++ b/src/H5Ppublic.h @@ -166,7 +166,7 @@ typedef enum H5D_mpio_no_collective_cause_t { H5D_MPIO_MPI_OPT_TYPES_ENV_VAR_DISABLED = 0x08, H5D_MPIO_NOT_SIMPLE_OR_SCALAR_DATASPACES = 0x10, 
H5D_MPIO_NOT_CONTIGUOUS_OR_CHUNKED_DATASET = 0x20, - H5D_MPIO_FILTERS = 0x40 + H5D_MPIO_NO_COLLECTIVE_MAX_CAUSE = 0x40 } H5D_mpio_no_collective_cause_t; /********************/ diff --git a/src/H5err.txt b/src/H5err.txt index 3f5801f..d771956 100644 --- a/src/H5err.txt +++ b/src/H5err.txt @@ -243,6 +243,8 @@ MINOR, LINK, H5E_CANTSORT, Can't sort objects MINOR, MPI, H5E_MPI, Some MPI function failed MINOR, MPI, H5E_MPIERRSTR, MPI Error String MINOR, MPI, H5E_CANTRECV, Can't receive data +MINOR, MPI, H5E_CANTGATHER, Can't gather data +MINOR, MPI, H5E_NO_INDEPENDENT, Can't perform independent IO # Heap errors MINOR, HEAP, H5E_CANTRESTORE, Can't restore condition diff --git a/src/H5trace.c b/src/H5trace.c index 9fb8a72..930002f 100644 --- a/src/H5trace.c +++ b/src/H5trace.c @@ -621,10 +621,6 @@ H5_trace(const double *returning, const char *func, const char *type, ...) fprintf(out, "%sH5D_MPIO_NOT_CONTIGUOUS_OR_CHUNKED_DATASET", flag_already_displayed ? " | " : ""); flag_already_displayed = TRUE; } /* end if */ - if(nocol_cause_mode & H5D_MPIO_FILTERS) { - fprintf(out, "%sH5D_MPIO_FILTERS", flag_already_displayed ? " | " : ""); - flag_already_displayed = TRUE; - } /* end if */ /* Display '<none>' if there's no flags set */ if(!flag_already_displayed) diff --git a/testpar/Makefile.am b/testpar/Makefile.am index 7029bd5..b0fe0cd 100644 --- a/testpar/Makefile.am +++ b/testpar/Makefile.am @@ -23,7 +23,7 @@ AM_CPPFLAGS+=-I$(top_srcdir)/src -I$(top_srcdir)/test # Test programs. These are our main targets. # -TEST_PROG_PARA=t_mpi t_bigio testphdf5 t_cache t_cache_image t_pflush1 t_pflush2 t_pshutdown t_prestart t_init_term t_shapesame +TEST_PROG_PARA=t_mpi t_bigio testphdf5 t_cache t_cache_image t_pflush1 t_pflush2 t_pshutdown t_prestart t_init_term t_shapesame t_filters_parallel check_PROGRAMS = $(TEST_PROG_PARA) diff --git a/testpar/t_dset.c b/testpar/t_dset.c index b952bf3..65d1bb4 100644 --- a/testpar/t_dset.c +++ b/testpar/t_dset.c @@ -2651,11 +2651,8 @@ compress_readAll(void) nerrors++; } - /* Writing to the compressed, chunked dataset in parallel should fail */ - H5E_BEGIN_TRY { - ret = H5Dwrite(dataset, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, xfer_plist, data_read); - } H5E_END_TRY; - VRFY((ret < 0), "H5Dwrite failed"); + ret = H5Dwrite(dataset, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, xfer_plist, data_read); + VRFY((ret >= 0), "H5Dwrite succeeded"); ret = H5Pclose(xfer_plist); VRFY((ret >= 0), "H5Pclose succeeded"); diff --git a/testpar/t_filters_parallel.c b/testpar/t_filters_parallel.c new file mode 100644 index 0000000..21a5ce0 --- /dev/null +++ b/testpar/t_filters_parallel.c @@ -0,0 +1,2475 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright by The HDF Group. * + * Copyright by the Board of Trustees of the University of Illinois. * + * All rights reserved. * + * * + * This file is part of HDF5. The full HDF5 copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the files COPYING and Copyright.html. COPYING can be found at the root * + * of the source code distribution tree; Copyright.html can be found at the * + * root level of an installed copy of the electronic HDF5 document set and * + * is linked from the top-level documents page. It can also be found at * + * http://hdfgroup.org/HDF5/doc/Copyright.html. If you do not have * + * access to either file, you may request a copy from help@hdfgroup.org. 
* + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +/* + * Programmer: Jordan Henderson + * 01/31/2017 + * + * This file contains tests for writing to and reading from + * datasets in parallel with filters applied to the data. + */ + +#include "t_filters_parallel.h" + +const char *FILENAME[] = { + "t_filters_parallel", + NULL +}; +char filenames[1][256]; + +int nerrors = 0; + +#define ARRAY_SIZE(a) sizeof(a) / sizeof(a[0]) + +static void test_one_chunk_filtered_dataset(void); +static void test_filtered_dataset_no_overlap(void); +static void test_filtered_dataset_overlap(void); +static void test_filtered_dataset_single_no_selection(void); +static void test_filtered_dataset_all_no_selection(void); +static void test_filtered_dataset_point_selection(void); +static void test_filtered_dataset_interleaved_write(void); +static void test_3d_filtered_dataset_no_overlap_separate_pages(void); +static void test_3d_filtered_dataset_no_overlap_same_pages(void); +static void test_3d_filtered_dataset_overlap(void); +static void test_cmpd_filtered_dataset_no_conversion_unshared(void); +static void test_cmpd_filtered_dataset_no_conversion_shared(void); +static void test_cmpd_filtered_dataset_type_conversion_unshared(void); +static void test_cmpd_filtered_dataset_type_conversion_shared(void); +static void test_write_serial_read_parallel(void); +static void test_write_parallel_read_serial(void); + +static MPI_Comm comm = MPI_COMM_WORLD; +static MPI_Info info = MPI_INFO_NULL; +static int mpi_rank; +static int mpi_size; + +static void (*tests[])(void) = { + test_one_chunk_filtered_dataset, + test_filtered_dataset_no_overlap, + test_filtered_dataset_overlap, + test_filtered_dataset_single_no_selection, + test_filtered_dataset_all_no_selection, + test_filtered_dataset_point_selection, + test_filtered_dataset_interleaved_write, + test_3d_filtered_dataset_no_overlap_separate_pages, + test_3d_filtered_dataset_no_overlap_same_pages, + test_3d_filtered_dataset_overlap, + test_cmpd_filtered_dataset_no_conversion_unshared, + test_cmpd_filtered_dataset_no_conversion_shared, + test_cmpd_filtered_dataset_type_conversion_unshared, + test_cmpd_filtered_dataset_type_conversion_shared, + test_write_serial_read_parallel, + test_write_parallel_read_serial, +}; + +/* + * Tests parallel write of filtered data in the special + * case where a dataset is composed of a single chunk. 
+ * + * Programmer: Jordan Henderson + * 02/01/2017 + */ +static void +test_one_chunk_filtered_dataset(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *read_buf = NULL; + C_DATATYPE *correct_buf = NULL; + hsize_t dataset_dims[ONE_CHUNK_FILTERED_DATASET_DIMS]; + hsize_t chunk_dims[ONE_CHUNK_FILTERED_DATASET_DIMS]; + hsize_t sel_dims[ONE_CHUNK_FILTERED_DATASET_DIMS]; + hsize_t count[ONE_CHUNK_FILTERED_DATASET_DIMS]; + hsize_t stride[ONE_CHUNK_FILTERED_DATASET_DIMS]; + hsize_t block[ONE_CHUNK_FILTERED_DATASET_DIMS]; + hsize_t offset[ONE_CHUNK_FILTERED_DATASET_DIMS]; + size_t i, data_size, correct_buf_size; + hid_t file_id = -1, dset_id = -1, plist_id = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing one-chunk filtered dataset"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_NROWS; + dataset_dims[1] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_NCOLS; + chunk_dims[0] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_CH_NROWS; + chunk_dims[1] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_CH_NCOLS; + sel_dims[0] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_NROWS / (hsize_t) mpi_size; + sel_dims[1] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_NCOLS; + + filespace = H5Screate_simple(ONE_CHUNK_FILTERED_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(ONE_CHUNK_FILTERED_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, ONE_CHUNK_FILTERED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, ONE_CHUNK_FILTERED_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = 1; + count[1] = 1; + stride[0] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_CH_NROWS; + stride[1] = (hsize_t) ONE_CHUNK_FILTERED_DATASET_CH_NCOLS; + block[0] = sel_dims[0]; + block[1] = sel_dims[1]; + offset[0] = ((hsize_t) mpi_rank * sel_dims[0]); + offset[1] = 0; + + if (VERBOSE_MED) + printf("Process %d: count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n", + mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), + 
"Hyperslab selection succeeded"); + + /* Fill data buffer */ + data_size = (hsize_t) ONE_CHUNK_FILTERED_DATASET_CH_NROWS * (hsize_t) ONE_CHUNK_FILTERED_DATASET_NCOLS * sizeof(*data); + correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++) + correct_buf[i] = ((C_DATATYPE) i % (ONE_CHUNK_FILTERED_DATASET_CH_NROWS / mpi_size * ONE_CHUNK_FILTERED_DATASET_CH_NCOLS)) + + ((C_DATATYPE) i / (ONE_CHUNK_FILTERED_DATASET_CH_NROWS / mpi_size * ONE_CHUNK_FILTERED_DATASET_CH_NCOLS)); + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + + /* Verify the correct data was written */ + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + dset_id = H5Dopen2(file_id, "/" ONE_CHUNK_FILTERED_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + if (correct_buf) free(correct_buf); + if (read_buf) free(read_buf); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data in the case where only + * one process is writing to a particular chunk in the operation. + * In this case, the write operation can be optimized because + * chunks do not have to be redistributed to new owners. 
+ * + * Programmer: Jordan Henderson + * 02/01/2017 + */ +static void +test_filtered_dataset_no_overlap(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *read_buf = NULL; + C_DATATYPE *correct_buf = NULL; + hsize_t dataset_dims[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t chunk_dims[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t sel_dims[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t count[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t stride[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t block[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t offset[UNSHARED_FILTERED_CHUNKS_DATASET_DIMS]; + size_t i, data_size, correct_buf_size; + hid_t file_id = -1, dset_id = -1, plist_id = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing write to unshared filtered chunks"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_NROWS; + dataset_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_NCOLS; + chunk_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NROWS; + chunk_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NCOLS; + sel_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NROWS; + sel_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_NCOLS; + + filespace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, UNSHARED_FILTERED_CHUNKS_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, UNSHARED_FILTERED_CHUNKS_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = 1; + count[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_NCOLS / (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NCOLS; + stride[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NROWS; + stride[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NCOLS; + block[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NROWS; + block[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NCOLS; + offset[0] = ((hsize_t) mpi_rank * (hsize_t) UNSHARED_FILTERED_CHUNKS_CH_NROWS * count[0]); + offset[1] = 0; + + if (VERBOSE_MED) + printf("Process %d: count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n", + mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]); + + /* 
Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((dset_id >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + /* Fill data buffer */ + data_size = sel_dims[0] * sel_dims[1] * sizeof(*data); + correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++) + correct_buf[i] = (C_DATATYPE) ( (i % (dataset_dims[0] / (hsize_t) mpi_size * dataset_dims[1])) + + (i / (dataset_dims[0] / (hsize_t) mpi_size * dataset_dims[1]))); + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + + /* Verify the correct data was written */ + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + dset_id = H5Dopen2(file_id, "/" UNSHARED_FILTERED_CHUNKS_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + if (correct_buf) free(correct_buf); + if (read_buf) free(read_buf); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data in the case where + * more than one process is writing to a particular chunk + * in the operation. In this case, the chunks have to be + * redistributed before the operation so that only one process + * writes to a particular chunk. 
+ * + * Programmer: Jordan Henderson + * 02/01/2017 + */ +static void +test_filtered_dataset_overlap(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *read_buf = NULL; + C_DATATYPE *correct_buf = NULL; + hsize_t dataset_dims[SHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t chunk_dims[SHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t sel_dims[SHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t count[SHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t stride[SHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t block[SHARED_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t offset[SHARED_FILTERED_CHUNKS_DATASET_DIMS]; + size_t i, data_size, correct_buf_size; + hid_t file_id = -1, dset_id = -1, plist_id = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing write to shared filtered chunks"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) SHARED_FILTERED_CHUNKS_NROWS; + dataset_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_NCOLS; + chunk_dims[0] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NROWS; + chunk_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NCOLS; + sel_dims[0] = (hsize_t) DIM0_SCALE_FACTOR; + sel_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NCOLS * (hsize_t) DIM1_SCALE_FACTOR; + + filespace = H5Screate_simple(SHARED_FILTERED_CHUNKS_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(SHARED_FILTERED_CHUNKS_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, SHARED_FILTERED_CHUNKS_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, SHARED_FILTERED_CHUNKS_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = (hsize_t) SHARED_FILTERED_CHUNKS_NROWS / (hsize_t) SHARED_FILTERED_CHUNKS_CH_NROWS; + count[1] = (hsize_t) SHARED_FILTERED_CHUNKS_NCOLS / (hsize_t) SHARED_FILTERED_CHUNKS_CH_NCOLS; + stride[0] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NROWS; + stride[1] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NCOLS; + block[0] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NROWS / (hsize_t) mpi_size; + block[1] = (hsize_t) SHARED_FILTERED_CHUNKS_CH_NCOLS; + offset[0] = (hsize_t) mpi_rank * block[0]; + offset[1] = 0; + + if (VERBOSE_MED) + printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n", + mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], 
offset[1], block[0], block[1]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + /* Fill data buffer */ + data_size = sel_dims[0] * sel_dims[1] * sizeof(*data); + correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++) + correct_buf[i] = (C_DATATYPE) ((dataset_dims[1] * (i / ((hsize_t) mpi_size * dataset_dims[1]))) + + (i % dataset_dims[1]) + + (((i % ((hsize_t) mpi_size * dataset_dims[1])) / dataset_dims[1]) % dataset_dims[1])); + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + + /* Verify correct data was written */ + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + dset_id = H5Dopen2(file_id, "/" SHARED_FILTERED_CHUNKS_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + if (correct_buf) free(correct_buf); + if (read_buf) free(read_buf); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data in the case where + * a single process in the write operation has no selection + * in the dataset's dataspace. In this case, the process with + * no selection still has to participate in the collective + * space re-allocation for the filtered chunks and also must + * participate in the re-insertion of the filtered chunks + * into the chunk index. 
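+ *
+ * (Note: in the code below the excluded rank shrinks its memory
+ * selection to zero elements and calls H5Sselect_none() on the file
+ * dataspace, but it still takes part in the collective H5Dwrite()
+ * call together with the other ranks.)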
+ * + * Programmer: Jordan Henderson + * 02/01/2017 + */ +static void +test_filtered_dataset_single_no_selection(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *read_buf = NULL; + C_DATATYPE *correct_buf = NULL; + hsize_t dataset_dims[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t chunk_dims[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t sel_dims[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t count[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t stride[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t block[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t offset[SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + size_t i, data_size, correct_buf_size; + size_t segment_length; + hid_t file_id = -1, dset_id = -1, plist_id = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing write to filtered chunks with a single process having no selection"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_NROWS; + dataset_dims[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_NCOLS; + chunk_dims[0] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS; + chunk_dims[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS; + sel_dims[0] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS; + sel_dims[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_NCOLS; + + if (mpi_rank == SINGLE_NO_SELECTION_FILTERED_CHUNKS_NO_SELECT_PROC) + sel_dims[0] = sel_dims[1] = 0; + + filespace = H5Screate_simple(SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = 1; + count[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_NCOLS / (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS; + stride[0] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS; + stride[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS; + block[0] = (hsize_t) 
SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS; + block[1] = (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS; + offset[0] = (hsize_t) mpi_rank * (hsize_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS * count[0]; + offset[1] = 0; + + if (VERBOSE_MED) + printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n", + mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + if (mpi_rank == SINGLE_NO_SELECTION_FILTERED_CHUNKS_NO_SELECT_PROC) + VRFY((H5Sselect_none(filespace) >= 0), "Select none succeeded"); + else + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + /* Fill data buffer */ + data_size = sel_dims[0] * sel_dims[1] * sizeof(*data); + correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++) + correct_buf[i] = (C_DATATYPE) ( (i % (dataset_dims[0] / (hsize_t) mpi_size * dataset_dims[1])) + + (i / (dataset_dims[0] / (hsize_t) mpi_size * dataset_dims[1]))); + + /* Compute the correct offset into the buffer for the process having no selection and clear it */ + segment_length = dataset_dims[0] * dataset_dims[1] / (hsize_t) mpi_size; + HDmemset(correct_buf + ((size_t) SINGLE_NO_SELECTION_FILTERED_CHUNKS_NO_SELECT_PROC * segment_length), 0, segment_length * sizeof(*data)); + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + + /* Verify the correct data was written */ + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + dset_id = H5Dopen2(file_id, "/" SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + if (correct_buf) free(correct_buf); + if (read_buf) free(read_buf); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data in the case + * where no process in the write operation has a + * selection in the dataset's dataspace. 
This test is + * to ensure that there are no assertion failures or + * similar issues due to size 0 allocations and the + * like. In this case, the file and dataset are created + * but the dataset is populated with the default fill + * value. + * + * Programmer: Jordan Henderson + * 02/02/2017 + */ +static void +test_filtered_dataset_all_no_selection(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *read_buf = NULL; + C_DATATYPE *correct_buf = NULL; + hsize_t dataset_dims[ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t chunk_dims[ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t sel_dims[ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + size_t i, data_size, correct_buf_size; + hid_t file_id = -1, dset_id = -1, plist_id = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing write to filtered chunks with all processes having no selection"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) ALL_NO_SELECTION_FILTERED_CHUNKS_NROWS; + dataset_dims[1] = (hsize_t) ALL_NO_SELECTION_FILTERED_CHUNKS_NCOLS; + chunk_dims[0] = (hsize_t) ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS; + chunk_dims[1] = (hsize_t) ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS; + sel_dims[0] = sel_dims[1] = 0; + + filespace = H5Screate_simple(ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_none(filespace) >= 0), "Select none succeeded"); + + /* Fill data buffer */ + data_size = sel_dims[0] * sel_dims[1] * sizeof(*data); + correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL 
creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + + /* Verify the correct data was written */ + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + dset_id = H5Dopen2(file_id, "/" ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + if (correct_buf) free(correct_buf); + if (read_buf) free(read_buf); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data by using + * point selections instead of hyperslab selections. + * + * Programmer: Jordan Henderson + * 02/02/2017 + */ +static void +test_filtered_dataset_point_selection(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *correct_buf = NULL; + C_DATATYPE *read_buf = NULL; + hsize_t *coords = NULL; + hsize_t dataset_dims[POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t chunk_dims[POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + hsize_t sel_dims[POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS]; + size_t i, j, data_size, correct_buf_size; + size_t num_points; + hid_t file_id = -1, dset_id = -1, plist_id = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing write to filtered chunks with point selection"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NROWS; + dataset_dims[1] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NCOLS; + chunk_dims[0] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_CH_NROWS; + chunk_dims[1] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_CH_NCOLS; + sel_dims[0] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NROWS / (hsize_t) mpi_size; + sel_dims[1] = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NCOLS; + + filespace = H5Screate_simple(POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, 
POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, POINT_SELECTION_FILTERED_CHUNKS_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Set up point selection */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + num_points = (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NROWS * (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NCOLS / (hsize_t) mpi_size; + coords = (hsize_t *) calloc(1, 2 * num_points * sizeof(*coords)); + VRFY((NULL != coords), "Coords calloc succeeded"); + + for (i = 0; i < num_points; i++) + for (j = 0; j < POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS; j++) + coords[(i * POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS) + j] = (j > 0) ? (i % (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NCOLS) + : ((hsize_t) mpi_rank + ((hsize_t) mpi_size * (i / (hsize_t) POINT_SELECTION_FILTERED_CHUNKS_NCOLS))); + + VRFY((H5Sselect_elements(filespace, H5S_SELECT_SET, (hsize_t) num_points, (const hsize_t *) coords) >= 0), + "Point selection succeeded"); + + /* Fill data buffer */ + data_size = sel_dims[0] * sel_dims[1] * sizeof(*data); + correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++) + correct_buf[i] = (C_DATATYPE) ((dataset_dims[1] * (i / ((hsize_t) mpi_size * dataset_dims[1]))) + + (i % dataset_dims[1]) + + (((i % ((hsize_t) mpi_size * dataset_dims[1])) / dataset_dims[1]) % dataset_dims[1])); + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + + /* Verify the correct data was written */ + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + dset_id = H5Dopen2(file_id, "/" POINT_SELECTION_FILTERED_CHUNKS_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + if (coords) free(coords); + if (correct_buf) free(correct_buf); + if (read_buf) free(read_buf); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests 
parallel write of filtered data in the case where + * each process writes an equal amount of data to each chunk + * in the dataset. Each chunk is distributed among the + * processes in round-robin fashion by blocks of size 1 until + * the whole chunk is selected, leading to an interleaved + * write pattern. + * + * Programmer: Jordan Henderson + * 02/02/2017 + */ +static void +test_filtered_dataset_interleaved_write(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *read_buf = NULL; + C_DATATYPE *correct_buf = NULL; + hsize_t dataset_dims[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS]; + hsize_t chunk_dims[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS]; + hsize_t sel_dims[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS]; + hsize_t count[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS]; + hsize_t stride[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS]; + hsize_t block[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS]; + hsize_t offset[INTERLEAVED_WRITE_FILTERED_DATASET_DIMS]; + size_t i, data_size, correct_buf_size; + hid_t file_id = -1, dset_id = -1, plist_id = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing interleaved write to filtered chunks"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NROWS; + dataset_dims[1] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS; + chunk_dims[0] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_CH_NROWS; + chunk_dims[1] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS; + sel_dims[0] = (hsize_t) (INTERLEAVED_WRITE_FILTERED_DATASET_NROWS / mpi_size); + sel_dims[1] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS; + + filespace = H5Screate_simple(INTERLEAVED_WRITE_FILTERED_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(INTERLEAVED_WRITE_FILTERED_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, INTERLEAVED_WRITE_FILTERED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, INTERLEAVED_WRITE_FILTERED_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = (hsize_t) (INTERLEAVED_WRITE_FILTERED_DATASET_NROWS / INTERLEAVED_WRITE_FILTERED_DATASET_CH_NROWS); + count[1] = (hsize_t) (INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS / INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS); + stride[0] = (hsize_t) 
INTERLEAVED_WRITE_FILTERED_DATASET_CH_NROWS; + stride[1] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS; + block[0] = 1; + block[1] = (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS; + offset[0] = (hsize_t) mpi_rank; + offset[1] = 0; + + if (VERBOSE_MED) + printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n", + mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + /* Fill data buffer */ + data_size = sel_dims[0] * sel_dims[1] * sizeof(*data); + correct_buf_size = dataset_dims[0] * dataset_dims[1] * sizeof(*correct_buf); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++) + /* Add Column Index */ + correct_buf[i] = (C_DATATYPE) ( (i % (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS) + + /* Add the Row Index */ + + ((i % (hsize_t) (mpi_size * INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS)) / (hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS) + + /* Add the amount that gets added when a rank moves down to its next section vertically in the dataset */ + + ((hsize_t) INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS * (i / (hsize_t) (mpi_size * INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS)))); + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + + /* Verify the correct data was written */ + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + dset_id = H5Dopen2(file_id, "/" INTERLEAVED_WRITE_FILTERED_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + if (correct_buf) free(correct_buf); + if (read_buf) free(read_buf); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data in the case where + * the dataset has 3 dimensions and each process writes + * to its own "page" in the 3rd dimension. 
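+ *
+ * (Note: chunk_dims[2] is 1 in the code below and each rank selects
+ * the full row/column extent of the single page at
+ * offset[2] = mpi_rank, so the chunks touched by different ranks
+ * never overlap.)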
+ * + * Programmer: Jordan Henderson + * 02/06/2017 + */ +static void +test_3d_filtered_dataset_no_overlap_separate_pages(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *read_buf = NULL; + C_DATATYPE *correct_buf = NULL; + hsize_t dataset_dims[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS]; + hsize_t chunk_dims[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS]; + hsize_t sel_dims[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS]; + hsize_t count[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS]; + hsize_t stride[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS]; + hsize_t block[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS]; + hsize_t offset[UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS]; + size_t i, data_size, correct_buf_size; + hid_t file_id = -1, dset_id = -1, plist_id = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing write to unshared filtered chunks on separate pages in 3D dataset"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NROWS; + dataset_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NCOLS; + dataset_dims[2] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DEPTH; + chunk_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NROWS; + chunk_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NCOLS; + chunk_dims[2] = 1; + sel_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NROWS; + sel_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NCOLS; + sel_dims[2] = 1; + + filespace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NROWS / (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NROWS; + count[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NCOLS / (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NCOLS; + count[2] = 1; + stride[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NROWS; + 
stride[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NCOLS; + stride[2] = 1; + block[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NROWS; + block[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NCOLS; + block[2] = 1; + offset[0] = 0; + offset[1] = 0; + offset[2] = (hsize_t) mpi_rank; + + if (VERBOSE_MED) + printf("Process %d is writing with count[ %llu, %llu, %llu ], stride[ %llu, %llu, %llu ], offset[ %llu, %llu, %llu ], block size[ %llu, %llu, %llu ]\n", + mpi_rank, count[0], count[1], count[2], stride[0], stride[1], stride[2], offset[0], offset[1], offset[2], block[0], block[1], block[2]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + /* Fill data buffer */ + data_size = sel_dims[0] * sel_dims[1] * sel_dims[2] * sizeof(*data); + correct_buf_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*correct_buf); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++) + correct_buf[i] = (C_DATATYPE) ((i % (hsize_t) mpi_size) + (i / (hsize_t) mpi_size)); + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + + /* Verify the correct data was written */ + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + dset_id = H5Dopen2(file_id, "/" UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + if (correct_buf) free(correct_buf); + if (read_buf) free(read_buf); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data in the case where + * the dataset has 3 dimensions and each process writes + * to each "page" in the 3rd dimension. However, no chunk + * on a given "page" is written to by more than one process. 
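+ *
+ * (Note: in the code below each rank selects one band of chunk rows,
+ * offset[0] = mpi_rank * CH_NROWS, across all mpi_size pages
+ * (count[2] = mpi_size), so every rank writes on every page but no
+ * two ranks ever write to the same chunk.)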
+ * + * Programmer: Jordan Henderson + * 02/06/2017 + */ +static void +test_3d_filtered_dataset_no_overlap_same_pages(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *read_buf = NULL; + C_DATATYPE *correct_buf = NULL; + hsize_t dataset_dims[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS]; + hsize_t chunk_dims[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS]; + hsize_t sel_dims[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS]; + hsize_t count[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS]; + hsize_t stride[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS]; + hsize_t block[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS]; + hsize_t offset[UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS]; + size_t i, data_size, correct_buf_size; + hid_t file_id, dset_id, plist_id; + hid_t filespace, memspace; + + if (MAINPROCESS) puts("Testing write to unshared filtered chunks on the same pages in 3D dataset"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NROWS; + dataset_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NCOLS; + dataset_dims[2] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DEPTH; + chunk_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS; + chunk_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NCOLS; + chunk_dims[2] = 1; + sel_dims[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS; + sel_dims[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NCOLS; + sel_dims[2] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DEPTH; + + filespace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = 1; + count[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NCOLS / (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NCOLS; + count[2] = (hsize_t) mpi_size; + stride[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS; + stride[1] = (hsize_t) 
UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NCOLS; + stride[2] = 1; + block[0] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS; + block[1] = (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NCOLS; + block[2] = 1; + offset[0] = ((hsize_t) mpi_rank * (hsize_t) UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS * count[0]); + offset[1] = 0; + offset[2] = 0; + + if (VERBOSE_MED) + printf("Process %d is writing with count[ %llu, %llu, %llu ], stride[ %llu, %llu, %llu ], offset[ %llu, %llu, %llu ], block size[ %llu, %llu, %llu ]\n", + mpi_rank, count[0], count[1], count[2], stride[0], stride[1], stride[2], offset[0], offset[1], offset[2], block[0], block[1], block[2]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + /* Fill data buffer */ + data_size = sel_dims[0] * sel_dims[1] * sel_dims[2] * sizeof(*data); + correct_buf_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*correct_buf); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++) + correct_buf[i] = (C_DATATYPE) ((i % (dataset_dims[0] * dataset_dims[1])) + (i / (dataset_dims[0] * dataset_dims[1]))); + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + + /* Verify the correct data was written */ + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + dset_id = H5Dopen2(file_id, "/" UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + if (correct_buf) free(correct_buf); + if (read_buf) free(read_buf); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data in the case where + * the dataset has 3 dimensions and each process writes + * to each "page" in the 3rd dimension. Further, each chunk + * in each "page" is written to equally by all processes. 
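+ *
+ * (Note: block[0] is 1 with stride[0] equal to the chunk row size and
+ * offset[0] = mpi_rank in the code below, so the ranks interleave
+ * single rows inside every chunk on every page and each chunk
+ * receives an equal share of data from each rank.)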
+ * + * Programmer: Jordan Henderson + * 02/06/2017 + */ +static void +test_3d_filtered_dataset_overlap(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *read_buf = NULL; + C_DATATYPE *correct_buf = NULL; + hsize_t dataset_dims[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS]; + hsize_t chunk_dims[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS]; + hsize_t sel_dims[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS]; + hsize_t count[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS]; + hsize_t stride[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS]; + hsize_t block[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS]; + hsize_t offset[SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS]; + size_t i, data_size, correct_buf_size; + hid_t file_id = -1, dset_id = -1, plist_id = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing write to shared filtered chunks in 3D dataset"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_NROWS; + dataset_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_NCOLS; + dataset_dims[2] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_DEPTH; + chunk_dims[0] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_CH_NROWS; + chunk_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_CH_NCOLS; + chunk_dims[2] = 1; + sel_dims[0] = (hsize_t) (SHARED_FILTERED_CHUNKS_3D_NROWS / mpi_size); + sel_dims[1] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_NCOLS; + sel_dims[2] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_DEPTH; + + filespace = H5Screate_simple(SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, SHARED_FILTERED_CHUNKS_3D_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = (hsize_t) (SHARED_FILTERED_CHUNKS_3D_NROWS / SHARED_FILTERED_CHUNKS_3D_CH_NROWS); + count[1] = (hsize_t) (SHARED_FILTERED_CHUNKS_3D_NCOLS / SHARED_FILTERED_CHUNKS_3D_CH_NCOLS); + count[2] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_DEPTH; + stride[0] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_CH_NROWS; + stride[1] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_CH_NCOLS; + stride[2] = 1; + block[0] = 1; + block[1] = (hsize_t) SHARED_FILTERED_CHUNKS_3D_CH_NCOLS; + block[2] = 1; + offset[0] = (hsize_t) mpi_rank; + offset[1] = 
0; + offset[2] = 0; + + if (VERBOSE_MED) + printf("Process %d is writing with count[ %llu, %llu, %llu ], stride[ %llu, %llu, %llu ], offset[ %llu, %llu, %llu ], block size[ %llu, %llu, %llu ]\n", + mpi_rank, count[0], count[1], count[2], stride[0], stride[1], stride[2], offset[0], offset[1], offset[2], block[0], block[1], block[2]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + /* Fill data buffer */ + data_size = sel_dims[0] * sel_dims[1] * sel_dims[2] * sizeof(*data); + correct_buf_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*correct_buf); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++) + /* Add the Column Index */ + correct_buf[i] = (C_DATATYPE) ( (i % (hsize_t) (SHARED_FILTERED_CHUNKS_3D_DEPTH * SHARED_FILTERED_CHUNKS_3D_NCOLS)) + + /* Add the Row Index */ + + ((i % (hsize_t) (mpi_size * SHARED_FILTERED_CHUNKS_3D_DEPTH * SHARED_FILTERED_CHUNKS_3D_NCOLS)) / (hsize_t) (SHARED_FILTERED_CHUNKS_3D_DEPTH * SHARED_FILTERED_CHUNKS_3D_NCOLS)) + + /* Add the amount that gets added when a rank moves down to its next section vertically in the dataset */ + + ((hsize_t) (SHARED_FILTERED_CHUNKS_3D_DEPTH * SHARED_FILTERED_CHUNKS_3D_NCOLS) * (i / (hsize_t) (mpi_size * SHARED_FILTERED_CHUNKS_3D_DEPTH * SHARED_FILTERED_CHUNKS_3D_NCOLS)))); + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + + /* Verify the correct data was written */ + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + dset_id = H5Dopen2(file_id, "/" SHARED_FILTERED_CHUNKS_3D_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + if (correct_buf) free(correct_buf); + if (read_buf) free(read_buf); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data to unshared + * chunks using a compound datatype which doesn't + * require a datatype conversion. 
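+ *
+ * (Note: the dataset below is created directly with the in-memory
+ * compound type built from cmpd_filtered_t, so the file and memory
+ * datatypes are identical and no datatype conversion takes place
+ * during the collective write.)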
+ * + * Programmer: Jordan Henderson + * 02/10/2017 + */ +/* JTH: This test currently cannot be data-verified due to the floating-point data involved */ +static void +test_cmpd_filtered_dataset_no_conversion_unshared(void) +{ + cmpd_filtered_t *data = NULL; + hsize_t dataset_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t chunk_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t sel_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t count[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t stride[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t block[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t offset[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS]; + size_t i; + hid_t file_id = -1, dset_id = -1, plist_id = -1, memtype = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing write to unshared filtered chunks in Compound Datatype dataset without Datatype conversion"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_NROWS; + dataset_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_NCOLS; + chunk_dims[0] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NROWS; + chunk_dims[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NCOLS; + sel_dims[0] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NROWS; + sel_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC; + + filespace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + /* Create the compound type for memory. 
*/ + memtype = H5Tcreate(H5T_COMPOUND, sizeof(cmpd_filtered_t)); + VRFY((memtype >= 0), "Datatype creation succeeded"); + + VRFY((H5Tinsert(memtype, "ShortData", HOFFSET(cmpd_filtered_t, field1), H5T_NATIVE_SHORT) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "IntData", HOFFSET(cmpd_filtered_t, field2), H5T_NATIVE_INT) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "LongData", HOFFSET(cmpd_filtered_t, field3), H5T_NATIVE_LONG) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "DoubleData", HOFFSET(cmpd_filtered_t, field4), H5T_NATIVE_DOUBLE) >= 0), "Datatype insertion succeeded"); + + dset_id = H5Dcreate2(file_id, COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_NAME, memtype, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = 1; + count[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC; + stride[0] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NROWS; + stride[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NCOLS; + block[0] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NROWS; + block[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NCOLS; + offset[0] = 0; + offset[1] = ((hsize_t) mpi_rank * COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NCOLS); + + if (VERBOSE_MED) + printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n", + mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + data = (COMPOUND_C_DATATYPE *) calloc(1, (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC * sizeof(*data)); + VRFY((NULL != data), "calloc succeeded"); + + /* Fill data buffer */ + memset(data, 0, sizeof(cmpd_filtered_t) * (size_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC); + for (i = 0; i < (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC; i++) { + data[i].field1 = (short) GEN_DATA(i); + data[i].field2 = (int) GEN_DATA(i); + data[i].field3 = (long) GEN_DATA(i); + data[i].field4 = (double) GEN_DATA(i); + } + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, memtype, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Tclose(memtype) >= 0), "Datatype close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data to shared + * 
chunks using a compound datatype which doesn't + * require a datatype conversion. + * + * Programmer: Jordan Henderson + * 02/10/2017 + */ +/* JTH: This test currently cannot be data-verified due to the floating-point data involved */ +static void +test_cmpd_filtered_dataset_no_conversion_shared(void) +{ + cmpd_filtered_t *data = NULL; + hsize_t dataset_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t chunk_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t sel_dims[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t count[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t stride[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t block[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t offset[COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS]; + size_t i; + hid_t file_id, dset_id, plist_id, memtype; + hid_t filespace, memspace; + + if (MAINPROCESS) puts("Testing write to shared filtered chunks in Compound Datatype dataset without Datatype conversion"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id>= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_NROWS; + dataset_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_NCOLS; + chunk_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NROWS; + chunk_dims[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NCOLS; + sel_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NROWS / (hsize_t) mpi_size; + sel_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC; + + filespace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + /* Create the compound type for memory. 
*/ + memtype = H5Tcreate(H5T_COMPOUND, sizeof(cmpd_filtered_t)); + VRFY((memtype >= 0), "Datatype creation succeeded"); + + VRFY((H5Tinsert(memtype, "ShortData", HOFFSET(cmpd_filtered_t, field1), H5T_NATIVE_SHORT) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "IntData", HOFFSET(cmpd_filtered_t, field2), H5T_NATIVE_INT) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "LongData", HOFFSET(cmpd_filtered_t, field3), H5T_NATIVE_LONG) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "DoubleData", HOFFSET(cmpd_filtered_t, field4), H5T_NATIVE_DOUBLE) >= 0), "Datatype insertion succeeded"); + + dset_id = H5Dcreate2(file_id, COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_NAME, memtype, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = 1; + count[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC; + stride[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NROWS; + stride[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NCOLS; + block[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NROWS / (hsize_t) mpi_size; + block[1] = COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NCOLS; + offset[0] = (hsize_t) mpi_rank; + offset[1] = 0; + + if (VERBOSE_MED) + printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n", + mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + data = (COMPOUND_C_DATATYPE *) calloc(1, (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC * sizeof(*data)); + VRFY((NULL != data), "calloc succeeded"); + + /* Fill data buffer */ + memset(data, 0, sizeof(cmpd_filtered_t) * (size_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC); + for (i = 0; i < (hsize_t) COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC; i++) { + data[i].field1 = (short) GEN_DATA(i); + data[i].field2 = (int) GEN_DATA(i); + data[i].field3 = (long) GEN_DATA(i); + data[i].field4 = (double) GEN_DATA(i); + } + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, memtype, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Tclose(memtype) >= 0), "Datatype close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data to unshared + * chunks using a compound datatype 
which requires a + * datatype conversion. + * + * This test currently should fail because the datatype + * conversion causes the parallel library to break + * to independent I/O and this isn't allowed when + * there are filters in the pipeline. + * + * Programmer: Jordan Henderson + * 02/07/2017 + */ +/* JTH: This test currently cannot be data-verified due to the floating-point data involved */ +static void +test_cmpd_filtered_dataset_type_conversion_unshared(void) +{ + cmpd_filtered_t *data = NULL; + hsize_t dataset_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t chunk_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t sel_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t count[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t stride[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t block[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS]; + hsize_t offset[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS]; + size_t i; + hid_t file_id = -1, dset_id = -1, plist_id = -1, filetype = -1, memtype = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing write to unshared filtered chunks in Compound Datatype dataset with Datatype conversion"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_NROWS; + dataset_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_NCOLS; + chunk_dims[0] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NROWS; + chunk_dims[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NCOLS; + sel_dims[0] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NROWS; + sel_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC; + + filespace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + /* Create the compound type for memory. 
*/ + memtype = H5Tcreate(H5T_COMPOUND, sizeof(cmpd_filtered_t)); + VRFY((memtype >= 0), "Datatype creation succeeded"); + + VRFY((H5Tinsert(memtype, "ShortData", HOFFSET(cmpd_filtered_t, field1), H5T_NATIVE_SHORT) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "IntData", HOFFSET(cmpd_filtered_t, field2), H5T_NATIVE_INT) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "LongData", HOFFSET(cmpd_filtered_t, field3), H5T_NATIVE_LONG) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "DoubleData", HOFFSET(cmpd_filtered_t, field4), H5T_NATIVE_DOUBLE) >= 0), "Datatype insertion succeeded"); + + /* Create the compound type for file. */ + filetype = H5Tcreate(H5T_COMPOUND, 32); + VRFY((filetype >= 0), "Datatype creation succeeded"); + + VRFY((H5Tinsert(filetype, "ShortData", 0, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(filetype, "IntData", 8, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(filetype, "LongData", 16, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(filetype, "DoubleData", 24, H5T_IEEE_F64BE) >= 0), "Datatype insertion succeeded"); + + dset_id = H5Dcreate2(file_id, COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_NAME, filetype, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = 1; + count[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC; + stride[0] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NROWS; + stride[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NCOLS; + block[0] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NROWS; + block[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NCOLS; + offset[0] = 0; + offset[1] = ((hsize_t) mpi_rank * COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NCOLS); + + if (VERBOSE_MED) + printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n", + mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + data = (COMPOUND_C_DATATYPE *) calloc(1, (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC * sizeof(*data)); + VRFY((NULL != data), "calloc succeeded"); + + /* Fill data buffer */ + memset(data, 0, sizeof(cmpd_filtered_t) * (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC); + for (i = 0; i < (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC; i++) { + data[i].field1 = (short) GEN_DATA(i); + data[i].field2 = (int) GEN_DATA(i); + data[i].field3 = (long) GEN_DATA(i); + data[i].field4 = (double) GEN_DATA(i); + } + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + /* Ensure that this test 
currently fails since type conversions break collective mode */ + H5E_BEGIN_TRY { + VRFY((H5Dwrite(dset_id, memtype, memspace, filespace, plist_id, data) < 0), "Dataset write succeeded"); + } H5E_END_TRY; + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Tclose(filetype) >= 0), "File datatype close succeeded"); + VRFY((H5Tclose(memtype) >= 0), "Memory datatype close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data to shared + * chunks using a compound datatype which requires + * a datatype conversion. + * + * This test currently should fail because the datatype + * conversion causes the parallel library to break + * to independent I/O and this isn't allowed when + * there are filters in the pipeline. + * + * Programmer: Jordan Henderson + * 02/10/2017 + */ +/* JTH: This test currently cannot be data-verified due to the floating-point data involved */ +static void +test_cmpd_filtered_dataset_type_conversion_shared(void) +{ + cmpd_filtered_t *data = NULL; + hsize_t dataset_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t chunk_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t sel_dims[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t count[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t stride[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t block[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS]; + hsize_t offset[COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS]; + size_t i; + hid_t file_id, dset_id, plist_id, filetype, memtype; + hid_t filespace, memspace; + + if (MAINPROCESS) puts("Testing write to shared filtered chunks in Compound Datatype dataset with Datatype conversion"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_NROWS; + dataset_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_NCOLS; + chunk_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NROWS; + chunk_dims[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NCOLS; + sel_dims[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NROWS / (hsize_t) mpi_size; + sel_dims[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC; + + filespace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation 
succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + /* Create the compound type for memory. */ + memtype = H5Tcreate(H5T_COMPOUND, sizeof(cmpd_filtered_t)); + VRFY((memtype >= 0), "Datatype creation succeeded"); + + VRFY((H5Tinsert(memtype, "ShortData", HOFFSET(cmpd_filtered_t, field1), H5T_NATIVE_SHORT) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "IntData", HOFFSET(cmpd_filtered_t, field2), H5T_NATIVE_INT) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "LongData", HOFFSET(cmpd_filtered_t, field3), H5T_NATIVE_LONG) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(memtype, "DoubleData", HOFFSET(cmpd_filtered_t, field4), H5T_NATIVE_DOUBLE) >= 0), "Datatype insertion succeeded"); + + /* Create the compound type for file. */ + filetype = H5Tcreate(H5T_COMPOUND, 32); + VRFY((filetype >= 0), "Datatype creation succeeded"); + + VRFY((H5Tinsert(filetype, "ShortData", 0, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(filetype, "IntData", 8, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(filetype, "LongData", 16, H5T_STD_I64BE) >= 0), "Datatype insertion succeeded"); + VRFY((H5Tinsert(filetype, "DoubleData", 24, H5T_IEEE_F64BE) >= 0), "Datatype insertion succeeded"); + + dset_id = H5Dcreate2(file_id, COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_NAME, filetype, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = 1; + count[1] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC; + stride[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NROWS; + stride[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NCOLS; + block[0] = (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NROWS / (hsize_t) mpi_size; + block[1] = COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NCOLS; + offset[0] = (hsize_t) mpi_rank; + offset[1] = 0; + + if (VERBOSE_MED) + printf("Process %d is writing with count[ %llu, %llu ], stride[ %llu, %llu ], offset[ %llu, %llu ], block size[ %llu, %llu ]\n", + mpi_rank, count[0], count[1], stride[0], stride[1], offset[0], offset[1], block[0], block[1]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + data = (COMPOUND_C_DATATYPE *) calloc(1, (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC * sizeof(*data)); + VRFY((NULL != data), "calloc succeeded"); + + /* Fill data buffer */ + memset(data, 0, sizeof(cmpd_filtered_t) * (size_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC); + for (i = 0; i < (hsize_t) COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC; i++) { + data[i].field1 = (short) GEN_DATA(i); + data[i].field2 = (int) 
GEN_DATA(i); + data[i].field3 = (long) GEN_DATA(i); + data[i].field4 = (double) GEN_DATA(i); + } + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + /* Ensure that this test currently fails since type conversions break collective mode */ + H5E_BEGIN_TRY { + VRFY((H5Dwrite(dset_id, memtype, memspace, filespace, plist_id, data) < 0), "Dataset write succeeded"); + } H5E_END_TRY; + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Tclose(filetype) >= 0), "File datatype close succeeded"); + VRFY((H5Tclose(memtype) >= 0), "Memory datatype close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests write of filtered data to a dataset + * by a single process. After the write has + * succeeded, the dataset is closed and then + * re-opened in parallel and read by all + * processes to ensure data correctness. + * + * Programmer: Jordan Henderson + * 08/03/2017 + */ +static void +test_write_serial_read_parallel(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *read_buf = NULL; + C_DATATYPE *correct_buf = NULL; + hsize_t dataset_dims[WRITE_SERIAL_READ_PARALLEL_DATASET_DIMS]; + hsize_t chunk_dims[WRITE_SERIAL_READ_PARALLEL_DATASET_DIMS]; + size_t i, data_size, correct_buf_size; + hid_t file_id = -1, dset_id = -1, plist_id = -1; + hid_t filespace = -1; + + if (MAINPROCESS) puts("Testing write file serially; read file in parallel"); + + dataset_dims[0] = (hsize_t) WRITE_SERIAL_READ_PARALLEL_NROWS; + dataset_dims[1] = (hsize_t) WRITE_SERIAL_READ_PARALLEL_NCOLS; + dataset_dims[2] = (hsize_t) WRITE_SERIAL_READ_PARALLEL_DEPTH; + + /* Write the file on the MAINPROCESS rank */ + if (MAINPROCESS) { + /* Set up file access property list */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + chunk_dims[0] = (hsize_t) WRITE_SERIAL_READ_PARALLEL_CH_NROWS; + chunk_dims[1] = (hsize_t) WRITE_SERIAL_READ_PARALLEL_CH_NCOLS; + chunk_dims[2] = 1; + + filespace = H5Screate_simple(WRITE_SERIAL_READ_PARALLEL_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, WRITE_SERIAL_READ_PARALLEL_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, WRITE_SERIAL_READ_PARALLEL_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + 
data_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*data); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, H5P_DEFAULT, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + } + + correct_buf_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*correct_buf); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++) + correct_buf[i] = (long) i; + + /* All ranks open the file and verify their "portion" of the dataset is correct */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + dset_id = H5Dopen2(file_id, "/" WRITE_SERIAL_READ_PARALLEL_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, plist_id, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + if (correct_buf) free(correct_buf); + if (read_buf) free(read_buf); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + return; +} + +/* + * Tests parallel write of filtered data + * to a dataset. After the write has + * succeeded, the dataset is closed and + * then re-opened and read by a single + * process to ensure data correctness. 
+ * + * Programmer: Jordan Henderson + * 08/03/2017 + */ +static void +test_write_parallel_read_serial(void) +{ + C_DATATYPE *data = NULL; + C_DATATYPE *read_buf = NULL; + C_DATATYPE *correct_buf = NULL; + hsize_t dataset_dims[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS]; + hsize_t chunk_dims[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS]; + hsize_t sel_dims[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS]; + hsize_t count[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS]; + hsize_t stride[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS]; + hsize_t block[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS]; + hsize_t offset[WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS]; + size_t i, data_size, correct_buf_size; + hid_t file_id = -1, dset_id = -1, plist_id = -1; + hid_t filespace = -1, memspace = -1; + + if (MAINPROCESS) puts("Testing write file in parallel; read serially"); + + /* Set up file access property list with parallel I/O access */ + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(plist_id, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + /* Create the dataspace for the dataset */ + dataset_dims[0] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_NROWS; + dataset_dims[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_NCOLS; + dataset_dims[2] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_DEPTH; + chunk_dims[0] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NROWS; + chunk_dims[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NCOLS; + chunk_dims[2] = 1; + sel_dims[0] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NROWS; + sel_dims[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_NCOLS; + sel_dims[2] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_DEPTH; + + filespace = H5Screate_simple(WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS, dataset_dims, NULL); + VRFY((filespace >= 0), "File dataspace creation succeeded"); + + memspace = H5Screate_simple(WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS, sel_dims, NULL); + VRFY((memspace >= 0), "Memory dataspace creation succeeded"); + + /* Create chunked dataset */ + plist_id = H5Pcreate(H5P_DATASET_CREATE); + VRFY((plist_id >= 0), "DCPL creation succeeded"); + + VRFY((H5Pset_chunk(plist_id, WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS, chunk_dims) >= 0), "Chunk size set"); + + /* Add test filter to the pipeline */ + VRFY((SET_FILTER(plist_id) >= 0), "Filter set"); + + dset_id = H5Dcreate2(file_id, WRITE_PARALLEL_READ_SERIAL_DATASET_NAME, HDF5_DATATYPE_NAME, filespace, + H5P_DEFAULT, plist_id, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset creation succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "DCPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + + /* Each process defines the dataset selection in memory and writes + * it to the hyperslab in the file + */ + count[0] = 1; + count[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_NCOLS / (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NCOLS; + count[2] = (hsize_t) mpi_size; + stride[0] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NROWS; + stride[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NCOLS; + stride[2] = 1; + block[0] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NROWS; + block[1] = (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NCOLS; + block[2] = 1; + offset[0] = ((hsize_t) mpi_rank * (hsize_t) WRITE_PARALLEL_READ_SERIAL_CH_NROWS * 
count[0]); + offset[1] = 0; + offset[2] = 0; + + if (VERBOSE_MED) + printf("Process %d is writing with count[ %llu, %llu, %llu ], stride[ %llu, %llu, %llu ], offset[ %llu, %llu, %llu ], block size[ %llu, %llu, %llu ]\n", + mpi_rank, count[0], count[1], count[2], stride[0], stride[1], stride[2], offset[0], offset[1], offset[2], block[0], block[1], block[2]); + + /* Select hyperslab in the file */ + filespace = H5Dget_space(dset_id); + VRFY((filespace >= 0), "File dataspace retrieval succeeded"); + + VRFY((H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, stride, count, block) >= 0), "Hyperslab selection succeeded"); + + /* Fill data buffer */ + data_size = sel_dims[0] * sel_dims[1] * sel_dims[2] * sizeof(*data); + + data = (C_DATATYPE *) calloc(1, data_size); + VRFY((NULL != data), "calloc succeeded"); + + for (i = 0; i < data_size / sizeof(*data); i++) + data[i] = (C_DATATYPE) GEN_DATA(i); + + /* Create property list for collective dataset write */ + plist_id = H5Pcreate(H5P_DATASET_XFER); + VRFY((plist_id >= 0), "DXPL creation succeeded"); + + VRFY((H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE) >= 0), "Set DXPL MPIO succeeded"); + + VRFY((H5Dwrite(dset_id, HDF5_DATATYPE_NAME, memspace, filespace, plist_id, data) >= 0), "Dataset write succeeded"); + + if (data) free(data); + + VRFY((H5Pclose(plist_id) >= 0), "DXPL close succeeded"); + VRFY((H5Sclose(filespace) >= 0), "File dataspace close succeeded"); + VRFY((H5Sclose(memspace) >= 0), "Memory dataspace close succeeded"); + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + if (MAINPROCESS) { + plist_id = H5Pcreate(H5P_FILE_ACCESS); + VRFY((plist_id >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_libver_bounds(plist_id, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + file_id = H5Fopen(filenames[0], H5F_ACC_RDWR, plist_id); + VRFY((file_id >= 0), "Test file open succeeded"); + + VRFY((H5Pclose(plist_id) >= 0), "FAPL close succeeded"); + + dset_id = H5Dopen2(file_id, "/" WRITE_PARALLEL_READ_SERIAL_DATASET_NAME, H5P_DEFAULT); + VRFY((dset_id >= 0), "Dataset open succeeded"); + + correct_buf_size = dataset_dims[0] * dataset_dims[1] * dataset_dims[2] * sizeof(*correct_buf); + + correct_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != correct_buf), "calloc succeeded"); + + read_buf = (C_DATATYPE *) calloc(1, correct_buf_size); + VRFY((NULL != read_buf), "calloc succeeded"); + + for (i = 0; i < correct_buf_size / sizeof(*correct_buf); i++) + correct_buf[i] = (C_DATATYPE) ((i % (dataset_dims[0] * dataset_dims[1])) + (i / (dataset_dims[0] * dataset_dims[1])));; + + VRFY((H5Dread(dset_id, HDF5_DATATYPE_NAME, H5S_ALL, H5S_ALL, H5P_DEFAULT, read_buf) >= 0), "Dataset read succeeded"); + + VRFY((0 == memcmp(read_buf, correct_buf, correct_buf_size)), "Data verification succeeded"); + + VRFY((H5Dclose(dset_id) >= 0), "Dataset close succeeded"); + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + } + + return; +} + +int +main(int argc, char** argv) +{ + size_t i; + hid_t file_id = -1, fapl = -1; + int mpi_code; + + /* Initialize MPI */ + MPI_Init(&argc, &argv); + MPI_Comm_size(comm, &mpi_size); + MPI_Comm_rank(comm, &mpi_rank); + + if (mpi_size <= 0) { + if (MAINPROCESS) { + printf("The Parallel Filters tests require at least 1 rank.\n"); + printf("Quitting...\n"); + } + + MPI_Abort(MPI_COMM_WORLD, 1); + } + + if (H5dont_atexit() < 0) { + printf("Failed to turn off atexit processing. 
Continue.\n"); + } + + H5open(); + + if (MAINPROCESS) { + printf("==========================\n"); + printf("Parallel Filters tests\n"); + printf("==========================\n\n"); + } + + if (VERBOSE_MED) h5_show_hostname(); + + ALARM_ON; + + /* Create test file */ + fapl = H5Pcreate(H5P_FILE_ACCESS); + VRFY((fapl >= 0), "FAPL creation succeeded"); + + VRFY((H5Pset_fapl_mpio(fapl, comm, info) >= 0), "Set FAPL MPIO succeeded"); + + VRFY((H5Pset_libver_bounds(fapl, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST) >= 0), "Set libver bounds succeeded"); + + VRFY((h5_fixname(FILENAME[0], fapl, filenames[0], sizeof(filenames[0])) != NULL), "Test file name created"); + + file_id = H5Fcreate(filenames[0], H5F_ACC_TRUNC, H5P_DEFAULT, fapl); + VRFY((file_id >= 0), "Test file creation succeeded"); + + VRFY((H5Fclose(file_id) >= 0), "File close succeeded"); + + for (i = 0; i < ARRAY_SIZE(tests); i++) { + if (MPI_SUCCESS == (mpi_code = MPI_Barrier(comm))) { + (*tests[i])(); + } else { + if (MAINPROCESS) MESG("MPI_Barrier failed"); + nerrors++; + } + } + + if (nerrors) goto exit; + + if (MAINPROCESS) puts("All Parallel Filters tests passed\n"); + +exit: + if (nerrors) + if (MAINPROCESS) printf("*** %d TEST ERROR%s OCCURRED ***\n", nerrors, nerrors > 1 ? "S" : ""); + + ALARM_OFF; + + h5_clean_files(FILENAME, fapl); + + H5close(); + + MPI_Finalize(); + + exit((nerrors ? EXIT_FAILURE : EXIT_SUCCESS)); +} diff --git a/testpar/t_filters_parallel.h b/testpar/t_filters_parallel.h new file mode 100644 index 0000000..cb9a1ab --- /dev/null +++ b/testpar/t_filters_parallel.h @@ -0,0 +1,212 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright by The HDF Group. * + * Copyright by the Board of Trustees of the University of Illinois. * + * All rights reserved. * + * * + * This file is part of HDF5. The full HDF5 copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the files COPYING and Copyright.html. COPYING can be found at the root * + * of the source code distribution tree; Copyright.html can be found at the * + * root level of an installed copy of the electronic HDF5 document set and * + * is linked from the top-level documents page. It can also be found at * + * http://hdfgroup.org/HDF5/doc/Copyright.html. If you do not have * + * access to either file, you may request a copy from help@hdfgroup.org. 
* + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +/* + * Programmer: Jordan Henderson + * 01/31/2017 + * + * This file contains #defines for tests of the use + * of filters in parallel HDF5, implemented in + * H5Dmpio.c + */ + +#ifndef TEST_PARALLEL_FILTERS_H_ +#define TEST_PARALLEL_FILTERS_H_ + +#include <string.h> + +#include "stdlib.h" +#include "testpar.h" + +/* Used to load other filters than GZIP */ +/* #define DYNAMIC_FILTER */ /* Uncomment and define the fields below to use a dynamically loaded filter */ +#define FILTER_NUM_CDVALUES 1 +const unsigned int cd_values[FILTER_NUM_CDVALUES] = { 0 }; +H5Z_filter_t filter_id; +unsigned int flags = 0; +size_t cd_nelmts = FILTER_NUM_CDVALUES; + +/* Utility Macros */ +#define STRINGIFY(type) #type + +/* Common defines for all tests */ +#define C_DATATYPE long +#define COMPOUND_C_DATATYPE cmpd_filtered_t +#define C_DATATYPE_STR(type) STRINGIFY(type) +#define HDF5_DATATYPE_NAME H5T_NATIVE_LONG + +#define GEN_DATA(i) INCREMENTAL_DATA(i) +#define INCREMENTAL_DATA(i) ((size_t) mpi_rank + i) /* Generates incremental test data */ + +/* For experimental purposes only, will cause tests to fail data verification phase - JTH */ +/* #define GEN_DATA(i) RANK_DATA(i) */ /* Given an index value i, generates test data based upon selected mode */ +#define RANK_DATA(i) (mpi_rank) /* Generates test data to visibly show which rank wrote to which parts of the dataset */ + +#ifdef DYNAMIC_FILTER +#define SET_FILTER(dcpl) H5Pset_filter(dcpl, filter_id, flags, FILTER_NUM_CDVALUES, cd_values) /* Test other filter in parallel */ +#else +#define SET_FILTER(dcpl) H5Pset_deflate(dcpl, 6) /* Test GZIP filter in parallel */ +#endif + +#define DIM0_SCALE_FACTOR 4 +#define DIM1_SCALE_FACTOR 2 + +/* Defines for the one-chunk filtered dataset test */ +#define ONE_CHUNK_FILTERED_DATASET_NAME "one_chunk_filtered_dataset" +#define ONE_CHUNK_FILTERED_DATASET_DIMS 2 +#define ONE_CHUNK_FILTERED_DATASET_NROWS (mpi_size * DIM0_SCALE_FACTOR) /* Must be an even multiple of the number of ranks to avoid issues */ +#define ONE_CHUNK_FILTERED_DATASET_NCOLS (mpi_size * DIM1_SCALE_FACTOR) /* Must be an even multiple of the number of ranks to avoid issues */ +#define ONE_CHUNK_FILTERED_DATASET_CH_NROWS ONE_CHUNK_FILTERED_DATASET_NROWS +#define ONE_CHUNK_FILTERED_DATASET_CH_NCOLS ONE_CHUNK_FILTERED_DATASET_NCOLS + +/* Defines for the unshared filtered chunks write test */ +#define UNSHARED_FILTERED_CHUNKS_DATASET_NAME "unshared_filtered_chunks" +#define UNSHARED_FILTERED_CHUNKS_DATASET_DIMS 2 +#define UNSHARED_FILTERED_CHUNKS_NROWS (mpi_size * DIM0_SCALE_FACTOR) +#define UNSHARED_FILTERED_CHUNKS_NCOLS (mpi_size * DIM1_SCALE_FACTOR) +#define UNSHARED_FILTERED_CHUNKS_CH_NROWS (UNSHARED_FILTERED_CHUNKS_NROWS / mpi_size) +#define UNSHARED_FILTERED_CHUNKS_CH_NCOLS (UNSHARED_FILTERED_CHUNKS_NCOLS / mpi_size) + +/* Defines for the shared filtered chunks write test */ +#define SHARED_FILTERED_CHUNKS_DATASET_NAME "shared_filtered_chunks" +#define SHARED_FILTERED_CHUNKS_DATASET_DIMS 2 +#define SHARED_FILTERED_CHUNKS_CH_NROWS (mpi_size) +#define SHARED_FILTERED_CHUNKS_CH_NCOLS (mpi_size) +#define SHARED_FILTERED_CHUNKS_NROWS (SHARED_FILTERED_CHUNKS_CH_NROWS * DIM0_SCALE_FACTOR) +#define SHARED_FILTERED_CHUNKS_NCOLS (SHARED_FILTERED_CHUNKS_CH_NCOLS * DIM1_SCALE_FACTOR) + +/* Defines for the filtered chunks write test where a process has no selection */ +#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME "single_no_selection_filtered_chunks" +#define 
SINGLE_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS 2 +#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS (DIM0_SCALE_FACTOR) +#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS (DIM1_SCALE_FACTOR) +#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_NROWS (SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS * mpi_size) +#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_NCOLS (SINGLE_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS * mpi_size) +#define SINGLE_NO_SELECTION_FILTERED_CHUNKS_NO_SELECT_PROC (mpi_size - 1) + +/* Defines for the filtered chunks write test where no process has a selection */ +#define ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_NAME "all_no_selection_filtered_chunks" +#define ALL_NO_SELECTION_FILTERED_CHUNKS_DATASET_DIMS 2 +#define ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS (DIM0_SCALE_FACTOR) +#define ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS (DIM1_SCALE_FACTOR) +#define ALL_NO_SELECTION_FILTERED_CHUNKS_NROWS (ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NROWS * mpi_size) +#define ALL_NO_SELECTION_FILTERED_CHUNKS_NCOLS (ALL_NO_SELECTION_FILTERED_CHUNKS_CH_NCOLS * mpi_size) + +/* Defines for the filtered chunks write test with a point selection */ +#define POINT_SELECTION_FILTERED_CHUNKS_DATASET_NAME "point_selection_filtered_chunks" +#define POINT_SELECTION_FILTERED_CHUNKS_DATASET_DIMS 2 +#define POINT_SELECTION_FILTERED_CHUNKS_CH_NROWS (DIM0_SCALE_FACTOR) +#define POINT_SELECTION_FILTERED_CHUNKS_CH_NCOLS (DIM1_SCALE_FACTOR) +#define POINT_SELECTION_FILTERED_CHUNKS_NROWS (POINT_SELECTION_FILTERED_CHUNKS_CH_NROWS * mpi_size) +#define POINT_SELECTION_FILTERED_CHUNKS_NCOLS (POINT_SELECTION_FILTERED_CHUNKS_CH_NCOLS * mpi_size) + +/* Defines for the filtered dataset interleaved write test */ +#define INTERLEAVED_WRITE_FILTERED_DATASET_NAME "interleaved_write_filtered_dataset" +#define INTERLEAVED_WRITE_FILTERED_DATASET_DIMS 2 +#define INTERLEAVED_WRITE_FILTERED_DATASET_CH_NROWS (mpi_size) +#define INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS (DIM1_SCALE_FACTOR) +#define INTERLEAVED_WRITE_FILTERED_DATASET_NROWS (INTERLEAVED_WRITE_FILTERED_DATASET_CH_NROWS * DIM0_SCALE_FACTOR) +#define INTERLEAVED_WRITE_FILTERED_DATASET_NCOLS (INTERLEAVED_WRITE_FILTERED_DATASET_CH_NCOLS * DIM1_SCALE_FACTOR) + +/* Defines for the 3D unshared filtered dataset separate page write test */ +#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_NAME "3D_unshared_filtered_chunks_separate_pages" +#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DATASET_DIMS 3 +#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NROWS (mpi_size * DIM0_SCALE_FACTOR) +#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NCOLS (mpi_size * DIM1_SCALE_FACTOR) +#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_DEPTH (mpi_size) +#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NROWS (UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NROWS / mpi_size) +#define UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_CH_NCOLS (UNSHARED_FILTERED_CHUNKS_3D_SEP_PAGE_NCOLS / mpi_size) + +/* Defines for the 3D unshared filtered dataset same page write test */ +#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_NAME "3D_unshared_filtered_chunks_same_pages" +#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DATASET_DIMS 3 +#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NROWS (mpi_size * DIM0_SCALE_FACTOR) +#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NCOLS (mpi_size * DIM1_SCALE_FACTOR) +#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_DEPTH (mpi_size) +#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NROWS (UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NROWS / mpi_size) +#define UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_CH_NCOLS 
(UNSHARED_FILTERED_CHUNKS_3D_SAME_PAGE_NCOLS / mpi_size) + +/* Defines for the 3d shared filtered dataset write test */ +#define SHARED_FILTERED_CHUNKS_3D_DATASET_NAME "3D_shared_filtered_chunks" +#define SHARED_FILTERED_CHUNKS_3D_DATASET_DIMS 3 +#define SHARED_FILTERED_CHUNKS_3D_CH_NROWS (mpi_size) +#define SHARED_FILTERED_CHUNKS_3D_CH_NCOLS (DIM1_SCALE_FACTOR) +#define SHARED_FILTERED_CHUNKS_3D_NROWS (SHARED_FILTERED_CHUNKS_3D_CH_NROWS * DIM0_SCALE_FACTOR) +#define SHARED_FILTERED_CHUNKS_3D_NCOLS (SHARED_FILTERED_CHUNKS_3D_CH_NCOLS * DIM1_SCALE_FACTOR) +#define SHARED_FILTERED_CHUNKS_3D_DEPTH (mpi_size) + +/* Struct type for the compound datatype filtered dataset tests */ +typedef struct { + short field1; + int field2; + long field3; + double field4; +} COMPOUND_C_DATATYPE; + +/* Defines for the compound datatype filtered dataset no conversion write test with unshared chunks */ +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_NAME "compound_unshared_filtered_chunks_no_conversion" +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_DATASET_DIMS 2 +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_NROWS 1 +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_NCOLS mpi_size +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NROWS 1 +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_CH_NCOLS 1 +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_ENTRIES_PER_PROC (COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_UNSHARED_NCOLS / mpi_size) + +/* Defines for the compound datatype filtered dataset no conversion write test with shared chunks */ +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_NAME "compound_shared_filtered_chunks_no_conversion" +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_DATASET_DIMS 2 +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_NROWS mpi_size +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_NCOLS mpi_size +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NROWS mpi_size +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_CH_NCOLS 1 +#define COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_ENTRIES_PER_PROC COMPOUND_FILTERED_CHUNKS_NO_CONVERSION_SHARED_NCOLS + +/* Defines for the compound datatype filtered dataset type conversion write test with unshared chunks */ +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_NAME "compound_unshared_filtered_chunks_type_conversion" +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_DATASET_DIMS 2 +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_NROWS 1 +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_NCOLS mpi_size +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NROWS 1 +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_CH_NCOLS 1 +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_ENTRIES_PER_PROC (COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_UNSHARED_NCOLS / mpi_size) + +/* Defines for the compound datatype filtered dataset type conversion write test with shared chunks */ +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_NAME "compound_shared_filtered_chunks_type_conversion" +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_DATASET_DIMS 2 +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_NROWS mpi_size +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_NCOLS mpi_size +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NROWS mpi_size +#define COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_CH_NCOLS 1 +#define 
COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_ENTRIES_PER_PROC COMPOUND_FILTERED_CHUNKS_TYPE_CONVERSION_SHARED_NCOLS
+
+/* Defines for the write file serially/read in parallel test */
+#define WRITE_SERIAL_READ_PARALLEL_DATASET_NAME "write_serial_read_parallel"
+#define WRITE_SERIAL_READ_PARALLEL_DATASET_DIMS 3
+#define WRITE_SERIAL_READ_PARALLEL_NROWS (mpi_size * DIM0_SCALE_FACTOR)
+#define WRITE_SERIAL_READ_PARALLEL_NCOLS (mpi_size * DIM1_SCALE_FACTOR)
+#define WRITE_SERIAL_READ_PARALLEL_DEPTH (mpi_size)
+#define WRITE_SERIAL_READ_PARALLEL_CH_NROWS (WRITE_SERIAL_READ_PARALLEL_NROWS / mpi_size)
+#define WRITE_SERIAL_READ_PARALLEL_CH_NCOLS (WRITE_SERIAL_READ_PARALLEL_NCOLS / mpi_size)
+
+/* Defines for the write file in parallel/read serially test */
+#define WRITE_PARALLEL_READ_SERIAL_DATASET_NAME "write_parallel_read_serial"
+#define WRITE_PARALLEL_READ_SERIAL_DATASET_DIMS 3
+#define WRITE_PARALLEL_READ_SERIAL_NROWS (mpi_size * DIM0_SCALE_FACTOR)
+#define WRITE_PARALLEL_READ_SERIAL_NCOLS (mpi_size * DIM1_SCALE_FACTOR)
+#define WRITE_PARALLEL_READ_SERIAL_DEPTH (mpi_size)
+#define WRITE_PARALLEL_READ_SERIAL_CH_NROWS (WRITE_PARALLEL_READ_SERIAL_NROWS / mpi_size)
+#define WRITE_PARALLEL_READ_SERIAL_CH_NCOLS (WRITE_PARALLEL_READ_SERIAL_NCOLS / mpi_size)
+
+#endif /* TEST_PARALLEL_FILTERS_H_ */
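
For reference, the standalone sketch below condenses the skeleton that each test in t_filters_parallel.c repeats: open or create the file through the MPI-IO driver, create a chunked dataset with a filter in the pipeline (the suite uses the built-in deflate filter via SET_FILTER unless DYNAMIC_FILTER is defined), select a per-rank hyperslab, and write collectively. It is not part of the patch itself; the file name "filtered_example.h5", the dataset name "filtered_dset", and the one-row-per-rank decomposition are illustrative only, and error checking is omitted for brevity.

/* Minimal collective write to a chunked, deflate-filtered dataset.
 * Build against a parallel HDF5 library and run with mpiexec. */
#include <mpi.h>
#include "hdf5.h"

#define EX_NCOLS 8

int
main(int argc, char **argv)
{
    long    data[EX_NCOLS];
    hsize_t dims[2], chunk_dims[2] = {1, EX_NCOLS};
    hsize_t start[2], count[2] = {1, EX_NCOLS};
    hid_t   fapl, dcpl, dxpl, file_id, filespace, memspace, dset_id;
    int     mpi_rank, mpi_size, i;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

    /* File access property list: use the MPI-IO VFD */
    fapl = H5Pcreate(H5P_FILE_ACCESS);
    H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);
    H5Pset_libver_bounds(fapl, H5F_LIBVER_LATEST, H5F_LIBVER_LATEST);
    file_id = H5Fcreate("filtered_example.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);

    /* Chunked dataset with the deflate filter; one chunk (row) per rank */
    dims[0] = (hsize_t) mpi_size;
    dims[1] = EX_NCOLS;
    filespace = H5Screate_simple(2, dims, NULL);
    dcpl = H5Pcreate(H5P_DATASET_CREATE);
    H5Pset_chunk(dcpl, 2, chunk_dims);
    H5Pset_deflate(dcpl, 6);
    dset_id = H5Dcreate2(file_id, "filtered_dset", H5T_NATIVE_LONG, filespace,
                         H5P_DEFAULT, dcpl, H5P_DEFAULT);

    /* Each rank selects its own row of the dataset */
    start[0] = (hsize_t) mpi_rank;
    start[1] = 0;
    H5Sselect_hyperslab(filespace, H5S_SELECT_SET, start, NULL, count, NULL);
    memspace = H5Screate_simple(2, count, NULL);

    for (i = 0; i < EX_NCOLS; i++)
        data[i] = mpi_rank + i;

    /* Collective transfer is required when filters are in the pipeline */
    dxpl = H5Pcreate(H5P_DATASET_XFER);
    H5Pset_dxpl_mpio(dxpl, H5FD_MPIO_COLLECTIVE);
    H5Dwrite(dset_id, H5T_NATIVE_LONG, memspace, filespace, dxpl, data);

    H5Pclose(fapl); H5Pclose(dcpl); H5Pclose(dxpl);
    H5Sclose(memspace); H5Sclose(filespace);
    H5Dclose(dset_id); H5Fclose(file_id);
    MPI_Finalize();
    return 0;
}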