diff options
author | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-13 14:14:38 (GMT) |
---|---|---|
committer | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-13 14:14:38 (GMT) |
commit | a3605cbdeb5f79c5753b848dfef1706988ba10e7 (patch) | |
tree | a27678d0e33455a6c165425704122f3091c853cd | |
parent | 089afc48561ba8838d6a515c6b00fc6f7032ca13 (diff) | |
download | hdf5-a3605cbdeb5f79c5753b848dfef1706988ba10e7.zip hdf5-a3605cbdeb5f79c5753b848dfef1706988ba10e7.tar.gz hdf5-a3605cbdeb5f79c5753b848dfef1706988ba10e7.tar.bz2 |
Switch working branch from master to develop
-rw-r--r-- | src/H5Dchunk.c | 5 | ||||
-rw-r--r-- | src/H5Dint.c | 4 | ||||
-rw-r--r-- | src/H5Dio.c | 5 | ||||
-rw-r--r-- | src/H5Dmpio.c | 1400 | ||||
-rw-r--r-- | src/H5Dpkg.h | 5 | ||||
-rw-r--r-- | src/H5Dscatgath.c | 7 |
6 files changed, 1391 insertions, 35 deletions
diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c index 7a646af..8623325 100644 --- a/src/H5Dchunk.c +++ b/src/H5Dchunk.c @@ -297,9 +297,6 @@ static herr_t H5D__chunk_unlock(const H5D_io_info_t *io_info, static herr_t H5D__chunk_cache_prune(const H5D_t *dset, hid_t dxpl_id, const H5D_dxpl_cache_t *dxpl_cache, size_t size); static herr_t H5D__chunk_prune_fill(H5D_chunk_it_ud1_t *udata, hbool_t new_unfilt_chunk); -static herr_t H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info, - const H5F_block_t *old_chunk, H5F_block_t *new_chunk, hbool_t *need_insert, - hsize_t scaled[]); #ifdef H5_HAVE_PARALLEL static herr_t H5D__chunk_collective_fill(const H5D_t *dset, hid_t dxpl_id, H5D_chunk_coll_info_t *chunk_info, size_t chunk_size, const void *fill_buf); @@ -6240,7 +6237,7 @@ done: * *------------------------------------------------------------------------- */ -static herr_t +herr_t H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info, const H5F_block_t *old_chunk, H5F_block_t *new_chunk, hbool_t *need_insert, hsize_t scaled[]) { diff --git a/src/H5Dint.c b/src/H5Dint.c index 5a11581..dac12df 100644 --- a/src/H5Dint.c +++ b/src/H5Dint.c @@ -1210,10 +1210,6 @@ H5D__create(H5F_t *file, hid_t type_id, const H5S_t *space, hid_t dcpl_id, /* Don't allow compact datasets to allocate space later */ if(layout->type == H5D_COMPACT && fill->alloc_time != H5D_ALLOC_TIME_EARLY) HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, NULL, "compact dataset must have early space allocation") - - /* If MPI VFD is used, no filter support yet. */ - if(H5F_HAS_FEATURE(file, H5FD_FEAT_HAS_MPI) && pline->nused > 0) - HGOTO_ERROR(H5E_DATASET, H5E_UNSUPPORTED, NULL, "Parallel I/O does not support filters yet") } /* end if */ /* Set the latest version of the layout, pline & fill messages, if requested */ diff --git a/src/H5Dio.c b/src/H5Dio.c index f5087da..6a4e6ec 100644 --- a/src/H5Dio.c +++ b/src/H5Dio.c @@ -666,11 +666,6 @@ H5D__write(H5D_t *dataset, hid_t mem_type_id, const H5S_t *mem_space, if(H5T_get_class(type_info.mem_type, TRUE) == H5T_REFERENCE && H5T_get_ref_type(type_info.mem_type) == H5R_DATASET_REGION) HGOTO_ERROR(H5E_DATASET, H5E_UNSUPPORTED, FAIL, "Parallel IO does not support writing region reference datatypes yet") - - /* Can't write to chunked datasets with filters, in parallel */ - if(dataset->shared->layout.type == H5D_CHUNKED && - dataset->shared->dcpl_cache.pline.nused > 0) - HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "cannot write to chunked storage with filters in parallel") } /* end if */ else { /* Collective access is not permissible without a MPI based VFD */ diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index 441cc96..cfcc5c4 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -86,6 +86,11 @@ #define H5D_CHUNK_SELECT_IRREG 2 #define H5D_CHUNK_SELECT_NONE 0 +#define PARALLEL_COMPRESS_DEBUG + +#ifdef PARALLEL_COMPRESS_DEBUG +FILE *debug_file; +#endif /******************/ /* Local Typedefs */ @@ -96,6 +101,14 @@ typedef struct H5D_chunk_addr_info_t { H5D_chunk_info_t chunk_info; } H5D_chunk_addr_info_t; +typedef struct H5D_filtered_collective_io_info_t { + H5D_chunk_info_t chunk_info; + H5F_block_t old_chunk; + H5F_block_t new_chunk; + int num_writers; + int owner; + void *buf; +} H5D_filtered_collective_io_info_t; /********************/ /* Local Prototypes */ @@ -105,9 +118,15 @@ static herr_t H5D__chunk_collective_io(H5D_io_info_t *io_info, static herr_t H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist); +static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, + H5P_genplist_t *dx_plist); static herr_t H5D__link_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, int sum_chunk, H5P_genplist_t *dx_plist); +static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, + H5P_genplist_t *dx_plist); static herr_t H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5S_t *file_space, const H5S_t *mem_space); @@ -126,6 +145,21 @@ static herr_t H5D__mpio_get_min_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, int *min_chunkf); static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, int *sum_chunkf); +static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm, + H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries, + size_t **_num_chunks_selected_array); +static herr_t H5D__mpio_array_gather(H5D_io_info_t *io_info, void *local_array, + size_t array_num_entries, size_t array_entry_size, + void **gathered_array, size_t *gathered_array_num_entries, + int (*sort_func)(const void *, const void *)); +static herr_t H5D__mpio_filtered_collective_write_type( + H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries, + MPI_Datatype *new_mem_type, hbool_t *mem_type_derived, + MPI_Datatype *new_file_type, hbool_t *file_type_derived); +static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2); +static int H5D__cmp_filtered_collective_io_entry(const void *filtered_collective_io_entry1, + const void *filtered_collective_io_entry2); /*********************/ @@ -208,11 +242,6 @@ H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space, * use collective IO will defer until each chunk IO is reached. */ - /* Don't allow collective operations if filters need to be applied */ - if(io_info->dset->shared->layout.type == H5D_CHUNKED && - io_info->dset->shared->dcpl_cache.pline.nused > 0) - local_cause |= H5D_MPIO_FILTERS; - /* Check for independent I/O */ if(local_cause & H5D_MPIO_SET_INDEPENDENT) global_cause = local_cause; @@ -302,6 +331,105 @@ done: /*------------------------------------------------------------------------- + * Function: H5D__mpio_array_gather + * + * Purpose: Given arrays by MPI ranks, gathers them into a single large + * array which is then distributed back to all ranks. If the + * sort_func argument is specified, the list is sorted before + * being returned. + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Friday, January 6th, 2016 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_array_gather(H5D_io_info_t *io_info, void *local_array, + size_t array_num_entries, size_t array_entry_size, + void **_gathered_array, size_t *_gathered_array_num_entries, + int (*sort_func)(const void *, const void *)) +{ + size_t gathered_array_num_entries = 0; + size_t i; + void *gathered_array = NULL; + int *receive_counts_array = NULL; + int *displacements_array = NULL; + int mpi_code, mpi_size; + int sendcount; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(io_info); + HDassert(local_array); + HDassert(_gathered_array); + HDassert(_gathered_array_num_entries); + + if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") + + if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) + + if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate total gathered array") + + if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*receive_counts_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array") + + if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*displacements_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate displacements array") + + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + + /* Multiply each receive count by the size of the array entry, + * since the data is sent in a count of bytes */ + /* XXX: Check to make sure array_entry_size doesn't overflow an int */ + for (i = 0; i < (size_t) mpi_size; i++) + receive_counts_array[i] *= array_entry_size; + + /* Set receive buffer offsets for MPI_Allgatherv */ + displacements_array[0] = 0; + for (i = 1; i < (size_t) mpi_size; i++) + displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1]; + + H5_CHECKED_ASSIGN(sendcount, int, array_num_entries * array_entry_size, size_t); + if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE, + gathered_array, receive_counts_array, displacements_array, MPI_BYTE, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code) + + if (sort_func) HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func); + + *_gathered_array = gathered_array; + *_gathered_array_num_entries = gathered_array_num_entries; + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, " Contents of gathered array:\n"); + HDfprintf(debug_file, "------------------------------\n"); + for (size_t j = 0; j < (size_t) gathered_array_num_entries; j++) { + HDfprintf(debug_file, "| Chunk Entry %zd:\n", j); + HDfprintf(debug_file, "| - Chunk Address: %a\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].old_chunk.offset); + HDfprintf(debug_file, "| - Chunk Length: %zd\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].old_chunk.length); + HDfprintf(debug_file, "| - Chunk Owner: %d\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].owner); + HDfprintf(debug_file, "| - Address of mspace: %x\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].chunk_info.mspace); + } + HDfprintf(debug_file, "------------------------------\n\n"); +#endif + +done: + if (receive_counts_array) + H5MM_free(receive_counts_array); + if (displacements_array) + H5MM_free(displacements_array); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__filtered_collective_io_info_arraygather() */ + + +/*------------------------------------------------------------------------- * Function: H5D__ioinfo_xfer_mode * * Purpose: Switch to between collective & independent MPI I/O @@ -441,6 +569,9 @@ H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, H5_CHECKED_ASSIGN(num_chunkf, int, ori_num_chunkf, size_t); /* Determine the summation of number of chunks for all processes */ + /* XXX: In the case that more than one process is writing to the + * same chunk, the summation of H5SL_count calls is incorrect. - JTH + */ if(MPI_SUCCESS != (mpi_code = MPI_Allreduce(&num_chunkf, sum_chunkf, 1, MPI_INT, MPI_SUM, io_info->comm))) HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) @@ -683,19 +814,31 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf #endif /* step 2: Go ahead to do IO.*/ - if(H5D_ONE_LINK_CHUNK_IO == io_option || H5D_ONE_LINK_CHUNK_IO_MORE_OPT == io_option) { - if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO") - } /* end if */ - /* direct request to multi-chunk-io */ - else if(H5D_MULTI_CHUNK_IO == io_option) { - if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO") - } /* end if */ - else { /* multiple chunk IO via threshold */ - if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO") - } /* end else */ + switch (io_option) { + case H5D_ONE_LINK_CHUNK_IO: + case H5D_ONE_LINK_CHUNK_IO_MORE_OPT: + if(io_info->dset->shared->dcpl_cache.pline.nused > 0) { + if(H5D__link_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish filtered linked chunk MPI-IO") + } + else { + if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO") + } + break; + + case H5D_MULTI_CHUNK_IO: /* direct request to do multi-chunk IO */ + default: /* multiple chunk IO via threshold */ + if(io_info->dset->shared->dcpl_cache.pline.nused > 0) { + if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO") + } + else { + if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO") + } + break; + } done: FUNC_LEAVE_NOAPI(ret_value) @@ -755,6 +898,14 @@ H5D__chunk_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type_ FUNC_ENTER_PACKAGE +#ifdef PARALLEL_COMPRESS_DEBUG + char name[10]; + + snprintf(name, 10, "out - %d", H5F_mpi_get_rank(io_info->dset->oloc.file)); + + debug_file = fopen(name, "w"); +#endif + /* Call generic selection operation */ if(H5D__chunk_collective_io(io_info, type_info, fm) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_WRITEERROR, FAIL, "write error") @@ -1095,6 +1246,408 @@ if(H5DEBUG(D)) /*------------------------------------------------------------------------- + * Function: H5D__link_chunk_filtered_collective_io + * + * Purpose: Routine for one collective IO with one MPI derived datatype + * to link with all filtered chunks + * + * 1. Construct a list of selected chunks in the collective IO + * operation + * A. If any chunk is being written to by more than 1 + * process, the process writing the most data to the + * chunk will take ownership of the chunk (ties are + * broken randomly) + * 2. If the operation is a write operation + * A. Loop through each chunk in the operation + * I. Determine if this is a full overwrite of the chunk + * a) If it is not, read the chunk from file and + * pass the chunk through the filter pipeline in + * reverse order (Unfilter the chunk) + * b) Else copy the owning process' modification + * data into a buffer of size large enough to + * completely overwrite the chunk + * II. Receive any modification data from other + * processes and update the chunk data with these + * modifications + * III. Filter the chunk + * B. Contribute the modified chunks to an array gathered + * by all processes which contains the new sizes of + * every chunk modified in the collective IO operation + * C. All processes collectively re-allocate each chunk + * from the gathered array with their new sizes after + * the filter operation + * D. Create an MPI derived type for memory and file to + * write out the process' selected chunks to the file + * 3. If the operation is a read operation + * A. Loop through each chunk in the operation + * I. + * 3. Proceeed with the collective IO operation + * 4. If the collective operation was a write operation, + * all processes collectively re-insert each modified + * chunk from the gathered array into the chunk index + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Friday, Nov. 4th, 2016 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist) +{ + H5D_filtered_collective_io_info_t *chunk_list = NULL; + H5D_filtered_collective_io_info_t *total_array = NULL; + H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK; + H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; + H5D_chk_idx_info_t index_info; + H5S_sel_iter_t *mem_iter = NULL; + H5D_storage_t ctg_store; + MPI_Datatype mem_type; + MPI_Datatype file_type; + hbool_t mem_type_is_derived = FALSE; + hbool_t file_type_is_derived = FALSE; + hbool_t mem_iter_init = FALSE; + hsize_t mpi_buf_count; /* Number of MPI types */ + haddr_t *total_chunk_addr_array = NULL; + size_t chunk_list_num_entries; + size_t total_array_num_entries; + size_t *num_chunks_selected_array = NULL; + size_t i; + int mpi_rank, mpi_size, mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + /* Obtain the current rank of the process and the number of processes */ + if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") + if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") + + /* Set the actual-chunk-opt-mode property. */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property") + + /* Set the actual-io-mode property. + * Link chunk I/O does not break to independent, so can set right away */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property") + + /* Construct chunked index info */ + index_info.f = io_info->dset->oloc.file; + index_info.dxpl_id = io_info->md_dxpl_id; + index_info.pline = &(io_info->dset->shared->dcpl_cache.pline); + index_info.layout = &(io_info->dset->shared->layout.u.chunk); + index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk); + + /* Build a list of selected chunks in the collective io operation */ + /* XXX: Not sure about correct minor error code */ + if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries, &num_chunks_selected_array) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't construct filtered I/O info list") + + /* If this process has any selection at all in the dataspace, create + * a MPI type for the I/O operation. Otherwise, the process contributes + * with a none type. */ + /* XXX: Processes with no selection will have to be re-worked, as they + * still have to do the re-allocation in the file. Get rid of else case + * and instead change mpi_buf_count to 0 if they have no selection + */ + if (H5SL_count(fm->sel_chunks)) { +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "Incoming messages from other processes:\n"); + HDfprintf(debug_file, "-----------------------------------------\n"); + for (size_t j = 0; j < chunk_list_num_entries; j++) { + HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n", + chunk_list[j].old_chunk.offset, chunk_list[j].num_writers); + } + HDfprintf(debug_file, "-----------------------------------------\n\n"); +#endif + + if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") + + if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */ +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "Processing chunks:\n"); + HDfprintf(debug_file, "---------------------------------------------------\n"); +#endif + + /* Iterate through all the chunks in the collective write operation, + * updating each chunk with the data modifications from other processes, + * then re-filtering the chunk. */ + for (i = 0; i < chunk_list_num_entries; i++) { + unsigned filter_mask = 0; + hbool_t full_overwrite = TRUE; + size_t buf_size; + hssize_t iter_nelmts; + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_list[i].old_chunk.offset); +#endif + + /* If this is a full overwrite of this chunk, enough memory must be allocated for + * the size of the unfiltered chunk. Otherwise, enough memory must be allocated + * to read the filtered chunk from the file. */ + /* XXX: Return value of macro should be checked instead */ + buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_list[i].chunk_info.fspace) * type_info->src_type_size + : chunk_list[i].old_chunk.length; + chunk_list[i].new_chunk.length = buf_size; + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size, + full_overwrite ? "full" : "non-full"); +#endif + + if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer") + + /* Initialize iterator for memory selection */ + if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") + mem_iter_init = TRUE; + + /* Owner of this chunk, receive modification data from other processes */ + + if (!full_overwrite) { + /* Read the chunk from the file */ + if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset, + buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0) + HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk") + + /* Unfilter the chunk before modifying it */ + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask, + io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, + (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0) + HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying") + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_list[i].new_chunk.length, buf_size); + + HDfprintf(debug_file, "| - Read buf:\n| - ["); + for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) { + if (j > 0) HDfprintf(debug_file, ", "); + HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]); + } + HDfprintf(debug_file, "]\n|\n"); +#endif + } /* end if */ + + /* Update the chunk data with the modifications from the current (owning) process */ + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + if (!H5D__gather_mem(io_info->u.wbuf, chunk_list[i].chunk_info.mspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, chunk_list[i].buf)) + HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather from write buffer") + + /* Update the chunk data with any modifications from other processes */ + + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| - Write Buffer:\n"); + HDfprintf(debug_file, "| - ["); + for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) { + if (j > 0) HDfprintf(debug_file, ", "); + HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]); + } + HDfprintf(debug_file, "]\n|\n"); + + HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_list[i].buf); + + HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_list[i].new_chunk.length, buf_size); +#endif + + /* Filter the chunk */ + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask, + io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, + (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0) + HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed") + +#if H5_SIZEOF_SIZE_T > 4 + /* Check for the chunk expanding too much to encode in a 32-bit value */ + if (chunk_list[i].new_chunk.length > ((size_t) 0xffffffff)) + HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length") +#endif + + if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + } /* end for */ + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "---------------------------------------------------\n\n"); +#endif + + /* Gather the new chunk sizes to all processes for a collective reallocation + * of the chunks in the file */ + if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list), + (void **) &total_array, &total_array_num_entries, NULL) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes") + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "Reallocing chunks:\n"); + HDfprintf(debug_file, "------------------------------\n"); +#endif + + /* Collectively re-allocate the modified chunks (from each process) in the file */ + for (i = 0; i < total_array_num_entries; i++) { + hbool_t insert; + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| Reallocing chunk at address %a with new length of %zd.\n", total_array[i].new_chunk.offset, total_array[i].new_chunk.length); +#endif + + if (H5D__chunk_file_alloc(&index_info, &total_array[i].old_chunk, &total_array[i].new_chunk, + &insert, total_array[i].chunk_info.scaled) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| - Chunk now at address %a.\n|\n", total_array[i].new_chunk); +#endif + } /* end for */ + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "------------------------------\n\n"); +#endif + + /* XXX: During the collective re-allocation of chunks in the file, the record for each + * chunk is only update in the total array, not in the local copy of chunks on each + * process. However, each process needs the updated chunk records so that they can create + * a MPI type for the collective write that will write to the chunk's new locations instead + * of the old ones. This ugly hack seems to be the best solution to copy the information + * back to the local array and avoid having to modify the collective write type function + * in an ugly way so that it will accept the total array instead of the local array. + * This works correctly because the array gather function guarantees that the chunk + * data in the total array is ordered in blocks by rank. + */ + { + size_t offset; + + offset = 0; + for (i = 0; i < (size_t) mpi_rank; i++) + offset += num_chunks_selected_array[i] * sizeof(H5D_filtered_collective_io_info_t); + + HDmemcpy(chunk_list, &((char *) total_array)[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t)); + } + + /* Create single MPI type encompassing each selection in the dataspace */ + if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries, + &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type") + + /* Override the write buffer to point to the address of the first + * chunk data buffer */ + io_info->u.wbuf = chunk_list[0].buf; + } /* end if */ + else { /* Filtered collective read */ + + } /* end else */ + + /* We have a single, complicated MPI datatype for both memory & file */ + mpi_buf_count = (hsize_t) 1; + } + else { /* No selection at all for this process, contribute none type */ + size_t dataset_num_chunks; + + /* Retrieve total # of chunks in dataset */ + H5_CHECKED_ASSIGN(dataset_num_chunks, size_t, fm->layout->u.chunk.nchunks, hsize_t); + + /* Allocate chunking information */ + if (NULL == (total_chunk_addr_array = (haddr_t *) H5MM_malloc(sizeof(haddr_t) * dataset_num_chunks))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate total chunk address arraybuffer") + + /* Retrieve chunk address map */ + if (H5D__chunk_addrmap(io_info, total_chunk_addr_array) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get chunk address") + + /* Get chunk with lowest address */ + ctg_store.contig.dset_addr = HADDR_MAX; + for (i = 0; i < dataset_num_chunks; i++) + if (total_chunk_addr_array[i] < ctg_store.contig.dset_addr) + ctg_store.contig.dset_addr = total_chunk_addr_array[i]; + HDassert(ctg_store.contig.dset_addr != HADDR_MAX); + + /* Set the MPI datatype */ + file_type = MPI_BYTE; + mem_type = MPI_BYTE; + + /* No chunks selected for this process */ + mpi_buf_count = (hsize_t) 0; + } /* end else */ + + /* Set up the base storage address for this operation */ + ctg_store.contig.dset_addr = 0; + io_info->store = &ctg_store; + + /* Perform I/O */ + if (H5D__final_collective_io(io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") + + /* Collectively insert each chunk into the chunk index if this + * is a filtered collective write */ + if (io_info->op_type == H5D_IO_OP_WRITE) { + H5D_chunk_ud_t udata; + + /* Set up chunk information for insertion to chunk index */ + udata.common.layout = index_info.layout; + udata.common.storage = index_info.storage; + udata.filter_mask = 0; + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "Reinserting chunks into chunk index.\n"); + HDfprintf(debug_file, "---------------------------------------\n"); +#endif + + for (i = 0; i < total_array_num_entries; i++) { + udata.chunk_block = total_array[i].new_chunk; + udata.common.scaled = total_array[i].chunk_info.scaled; + + if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index") + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| Successfully inserted chunk at address %a into the chunk index.\n", udata.chunk_block.offset); +#endif + } /* end for */ + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "---------------------------------------\n"); +#endif + } /* end if */ + +done: + /* Free resources used by a process which had no selection at all */ + if (total_chunk_addr_array) + H5MM_free(total_chunk_addr_array); + + /* Free resources used by a process which had some selection */ + if (chunk_list) { + for (i = 0; i < chunk_list_num_entries; i++) + if (chunk_list[i].buf) + H5MM_free(chunk_list[i].buf); + + H5MM_free(chunk_list); + } + + if (num_chunks_selected_array) + H5MM_free(num_chunks_selected_array); + if (total_array) + H5MM_free(total_array); + if (mem_iter) + H5MM_free(mem_iter); + + /* Free the MPI buf and file types, if they were derived */ + if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__link_chunk_filtered_collective_io() */ + + +/*------------------------------------------------------------------------- * Function: H5D__multi_chunk_collective_io * * Purpose: To do IO per chunk according to IO mode(collective/independent/none) @@ -1308,6 +1861,543 @@ done: /*------------------------------------------------------------------------- + * Function: H5D__multi_chunk_filtered_collective_io + * + * Purpose: To do filtered collective IO per chunk to save on memory, + * as opposed to collective IO of every chunk at once + * + * XXX: Add read operation description + * + * 1. Construct a list of selected chunks in the collective IO + * operation + * A. If any chunk is being written to by more than 1 + * process, the process writing the most data to the + * chunk will take ownership of the chunk (ties are + * broken randomly) + * 2. Loop through each chunk in the operation + * A. If the operation is a write operation + * I. Determine if this is a full overwrite of the chunk + * a) If it is not, read the chunk from file and + * pass the chunk through the filter pipeline in + * reverse order (Unfilter the chunk) + * b) Else copy the owning process' modification + * data into a buffer of size large enough to + * completely overwrite the chunk + * II. Receive any modification data from other + * processes and update the chunk data with these + * modifications + * III. Filter the chunk + * IV. Contribute the chunk to an array gathered by + * all processes which contains every chunk + * modified in this iteration (up to one chunk + * per process, some processes may not have a + * selection/may have less chunks to work on than + * other processes) + * II. All processes collectively re-allocate each + * chunk from the gathered array with their new + * sizes after the filter operation + * IV. Proceed with the collective write operation + * V. All processes collectively re-insert each + * chunk from the gathered array into the chunk + * index + * B. If the operation is a read operation + * I. + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Friday, Dec. 2nd, 2016 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist) +{ + H5D_filtered_collective_io_info_t *chunk_list = NULL; + H5D_filtered_collective_io_info_t *gathered_array = NULL; + H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_MULTI_CHUNK; /* actual chunk optimization mode */ + H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_NO_COLLECTIVE; /* Local variable for tracking the I/O mode used. */ + H5FD_mpio_collective_opt_t last_coll_opt_mode = H5FD_MPIO_COLLECTIVE_IO; /* Last parallel transfer with independent IO or collective IO with this mode */ + H5FD_mpio_xfer_t last_xfer_mode = H5FD_MPIO_COLLECTIVE; /* Last parallel transfer for this request (H5D_XFER_IO_XFER_MODE_NAME) */ + H5D_chk_idx_info_t index_info; + H5S_sel_iter_t *mem_iter = NULL; + H5D_storage_t store; /* union of EFL and chunk pointer in file space */ + H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */ + H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ + MPI_Datatype file_type; + MPI_Datatype mem_type; + uint8_t *chunk_io_option = NULL; + haddr_t *chunk_addr = NULL; + hbool_t file_type_is_derived = FALSE; + hbool_t mem_type_is_derived = FALSE; + hbool_t mem_iter_init = FALSE; + hsize_t mpi_buf_count; + size_t total_chunk; /* Total # of chunks in dataset */ + size_t chunk_list_num_entries; + size_t gathered_array_num_entries; + size_t *num_chunks_selected_array = NULL; + size_t i, j; /* Local index variable */ + int mpi_rank, mpi_size, mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + /* Obtain the current rank of the process and the number of processes */ + if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") + if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") + + /* Set the actual chunk opt mode property */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property") + + /* Construct chunked index info */ + index_info.f = io_info->dset->oloc.file; + index_info.dxpl_id = io_info->md_dxpl_id; + index_info.pline = &(io_info->dset->shared->dcpl_cache.pline); + index_info.layout = &(io_info->dset->shared->layout.u.chunk); + index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk); + + + /* Build a list of selected chunks in the collective IO operation */ + if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries, &num_chunks_selected_array) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't construct filtered I/O info list") + + /* Allocate memories */ + /* chunk_io_option = (uint8_t *) H5MM_calloc(total_chunk); + chunk_addr = (haddr_t *) H5MM_calloc(total_chunk * sizeof(haddr_t)); */ +#ifdef H5D_DEBUG +if(H5DEBUG(D)) + HDfprintf(H5DEBUG(D), "total_chunk %Zu\n", total_chunk); +#endif + + /* Obtain IO option for each chunk */ + /* if (H5D__obtain_mpio_mode(io_info, fm, dx_plist, chunk_io_option, chunk_addr) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "unable to obtain MPIO mode") */ + + /* Set up contiguous I/O info object */ + HDmemcpy(&ctg_io_info, io_info, sizeof(ctg_io_info)); + ctg_io_info.store = &ctg_store; + ctg_io_info.layout_ops = *H5D_LOPS_CONTIG; + + /* Initialize temporary contiguous storage info */ + ctg_store.contig.dset_size = (hsize_t) io_info->dset->shared->layout.u.chunk.size; + + /* Set dataset storage for I/O info */ + io_info->store = &store; + + if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") + + /* Loop over all the chunks in the collective IO operation */ + /* XXX: Multi-chunk needs to loop over all chunks and check for write/read inside + * loop, not the other way around */ + if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */ + H5D_chunk_ud_t udata; + + /* Set up chunk information for insertion to chunk index */ + udata.common.layout = index_info.layout; + udata.common.storage = index_info.storage; + udata.filter_mask = 0; + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "Incoming messages from other processes:\n"); + HDfprintf(debug_file, "-----------------------------------------\n"); + for (size_t k = 0; k < chunk_list_num_entries; k++) { + HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n", + chunk_list[k].old_chunk.offset, chunk_list[k].num_writers); + } + HDfprintf(debug_file, "-----------------------------------------\n\n"); + + HDfprintf(debug_file, "Processing chunks:\n"); + HDfprintf(debug_file, "---------------------------------------------------\n"); +#endif + + for (i = 0; i < chunk_list_num_entries; i++) { + unsigned filter_mask = 0; + hbool_t full_overwrite = TRUE; + size_t buf_size; + hssize_t iter_nelmts; + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_list[i].old_chunk.offset); +#endif + + /* XXX: Determine if a chunk is being fully overwritten by looking at the total selection + * in the dataspace */ + + /* If this is a full overwrite of this chunk, enough memory must be allocated for + * the size of the unfiltered chunk. Otherwise, enough memory must be allocated + * to read the filtered chunk from the file. */ + /* XXX: Return value of macro should be checked instead */ + buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_list[i].chunk_info.fspace) * type_info->src_type_size + : chunk_list[i].old_chunk.length; + chunk_list[i].new_chunk.length = buf_size; + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size, + full_overwrite ? "full" : "non-full"); +#endif + + if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer") + + /* Initialize iterator for memory selection */ + if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") + mem_iter_init = TRUE; + + /* Owner of this chunk, receive modification data from other processes */ + + if (!full_overwrite) { + /* Read the chunk from the file */ + if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset, + buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read raw data chunk") + + /* Unfilter the chunk before modifying it */ + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask, + io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, + (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0) + HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying") + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_list[i].new_chunk.length, buf_size); + + HDfprintf(debug_file, "| - Read buf:\n| - ["); + for (size_t k = 0; k < chunk_list[i].new_chunk.length / type_info->src_type_size; k++) { + if (k > 0) HDfprintf(debug_file, ", "); + HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[k]); + } + HDfprintf(debug_file, "]\n|\n"); +#endif + } /* end if */ + + /* Update the chunk data with the modifications from the current (owning) process */ + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + if (!H5D__gather_mem(io_info->u.wbuf, chunk_list[i].chunk_info.mspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, chunk_list[i].buf)) + HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather from write buffer") + + /* Update the chunk data with any modifications from other processes */ + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| - Write Buffer:\n"); + HDfprintf(debug_file, "| - ["); + for (size_t k = 0; k < chunk_list[i].new_chunk.length / type_info->src_type_size; k++) { + if (k > 0) HDfprintf(debug_file, ", "); + HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[k]); + } + HDfprintf(debug_file, "]\n|\n"); + + HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_list[i].buf); + + HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_list[i].new_chunk.length, buf_size); +#endif + + /* Filter the chunk */ + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask, + io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, + (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0) + HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed") + +#if H5_SIZEOF_SIZE_T > 4 + /* Check for the chunk expanding too much to encode in a 32-bit value */ + if (chunk_list[i].new_chunk.length > ((size_t) 0xffffffff)) + HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length") +#endif + + /* Gather the new chunk sizes to all processes for a collective reallocation + * of the chunks in the file */ + if (H5D__mpio_array_gather(io_info, &chunk_list[i], 1, sizeof(*chunk_list), + (void **) &gathered_array, &gathered_array_num_entries, NULL) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes") + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "Reallocing chunks:\n"); + HDfprintf(debug_file, "------------------------------\n"); +#endif + + /* Collectively re-allocate the modified chunks (from each process) in the file */ + for (j = 0; j < gathered_array_num_entries; j++) { + hbool_t insert = FALSE; + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| Re-allocing chunk at address %a with new length of %llu bytes.\n", + gathered_array[j].new_chunk.offset, gathered_array[j].new_chunk.length); +#endif + + /* Collectively re-allocate the chunk in the file */ + if (H5D__chunk_file_alloc(&index_info, &gathered_array[j].old_chunk, &gathered_array[j].new_chunk, + &insert, chunk_list[j].chunk_info.scaled) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| Chunk now at address %a.\n|\n", gathered_array[j].new_chunk); +#endif + } /* end for */ + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "------------------------------\n\n"); +#endif + + /* Collect the new chunk info back to the local copy */ + /* XXX: This may encounter a problem if there is a process with no selection */ + HDmemcpy(&chunk_list[i].new_chunk, &gathered_array[mpi_rank].new_chunk, sizeof(chunk_list[i].new_chunk)); + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "New chunk record after memcpy back to local:\n"); + HDfprintf(debug_file, " - Chunk offset: %a, Chunk length: %lld\n", chunk_list[i].new_chunk.offset, chunk_list[i].new_chunk.length); +#endif + + { + int count; + + H5_CHECKED_ASSIGN(count, int, chunk_list[i].new_chunk.length, hsize_t); + + /* Create MPI memory type for writing to chunk */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(count, MPI_BYTE, &mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + mem_type_is_derived = TRUE; + + /* Create MPI file type for writing to chunk */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(count, MPI_BYTE, &file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + file_type_is_derived = TRUE; + + mpi_buf_count = 1; + } + + + /* Set up the base storage address for this operation */ + ctg_store.contig.dset_addr = chunk_list[i].new_chunk.offset; + + /* Override the write buffer to point to the address of the + * chunk data buffer */ + ctg_io_info.u.wbuf = chunk_list[i].buf; + + /* Perform the I/O */ + if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "Reinserting chunks into chunk index.\n"); + HDfprintf(debug_file, "---------------------------------------\n"); +#endif + + /* Re-insert the modified chunks (from each process) into the chunk index */ + for (j = 0; j < gathered_array_num_entries; j++) { + udata.chunk_block = gathered_array[j].new_chunk; + udata.common.scaled = gathered_array[j].chunk_info.scaled; + + if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index") + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| Successfully inserted chunk at address %a into the chunk index.\n", udata.chunk_block.offset); +#endif + } /* end for */ + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "---------------------------------------\n"); +#endif + + /* Free the MPI memory and file type, if they were derived */ + /* XXX: For performance, collect each type into an array and free at end */ + if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + + mem_type_is_derived = FALSE; + file_type_is_derived = FALSE; + + if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + + if (gathered_array) + gathered_array = (H5D_filtered_collective_io_info_t *) H5MM_free(gathered_array); + } /* end for */ + } /* end if */ + else { /* Filtered collective read */ + unsigned filter_mask = 0; + size_t buf_size; + + for (i = 0; i < chunk_list_num_entries; i++) { + buf_size = chunk_list[i].old_chunk.length; + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "Allocing %zd bytes for chunk read buffer.\n", buf_size); +#endif + + if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk read buffer") + + /* Read the chunk from the file */ + if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset, + buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read raw data chunk") + + /* Unfilter the chunk before modifying it */ + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask, + io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, + (size_t *) &chunk_list[i].old_chunk.length, &buf_size, &chunk_list[i].buf) < 0) + HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying") + +#ifdef PARALLEL_COMPRESS_DEBUG + for (size_t k = 0; k < chunk_list[i].old_chunk.length / type_info->src_type_size; k++) + HDfprintf(debug_file, "Read buf entry %d is %lld.\n", k, ((long *) chunk_list[i].buf)[k]); +#endif + + /* Scatter the unfiltered chunk data into the user's read buffer */ + + } /* end for */ + } /* end else */ + +#if 0 + /* Loop over _all_ the chunks */ + for (u = 0; u < total_chunk; u++) { + H5D_chunk_info_t *chunk_info; /* Chunk info for current chunk */ + H5S_t *fspace; /* Dataspace describing chunk & selection in it */ + H5S_t *mspace; /* Dataspace describing selection in memory corresponding to this chunk */ + hbool_t insert = FALSE; + + /* Initialize temporary contiguous storage address */ + ctg_store.contig.dset_addr = chunk_addr[u]; + +#ifdef H5D_DEBUG +if(H5DEBUG(D)) + HDfprintf(H5DEBUG(D),"mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u); +#endif + /* Get the chunk info for this chunk, if there are elements selected */ + chunk_info = &filtered_io_info_array[u].chunk_info; + + /* Set the storage information for chunks with selections */ + if (chunk_info) { + /* HDassert(chunk_info->index == u); */ + + /* Pass in chunk's coordinates in a union. */ + store.chunk.scaled = chunk_info->scaled; + } /* end if */ + + /* Collective IO for this chunk, + * Note: even there is no selection for this process, the process still + * needs to contribute MPI NONE TYPE. + */ + if (chunk_io_option[u] == H5D_CHUNK_IO_MODE_COL) { +#ifdef H5D_DEBUG +if(H5DEBUG(D)) + HDfprintf(H5DEBUG(D),"inside collective chunk IO mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u); +#endif + + /* Set the file & memory dataspaces */ + if (chunk_info) { + fspace = chunk_info->fspace; + mspace = chunk_info->mspace; + + /* Update the local variable tracking the dxpl's actual io mode property. + * + * Note: H5D_MPIO_COLLECTIVE_MULTI | H5D_MPIO_INDEPENDENT = H5D_MPIO_MIXED + * to ease switching between to mixed I/O without checking the current + * value of the property. You can see the definition in H5Ppublic.h + */ + actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_COLLECTIVE; + + } /* end if */ + else { + fspace = mspace = NULL; + } /* end else */ + + /* Switch back to collective I/O */ + if (last_xfer_mode != H5FD_MPIO_COLLECTIVE) { + if (H5D__ioinfo_xfer_mode(io_info, dx_plist, H5FD_MPIO_COLLECTIVE) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't switch to collective I/O") + last_xfer_mode = H5FD_MPIO_COLLECTIVE; + } /* end if */ + if (last_coll_opt_mode != H5FD_MPIO_COLLECTIVE_IO) { + if (H5D__ioinfo_coll_opt_mode(io_info, dx_plist, H5FD_MPIO_COLLECTIVE_IO) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't switch to collective I/O") + last_coll_opt_mode = H5FD_MPIO_COLLECTIVE_IO; + } /* end if */ + } /* end if */ + else { /* possible independent IO for this chunk */ +#ifdef H5D_DEBUG +if(H5DEBUG(D)) + HDfprintf(H5DEBUG(D),"inside independent IO mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u); +#endif + + HDassert(chunk_io_option[u] == 0); + + /* Set the file & memory dataspaces */ + if (chunk_info) { + fspace = chunk_info->fspace; + mspace = chunk_info->mspace; + + /* Update the local variable tracking the dxpl's actual io mode. */ + actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_INDEPENDENT; + } /* end if */ + else { + fspace = mspace = NULL; + } /* end else */ + + /* Using independent I/O with file setview.*/ + if (last_coll_opt_mode != H5FD_MPIO_INDIVIDUAL_IO) { + if (H5D__ioinfo_coll_opt_mode(io_info, dx_plist, H5FD_MPIO_INDIVIDUAL_IO) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't switch to individual I/O") + last_coll_opt_mode = H5FD_MPIO_INDIVIDUAL_IO; + } /* end if */ + } /* end else */ + +#ifdef H5D_DEBUG + if(H5DEBUG(D)) + HDfprintf(H5DEBUG(D),"after inter collective IO\n"); +#endif + } /* end for */ +#endif + + /* Write the local value of actual io mode to the DXPL. */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property") + +done: + if (chunk_io_option) + H5MM_free(chunk_io_option); + if (chunk_addr) + H5MM_free(chunk_addr); + + if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + + if (chunk_list) { + for (i = 0; i < chunk_list_num_entries; i++) + if (chunk_list[i].buf) + H5MM_free(chunk_list[i].buf); + + H5MM_free(chunk_list); + } + + if (num_chunks_selected_array) + H5MM_free(num_chunks_selected_array); + if (gathered_array) + H5MM_free(gathered_array); + if (mem_iter) + H5MM_free(mem_iter); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__multi_chunk_filtered_collective_io() */ + + +/*------------------------------------------------------------------------- * Function: H5D__inter_collective_io * * Purpose: Routine for the shared part of collective IO between multiple chunk @@ -1486,6 +2576,36 @@ H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2) /*------------------------------------------------------------------------- + * Function: H5D__cmp_filtered_collective_io_entry + * + * Purpose: Routine to compare filtered collective chunk io info + * entries + * + * Description: Callback for qsort() to compare filtered collective chunk + * io info entries + * + * Return: -1, 0, 1 + * + * Programmer: Jordan Henderson + * Wednesday, Nov. 30th, 2016 + * + *------------------------------------------------------------------------- + */ +static int +H5D__cmp_filtered_collective_io_entry(const void *filtered_collective_io_entry1, const void *filtered_collective_io_entry2) +{ + haddr_t addr1, addr2; + + FUNC_ENTER_STATIC_NOERR + + addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_entry1)->new_chunk.offset; + addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_entry2)->new_chunk.offset; + + FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2)) +} /* end H5D__cmp_filtered_collective_io_entry() */ + + +/*------------------------------------------------------------------------- * Function: H5D__sort_chunk * * Purpose: Routine to sort chunks in increasing order of chunk address @@ -1837,5 +2957,249 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__obtain_mpio_mode() */ + + +/*------------------------------------------------------------------------- + * Function: H5D__construct_filtered_io_info_list + * + * XXX: Revise description + * Purpose: Constructs a list of entries which contain the necessary + * information for inter-process communication when performing + * collective io on filtered chunks. This list is used by + * every process in operations that must be collectively done + * on every chunk, such as chunk re-allocation, insertion of + * chunks into the chunk index, etc. + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Tuesday, January 10th, 2017 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries, + size_t **_num_chunks_selected_array) +{ + H5D_filtered_collective_io_info_t *local_info_array = NULL; + H5SL_node_t *chunk_node; + hbool_t no_overlap = FALSE; + size_t i; + size_t num_chunks_selected; + size_t *num_chunks_selected_array = NULL; + int mpi_rank, mpi_size, mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(io_info); + HDassert(type_info); + HDassert(fm); + HDassert(chunk_list); + + if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") + if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") + + /* Get the no overlap property */ + + + num_chunks_selected = H5SL_count(fm->sel_chunks); + + if (!no_overlap) { + /* Redistribute chunks so that no more than 1 process is writing to a given chunk */ + } + + if (NULL == (local_info_array = (H5D_filtered_collective_io_info_t *) H5MM_malloc(num_chunks_selected * sizeof(*local_info_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer") + + chunk_node = H5SL_first(fm->sel_chunks); + for (i = 0; chunk_node; i++) { + H5D_chunk_info_t *chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node); + H5D_chunk_ud_t udata; + + /* Obtain this chunk's address */ + if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address") + +#ifdef PARALLEL_COMPRESS_DEBUG + local_info_array[i].owner = mpi_rank; + local_info_array[i].num_writers = 0; +#endif + + local_info_array[i].old_chunk = local_info_array[i].new_chunk = udata.chunk_block; + local_info_array[i].chunk_info = *chunk_info; + local_info_array[i].buf = NULL; + + chunk_node = H5SL_next(chunk_node); + } /* end for */ + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, " Contents of local info array\n"); + HDfprintf(debug_file, "------------------------------\n"); + for (size_t j = 0; j < (size_t) num_chunks_selected; j++) { + HDfprintf(debug_file, "| Chunk Entry %zd:\n", j); + HDfprintf(debug_file, "| - Chunk Address: %a\n", local_info_array[j].old_chunk.offset); + HDfprintf(debug_file, "| - Chunk Length: %zd\n", local_info_array[j].old_chunk.length); + HDfprintf(debug_file, "| - Chunk Owner: %d\n", local_info_array[j].owner); + HDfprintf(debug_file, "| - Address of mspace: %x\n", local_info_array[j].chunk_info.mspace); + HDfprintf(debug_file, "| - Chunk Selection Type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.mspace)); + HDfprintf(debug_file, "| - Chunk Num Elmts Sel.: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.mspace)); + } + HDfprintf(debug_file, "------------------------------\n\n"); +#endif + + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "Testing mem/file space addresses:\n"); + HDfprintf(debug_file, "-----------------------------------\n"); + + for (size_t j = 0; j < num_chunks_selected; j++) { + HDfprintf(debug_file, "| Testing chunk at address %a.\n", local_info_array[j].old_chunk.offset); + HDfprintf(debug_file, "| Mem Space:\n"); + HDfprintf(debug_file, "| - Extent Num elements: %zd\n", H5S_GET_EXTENT_NPOINTS(local_info_array[j].chunk_info.mspace)); + HDfprintf(debug_file, "| - Extent type: %d\n", H5S_GET_EXTENT_TYPE(local_info_array[j].chunk_info.mspace)); + HDfprintf(debug_file, "| - Selection Num elements: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.mspace)); + HDfprintf(debug_file, "| - Selection type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.mspace)); + HDfprintf(debug_file, "| File Space:\n"); + HDfprintf(debug_file, "| - Extent Num elements: %zd\n", H5S_GET_EXTENT_NPOINTS(local_info_array[j].chunk_info.fspace)); + HDfprintf(debug_file, "| - Extent type: %d\n", H5S_GET_EXTENT_TYPE(local_info_array[j].chunk_info.fspace)); + HDfprintf(debug_file, "| - Selection Num elements: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.fspace)); + HDfprintf(debug_file, "| - Selection type: %d\n|\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.fspace)); + } + + HDfprintf(debug_file, "-----------------------------------\n\n"); +#endif + + /* Gather the number of chunks each process is writing to all processes */ + if (NULL == (num_chunks_selected_array = (size_t *) H5MM_malloc((size_t) mpi_size * sizeof(*num_chunks_selected_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array") + + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&num_chunks_selected, 1, MPI_UNSIGNED_LONG_LONG, num_chunks_selected_array, 1, MPI_UNSIGNED_LONG_LONG, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather of num chunks selected array failed", mpi_code) + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, " Num Chunks Selected Array\n"); + HDfprintf(debug_file, "------------------------------------\n"); + for (size_t j = 0; j < (size_t) mpi_size; j++) { + HDfprintf(debug_file, "| Process %d has %zd chunks selected.\n", j, num_chunks_selected_array[j]); + } + HDfprintf(debug_file, "------------------------------------\n\n"); +#endif + + *chunk_list = local_info_array; + *num_entries = num_chunks_selected; + *_num_chunks_selected_array = num_chunks_selected_array; + +done: + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__construct_filtered_io_info_list() */ + + +/*------------------------------------------------------------------------- + * Function: H5D__mpio_filtered_collective_write_type + * + * Purpose: Constructs a MPI derived datatype for both the memory and + * the file for a collective write of filtered chunks. The + * datatype contains the offsets in the file and the locations + * of the filtered chunk data buffers + * + * XXX: Same type may be reusable for filtered collective read + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Tuesday, November 22, 2016 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chunk_list, + size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_created, + MPI_Datatype *new_file_type, hbool_t *file_type_created) +{ + MPI_Aint *write_buf_array = NULL; /* Relative displacements of filtered chunk data buffers */ + MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */ + int *length_array = NULL; /* Filtered Chunk lengths */ + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(chunk_list); + HDassert(new_mem_type); + HDassert(mem_type_created); + HDassert(new_file_type); + HDassert(file_type_created); + + if (num_entries > 0) { + size_t i; + int mpi_code; + void *base_buf; + + H5_CHECK_OVERFLOW(num_entries, size_t, int); + + /* Allocate arrays */ + if (NULL == (length_array = (int *) H5MM_malloc((size_t) num_entries * sizeof(int)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write length array") + if (NULL == (write_buf_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write buf length array") + if (NULL == (file_offset_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for collective write offset array") + + /* Ensure the list is sorted in ascending order of offset in the file */ + HDqsort(chunk_list, num_entries, sizeof(*chunk_list), H5D__cmp_filtered_collective_io_entry); + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "MPI Write type entries:\n"); + HDfprintf(debug_file, "---------------------------------\n"); +#endif + + base_buf = chunk_list[0].buf; + for (i = 0; i < num_entries; i++) { + /* XXX: Revise description */ + /* Set up array position */ + file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset; + length_array[i] = (int) chunk_list[i].new_chunk.length; + write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf; + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| Type Entry %zd:\n", i); + HDfprintf(debug_file, "| - Offset: %a; Length: %zd\n", file_offset_array[i], length_array[i]); + HDfprintf(debug_file, "| - Write buffer:\n| ["); + for (size_t j = 0; j < (size_t) length_array[i]; j++) { + HDfprintf(debug_file, "%c, ", ((char *) chunk_list[i].buf)[j]); + } + HDfprintf(debug_file, "]\n|\n"); +#endif + } /* end while */ + +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "---------------------------------\n\n"); +#endif + + /* Create memory MPI type */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, write_buf_array, MPI_BYTE, new_mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + *mem_type_created = TRUE; + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + + /* Create file MPI type */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, file_offset_array, MPI_BYTE, new_file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + *file_type_created = TRUE; + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + } /* end if */ + +done: + length_array = (int *) H5MM_free(length_array); + write_buf_array = (MPI_Aint *) H5MM_free(write_buf_array); + file_offset_array = (MPI_Aint *) H5MM_free(file_offset_array); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_filtered_collective_write_type() */ #endif /* H5_HAVE_PARALLEL */ diff --git a/src/H5Dpkg.h b/src/H5Dpkg.h index f54a9f2..bd58d38 100644 --- a/src/H5Dpkg.h +++ b/src/H5Dpkg.h @@ -618,6 +618,9 @@ H5_DLL herr_t H5D__select_write(const H5D_io_info_t *io_info, H5_DLL herr_t H5D__scatter_mem(const void *_tscat_buf, const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts, const H5D_dxpl_cache_t *dxpl_cache, void *_buf); +H5_DLL size_t H5D__gather_mem(const void *_buf, + const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts, + const H5D_dxpl_cache_t *dxpl_cache, void *_tgath_buf/*out*/); H5_DLL herr_t H5D__scatgath_read(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, hsize_t nelmts, const H5S_t *file_space, const H5S_t *mem_space); @@ -667,6 +670,8 @@ H5_DLL herr_t H5D__chunk_lookup(const H5D_t *dset, hid_t dxpl_id, const hsize_t *scaled, H5D_chunk_ud_t *udata); H5_DLL herr_t H5D__chunk_allocated(H5D_t *dset, hid_t dxpl_id, hsize_t *nbytes); H5_DLL herr_t H5D__chunk_allocate(const H5D_io_info_t *io_info, hbool_t full_overwrite, hsize_t old_dim[]); +H5_DLL herr_t H5D__chunk_file_alloc(const H5D_chk_idx_info_t *idx_info, const H5F_block_t *old_chunk, + H5F_block_t *new_chunk, hbool_t *need_insert, hsize_t scaled[]); H5_DLL herr_t H5D__chunk_update_old_edge_chunks(H5D_t *dset, hid_t dxpl_id, hsize_t old_dim[]); H5_DLL herr_t H5D__chunk_prune_by_extent(H5D_t *dset, hid_t dxpl_id, diff --git a/src/H5Dscatgath.c b/src/H5Dscatgath.c index 55111f0..929feb7 100644 --- a/src/H5Dscatgath.c +++ b/src/H5Dscatgath.c @@ -49,9 +49,6 @@ static herr_t H5D__scatter_file(const H5D_io_info_t *io_info, static size_t H5D__gather_file(const H5D_io_info_t *io_info, const H5S_t *file_space, H5S_sel_iter_t *file_iter, size_t nelmts, void *buf); -static size_t H5D__gather_mem(const void *_buf, - const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts, - const H5D_dxpl_cache_t *dxpl_cache, void *_tgath_buf/*out*/); static herr_t H5D__compound_opt_read(size_t nelmts, const H5S_t *mem_space, H5S_sel_iter_t *iter, const H5D_dxpl_cache_t *dxpl_cache, const H5D_type_info_t *type_info, void *user_buf/*out*/); @@ -305,6 +302,7 @@ H5D__scatter_mem (const void *_tscat_buf, const H5S_t *space, HDassert(space); HDassert(iter); HDassert(nelmts > 0); + HDassert(dxpl_cache); HDassert(buf); /* Allocate the vector I/O arrays */ @@ -366,7 +364,7 @@ done: * *------------------------------------------------------------------------- */ -static size_t +size_t H5D__gather_mem(const void *_buf, const H5S_t *space, H5S_sel_iter_t *iter, size_t nelmts, const H5D_dxpl_cache_t *dxpl_cache, void *_tgath_buf/*out*/) @@ -389,6 +387,7 @@ H5D__gather_mem(const void *_buf, const H5S_t *space, HDassert(space); HDassert(iter); HDassert(nelmts > 0); + HDassert(dxpl_cache); HDassert(tgath_buf); /* Allocate the vector I/O arrays */ |