diff options
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r-- | src/H5Dmpio.c | 1378 |
1 files changed, 1330 insertions, 48 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index 441cc96..e6fbb6a 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -86,7 +86,6 @@ #define H5D_CHUNK_SELECT_IRREG 2 #define H5D_CHUNK_SELECT_NONE 0 - /******************/ /* Local Typedefs */ /******************/ @@ -96,6 +95,17 @@ typedef struct H5D_chunk_addr_info_t { H5D_chunk_info_t chunk_info; } H5D_chunk_addr_info_t; +/* Information about a single chunk when performing collective filtered IO */ +typedef struct H5D_filtered_collective_io_info_t { + H5D_chunk_info_t chunk_info; /* Info about this chunk, such as chunk index and file and memory dataspace */ + H5F_block_t old_chunk; /* The address in the file and size of this chunk before being filtered */ + H5F_block_t new_chunk; /* The address in the file and size of this chunk after being filtered */ + hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */ + size_t io_size; /* Size of the I/O to this chunk */ + size_t num_writers; /* Total number of processes writing to this chunk */ + int owner; /* Process which will be writing to this chunk */ + void *buf; /* Chunk data to be written to file/that has been read from file*/ +} H5D_filtered_collective_io_info_t; /********************/ /* Local Prototypes */ @@ -105,9 +115,15 @@ static herr_t H5D__chunk_collective_io(H5D_io_info_t *io_info, static herr_t H5D__multi_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist); +static herr_t H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, + H5P_genplist_t *dx_plist); static herr_t H5D__link_chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, int sum_chunk, H5P_genplist_t *dx_plist); +static herr_t H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, H5D_chunk_map_t *fm, + H5P_genplist_t *dx_plist); static herr_t H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5S_t *file_space, const H5S_t *mem_space); @@ -126,6 +142,21 @@ static herr_t H5D__mpio_get_min_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, int *min_chunkf); static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, int *sum_chunkf); +static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, + const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm, + H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries); +static herr_t H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries, + size_t array_entry_size, void **gathered_array, size_t *gathered_array_num_entries, + int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *)); +static herr_t H5D__mpio_filtered_collective_write_type( + H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries, + MPI_Datatype *new_mem_type, hbool_t *mem_type_derived, + MPI_Datatype *new_file_type, hbool_t *file_type_derived); +static herr_t H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry, + const H5D_io_info_t *io_info, const H5D_type_info_t *type_info); +static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2); +static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, + const void *filtered_collective_io_info_entry2); /*********************/ @@ -144,7 +175,7 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, * Purpose: Checks if an direct I/O transfer is possible between memory and * the file. * - * Return: Sauccess: Non-negative: TRUE or FALSE + * Return: Success: Non-negative: TRUE or FALSE * Failure: Negative * * Programmer: Quincey Koziol @@ -155,11 +186,11 @@ static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, htri_t H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space, const H5S_t *mem_space, const H5D_type_info_t *type_info, - const H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist) + const H5D_chunk_map_t H5_ATTR_UNUSED *fm, H5P_genplist_t *dx_plist) { int local_cause = 0; /* Local reason(s) for breaking collective mode */ int global_cause = 0; /* Global reason(s) for breaking collective mode */ - htri_t ret_value; /* Return value */ + htri_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_PACKAGE @@ -208,11 +239,6 @@ H5D__mpio_opt_possible(const H5D_io_info_t *io_info, const H5S_t *file_space, * use collective IO will defer until each chunk IO is reached. */ - /* Don't allow collective operations if filters need to be applied */ - if(io_info->dset->shared->layout.type == H5D_CHUNKED && - io_info->dset->shared->dcpl_cache.pline.nused > 0) - local_cause |= H5D_MPIO_FILTERS; - /* Check for independent I/O */ if(local_cause & H5D_MPIO_SET_INDEPENDENT) global_cause = local_cause; @@ -302,6 +328,100 @@ done: /*------------------------------------------------------------------------- + * Function: H5D__mpio_array_gatherv + * + * Purpose: Given arrays by MPI ranks, gathers them into a single array + * which is either gathered to the rank specified by root when + * allgather is false, or is distributed back to all ranks + * when allgather is true. If the sort_func argument is + * specified, the list is sorted before being returned. + * + * If allgather is specified as true, root is ignored. + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Sunday, April 9th, 2017 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries, + size_t array_entry_size, void **_gathered_array, size_t *_gathered_array_num_entries, + int nprocs, hbool_t allgather, int root, MPI_Comm comm, int (*sort_func)(const void *, const void *)) +{ + size_t gathered_array_num_entries = 0; /* The size of the newly-constructed array */ + size_t i; + void *gathered_array = NULL; /* The newly-constructed array returned to the caller */ + int *receive_counts_array = NULL; /* Array containing number of entries each process is contributing */ + int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */ + int mpi_code; + int sendcount; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(_gathered_array); + HDassert(_gathered_array_num_entries); + + /* Determine the size of the end result array */ + if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&local_array_num_entries, &gathered_array_num_entries, 1, MPI_INT, MPI_SUM, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) + + /* If 0 entries resulted from the collective operation, no one is writing anything */ + if (gathered_array_num_entries > 0) { + if (NULL == (gathered_array = H5MM_malloc(gathered_array_num_entries * array_entry_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate gathered array") + + if (NULL == (receive_counts_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(*receive_counts_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive counts array") + + if (NULL == (displacements_array = (int *) H5MM_malloc((size_t) nprocs * sizeof(*displacements_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate receive displacements array") + + /* Inform each process of how many entries each other process is contributing to the resulting array */ + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&local_array_num_entries, 1, MPI_INT, receive_counts_array, 1, MPI_INT, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + + /* Multiply each receive count by the size of the array entry, since the data is sent as bytes */ + for (i = 0; i < (size_t) nprocs; i++) + H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t); + + /* Set receive buffer offsets for MPI_Allgatherv */ + displacements_array[0] = 0; + for (i = 1; i < (size_t) nprocs; i++) + displacements_array[i] = displacements_array[i - 1] + receive_counts_array[i - 1]; + + H5_CHECKED_ASSIGN(sendcount, int, local_array_num_entries * array_entry_size, size_t); + + if (allgather) { + if (MPI_SUCCESS != (mpi_code = MPI_Allgatherv(local_array, sendcount, MPI_BYTE, + gathered_array, receive_counts_array, displacements_array, MPI_BYTE, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code) + } /* end if */ + else { + if (MPI_SUCCESS != (mpi_code = MPI_Gatherv(local_array, sendcount, MPI_BYTE, + gathered_array, receive_counts_array, displacements_array, MPI_BYTE, root, comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgatherv failed", mpi_code) + } /* end else */ + + if (sort_func) HDqsort(gathered_array, gathered_array_num_entries, array_entry_size, sort_func); + } /* end if */ + + *_gathered_array = gathered_array; + *_gathered_array_num_entries = gathered_array_num_entries; + +done: + if (receive_counts_array) + H5MM_free(receive_counts_array); + if (displacements_array) + H5MM_free(displacements_array); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_array_gatherv() */ + + +/*------------------------------------------------------------------------- * Function: H5D__ioinfo_xfer_mode * * Purpose: Switch to between collective & independent MPI I/O @@ -400,7 +520,7 @@ H5D__mpio_get_min_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, FUNC_ENTER_STATIC /* Get the number of chunks to perform I/O on */ - num_chunkf = H5SL_count(fm->sel_chunks); + H5_CHECKED_ASSIGN(num_chunkf, int, H5SL_count(fm->sel_chunks), size_t) /* Determine the minimum # of chunks for all processes */ if(MPI_SUCCESS != (mpi_code = MPI_Allreduce(&num_chunkf, min_chunkf, 1, MPI_INT, MPI_MIN, io_info->comm))) @@ -482,7 +602,7 @@ H5D__contig_collective_read(H5D_io_info_t *io_info, const H5D_type_info_t *type_ HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish shared collective MPI-IO") /* Obtain the data transfer properties */ - if(NULL == (dx_plist = H5I_object(io_info->raw_dxpl_id))) + if(NULL == (dx_plist = (H5P_genplist_t *)H5I_object(io_info->raw_dxpl_id))) HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list") /* Set the actual I/O mode property. internal_collective_io will not break to @@ -529,7 +649,7 @@ H5D__contig_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't finish shared collective MPI-IO") /* Obtain the data transfer properties */ - if(NULL == (dx_plist = H5I_object(io_info->raw_dxpl_id))) + if(NULL == (dx_plist = (H5P_genplist_t *)H5I_object(io_info->raw_dxpl_id))) HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list") /* Set the actual I/O mode property. internal_collective_io will not break to @@ -601,12 +721,13 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf HDassert(fm); /* Obtain the data transfer properties */ - if(NULL == (dx_plist = H5I_object(io_info->raw_dxpl_id))) + if(NULL == (dx_plist = (H5P_genplist_t *)H5I_object(io_info->raw_dxpl_id))) HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a file access property list") - /* Check the optional property list on what to do with collective chunk IO. */ + /* Check the optional property list for the collective chunk IO optimization option */ if(H5P_get(dx_plist, H5D_XFER_MPIO_CHUNK_OPT_HARD_NAME, &chunk_opt_mode) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get chunk optimization option") + if(H5FD_MPIO_CHUNK_ONE_IO == chunk_opt_mode) io_option = H5D_ONE_LINK_CHUNK_IO; /*no opt*/ /* direct request to multi-chunk-io */ @@ -622,13 +743,13 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf if((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") - /* Get the chunk optimization option */ + /* Get the chunk optimization option threshold */ if(H5P_get(dx_plist, H5D_XFER_MPIO_CHUNK_OPT_NUM_NAME, &one_link_chunk_io_threshold) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get chunk optimization option") + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get chunk optimization option threshold value") /* step 1: choose an IO option */ /* If the average number of chunk per process is greater than a threshold, we will do one link chunked IO. */ - if((unsigned)sum_chunk / mpi_size >= one_link_chunk_io_threshold) + if((unsigned)sum_chunk / (unsigned)mpi_size >= one_link_chunk_io_threshold) io_option = H5D_ONE_LINK_CHUNK_IO_MORE_OPT; #ifdef H5_HAVE_INSTRUMENTED_LIBRARY else @@ -683,19 +804,46 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf #endif /* step 2: Go ahead to do IO.*/ - if(H5D_ONE_LINK_CHUNK_IO == io_option || H5D_ONE_LINK_CHUNK_IO_MORE_OPT == io_option) { - if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO") - } /* end if */ - /* direct request to multi-chunk-io */ - else if(H5D_MULTI_CHUNK_IO == io_option) { - if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO") - } /* end if */ - else { /* multiple chunk IO via threshold */ - if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0) - HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO") - } /* end else */ + switch (io_option) { + case H5D_ONE_LINK_CHUNK_IO: + case H5D_ONE_LINK_CHUNK_IO_MORE_OPT: + /* Check if there are any filters in the pipeline */ + if(io_info->dset->shared->dcpl_cache.pline.nused > 0) { + /* For now, Multi-chunk IO must be forced for parallel filtered read, + * so that data can be unfiltered as it is received. There is significant + * complexity in unfiltering the data when it is read all at once into a + * single buffer. + */ + if (io_info->op_type == H5D_IO_OP_READ) { + if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO") + } + else { + if(H5D__link_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish filtered linked chunk MPI-IO") + } + } + else { + /* Perform unfiltered link chunk collective IO */ + if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO") + } + break; + + case H5D_MULTI_CHUNK_IO: /* direct request to do multi-chunk IO */ + default: /* multiple chunk IO via threshold */ + /* Check if there are any filters in the pipeline */ + if(io_info->dset->shared->dcpl_cache.pline.nused > 0) { + if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO") + } + else { + /* Perform unfiltered multi chunk collective IO */ + if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO") + } + break; + } /* end switch */ done: FUNC_LEAVE_NOAPI(ret_value) @@ -1095,6 +1243,226 @@ if(H5DEBUG(D)) /*------------------------------------------------------------------------- + * Function: H5D__link_chunk_filtered_collective_io + * + * Purpose: Routine for one collective IO with one MPI derived datatype + * to link with all filtered chunks + * + * 1. Construct a list of selected chunks in the collective IO + * operation + * A. If any chunk is being written to by more than 1 + * process, the process writing the most data to the + * chunk will take ownership of the chunk (the first + * process seen that is writing the most data becomes + * the new owner in the case of ties) + * 2. If the operation is a write operation + * A. Loop through each chunk in the operation + * I. If this is not a full overwrite of the chunk + * a) Read the chunk from file and pass the chunk + * through the filter pipeline in reverse order + * (Unfilter the chunk) + * II. Update the chunk data with the modifications from + * the owning process + * III. Receive any modification data from other + * processes and update the chunk data with these + * modifications + * IV. Filter the chunk + * B. Contribute the modified chunks to an array gathered + * by all processes which contains the new sizes of + * every chunk modified in the collective IO operation + * C. All processes collectively re-allocate each chunk + * from the gathered array with their new sizes after + * the filter operation + * D. If this process has any chunks selected in the IO + * operation, create an MPI derived type for memory and + * file to write out the process' selected chunks to the + * file + * E. Perform the collective write + * F. All processes collectively re-insert each modified + * chunk from the gathered array into the chunk index + * + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Friday, Nov. 4th, 2016 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist) +{ + H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ + H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; /* The list of chunks used during collective operations */ + H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK; /* The actual chunk IO optimization mode */ + H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* The chunk IO mode used (Independent vs Collective) */ + H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ + MPI_Datatype mem_type = MPI_BYTE; + MPI_Datatype file_type = MPI_BYTE; + hbool_t mem_type_is_derived = FALSE; + hbool_t file_type_is_derived = FALSE; + size_t chunk_list_num_entries; + size_t collective_chunk_list_num_entries; + size_t *num_chunks_selected_array = NULL; /* Array of number of chunks selected on each process */ + size_t i; /* Local index variable */ + int mpi_rank, mpi_size, mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + /* Obtain the current rank of the process and the number of processes */ + if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") + if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") + + /* Set the actual-chunk-opt-mode property. */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property") + + /* Set the actual-io-mode property. + * Link chunk filtered I/O does not break to independent, so can set right away + */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property") + + /* Build a list of selected chunks in the collective io operation */ + if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list") + + if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */ + H5D_chk_idx_info_t index_info; + H5D_chunk_ud_t udata; + hsize_t mpi_buf_count; + + /* Construct chunked index info */ + index_info.f = io_info->dset->oloc.file; + index_info.dxpl_id = io_info->md_dxpl_id; + index_info.pline = &(io_info->dset->shared->dcpl_cache.pline); + index_info.layout = &(io_info->dset->shared->layout.u.chunk); + index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk); + + /* Set up chunk information for insertion to chunk index */ + udata.common.layout = index_info.layout; + udata.common.storage = index_info.storage; + udata.filter_mask = 0; + + /* Iterate through all the chunks in the collective write operation, + * updating each chunk with the data modifications from other processes, + * then re-filtering the chunk. + */ + for (i = 0; i < chunk_list_num_entries; i++) + if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry") + + /* Gather the new chunk sizes to all processes for a collective reallocation + * of the chunks in the file. + */ + if (H5D__mpio_array_gatherv(chunk_list, chunk_list_num_entries, sizeof(*chunk_list), + (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size, + true, 0, io_info->comm, NULL) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes") + + /* Collectively re-allocate the modified chunks (from each process) in the file */ + for (i = 0; i < collective_chunk_list_num_entries; i++) { + hbool_t insert; + + if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[i].old_chunk, &collective_chunk_list[i].new_chunk, + &insert, collective_chunk_list[i].chunk_info.scaled) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") + } /* end for */ + + if (NULL == (num_chunks_selected_array = (size_t *) H5MM_malloc((size_t) mpi_size * sizeof(*num_chunks_selected_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array") + + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&chunk_list_num_entries, 1, MPI_UNSIGNED_LONG_LONG, num_chunks_selected_array, + 1, MPI_UNSIGNED_LONG_LONG, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + + /* If this process has any chunks selected, create a MPI type for collectively + * writing out the chunks to file. Otherwise, the process contributes to the + * collective write with a none type. + */ + if (chunk_list_num_entries) { + size_t offset; + + /* During the collective re-allocation of chunks in the file, the record for each + * chunk is only updated in the collective array, not in the local copy of chunks on each + * process. However, each process needs the updated chunk records so that they can create + * a MPI type for the collective write that will write to the chunk's possible new locations + * in the file instead of the old ones. This ugly hack seems to be the best solution to + * copy the information back to the local array and avoid having to modify the collective + * write type function in an ugly way so that it will accept the collective array instead + * of the local array. This works correctly because the array gather function guarantees + * that the chunk data in the collective array is ordered in blocks by rank. + */ + for (i = 0, offset = 0; i < (size_t) mpi_rank; i++) + offset += num_chunks_selected_array[i]; + + HDmemcpy(chunk_list, &collective_chunk_list[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t)); + + /* Create single MPI type encompassing each selection in the dataspace */ + if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries, + &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_BADTYPE, FAIL, "couldn't create MPI link chunk I/O type") + + /* Override the write buffer to point to the address of the first + * chunk data buffer + */ + io_info->u.wbuf = chunk_list[0].buf; + } /* end if */ + + /* We have a single, complicated MPI datatype for both memory & file */ + mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? (hsize_t) 1 : (hsize_t) 0; + + /* Set up the base storage address for this operation */ + ctg_store.contig.dset_addr = 0; /* Write address must be set to address 0 */ + io_info->store = &ctg_store; + + /* Perform I/O */ + if (H5D__final_collective_io(io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") + + /* Participate in the collective re-insertion of all chunks modified + * in this iteration into the chunk index + */ + for (i = 0; i < collective_chunk_list_num_entries; i++) { + udata.chunk_block = collective_chunk_list[i].new_chunk; + udata.common.scaled = collective_chunk_list[i].chunk_info.scaled; + udata.chunk_idx = collective_chunk_list[i].chunk_info.index; + + if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index") + } /* end for */ + } /* end if */ + +done: + /* Free resources used by a process which had some selection */ + if (chunk_list) { + for (i = 0; i < chunk_list_num_entries; i++) + if (chunk_list[i].buf) + H5MM_free(chunk_list[i].buf); + + H5MM_free(chunk_list); + } + + if (num_chunks_selected_array) + H5MM_free(num_chunks_selected_array); + if (collective_chunk_list) + H5MM_free(collective_chunk_list); + + /* Free the MPI buf and file types, if they were derived */ + if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__link_chunk_filtered_collective_io() */ + + +/*------------------------------------------------------------------------- * Function: H5D__multi_chunk_collective_io * * Purpose: To do IO per chunk according to IO mode(collective/independent/none) @@ -1227,7 +1595,7 @@ if(H5DEBUG(D)) * to ease switching between to mixed I/O without checking the current * value of the property. You can see the definition in H5Ppublic.h */ - actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_COLLECTIVE; + actual_io_mode = (H5D_mpio_actual_io_mode_t) (actual_io_mode | H5D_MPIO_CHUNK_COLLECTIVE); } /* end if */ else { @@ -1267,7 +1635,7 @@ if(H5DEBUG(D)) mspace = chunk_info->mspace; /* Update the local variable tracking the dxpl's actual io mode. */ - actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_INDEPENDENT; + actual_io_mode = (H5D_mpio_actual_io_mode_t) (actual_io_mode | H5D_MPIO_CHUNK_INDEPENDENT); } /* end if */ else { fspace = mspace = NULL; @@ -1308,6 +1676,305 @@ done: /*------------------------------------------------------------------------- + * Function: H5D__multi_chunk_filtered_collective_io + * + * Purpose: To do filtered collective IO per chunk to save on memory, + * as opposed to collective IO of every chunk at once + * + * 1. Construct a list of selected chunks in the collective IO + * operation + * A. If any chunk is being written to by more than 1 + * process, the process writing the most data to the + * chunk will take ownership of the chunk (the first + * process seen that is writing the most data becomes + * the new owner in the case of ties) + * 2. If the operation is a read operation + * A. Loop through each chunk in the operation + * I. Read the chunk from the file + * II. Unfilter the chunk + * III. Scatter the read chunk data to the user's buffer + * 3. If the operation is a write operation + * A. Loop through each chunk in the operation + * I. If this is not a full overwrite of the chunk + * a) Read the chunk from file and pass the chunk + * through the filter pipeline in reverse order + * (Unfilter the chunk) + * II. Update the chunk data with the modifications from + * the owning process + * III. Receive any modification data from other + * processes and update the chunk data with these + * modifications + * IV. Filter the chunk + * V. Contribute the chunk to an array gathered by + * all processes which contains every chunk + * modified in this iteration (up to one chunk + * per process, some processes may not have a + * selection/may have less chunks to work on than + * other processes) + * VI. All processes collectively re-allocate each + * chunk from the gathered array with their new + * sizes after the filter operation + * VII. Proceed with the collective write operation + * for the chunks modified on this iteration + * VIII. All processes collectively re-insert each + * chunk from the gathered array into the chunk + * index + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Friday, Dec. 2nd, 2016 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist) +{ + H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ + H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; /* The list of chunks used during collective operations */ + H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_MULTI_CHUNK; /* The actual chunk IO optimization mode */ + H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* The chunk IO mode used (Independent vs Collective) */ + H5D_storage_t store; /* union of EFL and chunk pointer in file space */ + H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */ + H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ + MPI_Datatype *file_type_array = NULL; + MPI_Datatype *mem_type_array = NULL; + hbool_t *file_type_is_derived_array = NULL; + hbool_t *mem_type_is_derived_array = NULL; + hbool_t *has_chunk_selected_array = NULL; /* Array of whether or not each process is contributing a chunk to each iteration */ + size_t chunk_list_num_entries; + size_t collective_chunk_list_num_entries; + size_t i, j; /* Local index variable */ + int mpi_rank, mpi_size, mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + /* Obtain the current rank of the process and the number of processes */ + if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") + if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") + + /* Set the actual chunk opt mode property */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property") + + /* Set the actual_io_mode property. + * Multi chunk I/O does not break to independent, so can set right away + */ + if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk io mode property") + + /* Build a list of selected chunks in the collective IO operation */ + if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list") + + /* Set up contiguous I/O info object */ + HDmemcpy(&ctg_io_info, io_info, sizeof(ctg_io_info)); + ctg_io_info.store = &ctg_store; + ctg_io_info.layout_ops = *H5D_LOPS_CONTIG; + + /* Initialize temporary contiguous storage info */ + ctg_store.contig.dset_size = (hsize_t) io_info->dset->shared->layout.u.chunk.size; + ctg_store.contig.dset_addr = 0; + + /* Set dataset storage for I/O info */ + io_info->store = &store; + + if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */ + for (i = 0; i < chunk_list_num_entries; i++) + if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't process chunk entry") + } /* end if */ + else { /* Filtered collective write */ + H5D_chk_idx_info_t index_info; + H5D_chunk_ud_t udata; + size_t max_num_chunks; + hsize_t mpi_buf_count; + + /* Construct chunked index info */ + index_info.f = io_info->dset->oloc.file; + index_info.dxpl_id = io_info->md_dxpl_id; + index_info.pline = &(io_info->dset->shared->dcpl_cache.pline); + index_info.layout = &(io_info->dset->shared->layout.u.chunk); + index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk); + + /* Set up chunk information for insertion to chunk index */ + udata.common.layout = index_info.layout; + udata.common.storage = index_info.storage; + udata.filter_mask = 0; + + /* Retrieve the maximum number of chunks being written among all processes */ + if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list_num_entries, &max_num_chunks, + 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) + + /* If no one is writing anything at all, end the operation */ + if (!(max_num_chunks > 0)) HGOTO_DONE(SUCCEED); + + /* Allocate arrays for storing MPI file and mem types and whether or not the + * types were derived. + */ + if (NULL == (file_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(*file_type_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type array") + + if (NULL == (file_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(*file_type_is_derived_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type is derived array") + + if (NULL == (mem_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(*mem_type_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type array") + + if (NULL == (mem_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(*mem_type_is_derived_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type is derived array") + + /* Iterate over the max number of chunks among all processes, as this process could + * have no chunks left to work on, but it still needs to participate in the collective + * re-allocation and re-insertion of chunks modified by other processes. + */ + for (i = 0; i < max_num_chunks; i++) { + /* Check if this process has a chunk to work on for this iteration */ + hbool_t have_chunk_to_process = i < chunk_list_num_entries; + + if (have_chunk_to_process) + if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry") + + /* Gather the new chunk sizes to all processes for a collective re-allocation + * of the chunks in the file + */ + if (H5D__mpio_array_gatherv(&chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(*chunk_list), + (void **) &collective_chunk_list, &collective_chunk_list_num_entries, mpi_size, + true, 0, io_info->comm, NULL) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes") + + /* Participate in the collective re-allocation of all chunks modified + * in this iteration. + */ + for (j = 0; j < collective_chunk_list_num_entries; j++) { + hbool_t insert = FALSE; + + if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].old_chunk, &collective_chunk_list[j].new_chunk, + &insert, chunk_list[j].chunk_info.scaled) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") + } /* end for */ + + if (NULL == (has_chunk_selected_array = (hbool_t *) H5MM_malloc((size_t) mpi_size * sizeof(*has_chunk_selected_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array") + + if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&have_chunk_to_process, 1, MPI_C_BOOL, has_chunk_selected_array, + 1, MPI_C_BOOL, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) + + /* If this process has a chunk to work on, create a MPI type for the + * memory and file for writing out the chunk + */ + if (have_chunk_to_process) { + size_t offset; + int mpi_type_count; + + for (j = 0, offset = 0; j < (size_t) mpi_rank; j++) + offset += has_chunk_selected_array[j]; + + /* Collect the new chunk info back to the local copy, since only the record in the + * collective array gets updated by the chunk re-allocation */ + HDmemcpy(&chunk_list[i].new_chunk, &collective_chunk_list[offset].new_chunk, sizeof(chunk_list[i].new_chunk)); + + H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].new_chunk.length, hsize_t); + + /* Create MPI memory type for writing to chunk */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type_array[i]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type_array[i]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + mem_type_is_derived_array[i] = TRUE; + + /* Create MPI file type for writing to chunk */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &file_type_array[i]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type_array[i]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + file_type_is_derived_array[i] = TRUE; + + mpi_buf_count = 1; + + /* Set up the base storage address for this operation */ + ctg_store.contig.dset_addr = chunk_list[i].new_chunk.offset; + + /* Override the write buffer to point to the address of the + * chunk data buffer + */ + ctg_io_info.u.wbuf = chunk_list[i].buf; + } /* end if */ + else { + mem_type_array[i] = file_type_array[i] = MPI_BYTE; + mpi_buf_count = 0; + } /* end else */ + + /* Perform the I/O */ + if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type_array[i], &mem_type_array[i]) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") + + /* Participate in the collective re-insertion of all chunks modified + * in this iteration into the chunk index + */ + for (j = 0; j < collective_chunk_list_num_entries; j++) { + udata.chunk_block = collective_chunk_list[j].new_chunk; + udata.common.scaled = collective_chunk_list[j].chunk_info.scaled; + udata.chunk_idx = collective_chunk_list[j].chunk_info.index; + + if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index") + } /* end for */ + + if (collective_chunk_list){ + H5MM_free(collective_chunk_list); + collective_chunk_list = NULL; + } + if (has_chunk_selected_array){ + H5MM_free(has_chunk_selected_array); + has_chunk_selected_array = NULL; + } + } /* end for */ + + /* Free the MPI file and memory types, if they were derived */ + for (i = 0; i < max_num_chunks; i++) { + if (file_type_is_derived_array[i]) + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type_array[i]))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + + if (mem_type_is_derived_array[i]) + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type_array[i]))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + } /* end for */ + } /* end else */ + +done: + if (chunk_list) { + for (i = 0; i < chunk_list_num_entries; i++) + if (chunk_list[i].buf) + H5MM_free(chunk_list[i].buf); + + H5MM_free(chunk_list); + } + + if (collective_chunk_list) + H5MM_free(collective_chunk_list); + if (file_type_array) + H5MM_free(file_type_array); + if (mem_type_array) + H5MM_free(mem_type_array); + if (file_type_is_derived_array) + H5MM_free(file_type_is_derived_array); + if (mem_type_is_derived_array) + H5MM_free(mem_type_is_derived_array); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__multi_chunk_filtered_collective_io() */ + + +/*------------------------------------------------------------------------- * Function: H5D__inter_collective_io * * Purpose: Routine for the shared part of collective IO between multiple chunk @@ -1474,7 +2141,7 @@ if(H5DEBUG(D)) static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2) { - haddr_t addr1, addr2; + haddr_t addr1 = HADDR_UNDEF, addr2 = HADDR_UNDEF; FUNC_ENTER_STATIC_NOERR @@ -1486,6 +2153,36 @@ H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2) /*------------------------------------------------------------------------- + * Function: H5D__cmp_filtered_collective_io_info_entry + * + * Purpose: Routine to compare filtered collective chunk io info + * entries + * + * Description: Callback for qsort() to compare filtered collective chunk + * io info entries + * + * Return: -1, 0, 1 + * + * Programmer: Jordan Henderson + * Wednesday, Nov. 30th, 2016 + * + *------------------------------------------------------------------------- + */ +static int +H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2) +{ + haddr_t addr1 = HADDR_UNDEF, addr2 = HADDR_UNDEF; + + FUNC_ENTER_STATIC_NOERR + + addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->new_chunk.offset; + addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->new_chunk.offset; + + FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2)) +} /* end H5D__cmp_filtered_collective_io_info_entry() */ + + +/*------------------------------------------------------------------------- * Function: H5D__sort_chunk * * Purpose: Routine to sort chunks in increasing order of chunk address @@ -1559,7 +2256,7 @@ if(H5DEBUG(D)) HDfprintf(H5DEBUG(D), "Coming inside H5D_OBTAIN_ALL_CHUNK_ADDR_COL\n"); #endif /* Allocate array for chunk addresses */ - if(NULL == (total_chunk_addr_array = H5MM_malloc(sizeof(haddr_t) * (size_t)fm->layout->u.chunk.nchunks))) + if(NULL == (total_chunk_addr_array = (haddr_t *)H5MM_malloc(sizeof(haddr_t) * (size_t)fm->layout->u.chunk.nchunks))) HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, FAIL, "unable to allocate memory chunk address array") /* Retrieve all the chunk addresses with process 0 */ @@ -1583,7 +2280,7 @@ if(H5DEBUG(D)) /* Iterate over all chunks for this process */ while(chunk_node) { - if(NULL == (chunk_info = H5SL_item(chunk_node))) + if(NULL == (chunk_info = (H5D_chunk_info_t *)H5SL_item(chunk_node))) HGOTO_ERROR(H5E_STORAGE, H5E_CANTGET, FAIL,"couldn't get chunk info from skipped list") if(many_chunk_opt == H5D_OBTAIN_ONE_CHUNK_ADDR_IND) { @@ -1668,7 +2365,7 @@ static herr_t H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist, uint8_t assign_io_mode[], haddr_t chunk_addr[]) { - int total_chunks; + size_t total_chunks; unsigned percent_nproc_per_chunk, threshold_nproc_per_chunk; uint8_t* io_mode_info = NULL; uint8_t* recv_io_mode_info = NULL; @@ -1678,7 +2375,8 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm, H5D_chunk_info_t* chunk_info; int mpi_size, mpi_rank; MPI_Comm comm; - int ic, root; + int root; + size_t ic; int mpi_code; #ifdef H5_HAVE_INSTRUMENTED_LIBRARY int new_value; @@ -1699,7 +2397,7 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm, HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") /* Setup parameters */ - H5_CHECKED_ASSIGN(total_chunks, int, fm->layout->u.chunk.nchunks, hsize_t); + H5_CHECKED_ASSIGN(total_chunks, size_t, fm->layout->u.chunk.nchunks, hsize_t); if(H5P_get(dx_plist, H5D_XFER_MPIO_CHUNK_OPT_RATIO_NAME, &percent_nproc_per_chunk) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't get percent nproc per chunk") /* if ratio is 0, perform collective io */ @@ -1711,39 +2409,42 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm, HGOTO_DONE(SUCCEED) } /* end if */ - threshold_nproc_per_chunk = mpi_size * percent_nproc_per_chunk/100; + + threshold_nproc_per_chunk = (unsigned)mpi_size * percent_nproc_per_chunk/100; /* Allocate memory */ if(NULL == (io_mode_info = (uint8_t *)H5MM_calloc(total_chunks))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate I/O mode info buffer") - if(NULL == (mergebuf = H5MM_malloc((sizeof(haddr_t) + 1) * total_chunks))) + if(NULL == (mergebuf = (uint8_t *)H5MM_malloc((sizeof(haddr_t) + 1) * total_chunks))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mergebuf buffer") tempbuf = mergebuf + total_chunks; if(mpi_rank == root) - if(NULL == (recv_io_mode_info = (uint8_t *)H5MM_malloc(total_chunks * mpi_size))) + if(NULL == (recv_io_mode_info = (uint8_t *)H5MM_malloc(total_chunks * (size_t)mpi_size))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate recv I/O mode info buffer") /* Obtain the regularity and selection information for all chunks in this process. */ chunk_node = H5SL_first(fm->sel_chunks); while(chunk_node) { - chunk_info = H5SL_item(chunk_node); + chunk_info = (H5D_chunk_info_t *)H5SL_item(chunk_node); - io_mode_info[chunk_info->index] = H5D_CHUNK_SELECT_REG; /* this chunk is selected and is "regular" */ + io_mode_info[chunk_info->index] = H5D_CHUNK_SELECT_REG; /* this chunk is selected and is "regular" */ chunk_node = H5SL_next(chunk_node); } /* end while */ /* Gather all the information */ - if(MPI_SUCCESS != (mpi_code = MPI_Gather(io_mode_info, total_chunks, MPI_BYTE, recv_io_mode_info, total_chunks, MPI_BYTE, root, comm))) + H5_CHECK_OVERFLOW(total_chunks, size_t, int) + if(MPI_SUCCESS != (mpi_code = MPI_Gather(io_mode_info, (int)total_chunks, MPI_BYTE, + recv_io_mode_info, (int)total_chunks, MPI_BYTE, root, comm))) HMPI_GOTO_ERROR(FAIL, "MPI_Gather failed", mpi_code) /* Calculate the mode for IO(collective, independent or none) at root process */ if(mpi_rank == root) { - int nproc; - int* nproc_per_chunk; + size_t nproc; + unsigned* nproc_per_chunk; /* pre-computing: calculate number of processes and regularity of the selection occupied in each chunk */ - if(NULL == (nproc_per_chunk = (int*)H5MM_calloc(total_chunks * sizeof(int)))) + if(NULL == (nproc_per_chunk = (unsigned*)H5MM_calloc(total_chunks * sizeof(*nproc_per_chunk)))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate nproc_per_chunk buffer") /* calculating the chunk address */ @@ -1753,7 +2454,7 @@ H5D__obtain_mpio_mode(H5D_io_info_t* io_info, H5D_chunk_map_t *fm, } /* end if */ /* checking for number of process per chunk and regularity of the selection*/ - for(nproc = 0; nproc < mpi_size; nproc++) { + for(nproc = 0; nproc < (size_t)mpi_size; nproc++) { uint8_t *tmp_recv_io_mode_info = recv_io_mode_info + (nproc * total_chunks); /* Calculate the number of process per chunk and adding irregular selection option */ @@ -1837,5 +2538,586 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__obtain_mpio_mode() */ + + +/*------------------------------------------------------------------------- + * Function: H5D__construct_filtered_io_info_list + * + * Purpose: Constructs a list of entries which contain the necessary + * information for inter-process communication when performing + * collective io on filtered chunks. This list is used by + * every process in operations that must be collectively done + * on every chunk, such as chunk re-allocation, insertion of + * chunks into the chunk index, etc. + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Tuesday, January 10th, 2017 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, + const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries) +{ + H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially select chunks for this process */ + H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */ + H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */ + unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */ + H5SL_node_t *chunk_node; + MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */ + MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */ + hbool_t mem_iter_init = FALSE; + size_t num_send_requests = 0; + size_t num_chunks_selected; + size_t shared_chunks_info_array_num_entries; + size_t i; + int mpi_rank, mpi_size, mpi_code; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(io_info); + HDassert(type_info); + HDassert(fm); + HDassert(chunk_list); + HDassert(num_entries); + HDassert(TRUE == H5P_isa_class(io_info->raw_dxpl_id, H5P_DATASET_XFER)); + + if ((mpi_rank = H5F_mpi_get_rank(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi rank") + if ((mpi_size = H5F_mpi_get_size(io_info->dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_IO, H5E_MPI, FAIL, "unable to obtain mpi size") + + /* Each process builds a local list of the chunks they have selected */ + if ((num_chunks_selected = H5SL_count(fm->sel_chunks))) { + H5D_chunk_info_t *chunk_info; + H5D_chunk_ud_t udata; + hssize_t select_npoints; + hssize_t chunk_npoints; + + if (NULL == (local_info_array = (H5D_filtered_collective_io_info_t *) H5MM_malloc(num_chunks_selected * sizeof(*local_info_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer") + + chunk_node = H5SL_first(fm->sel_chunks); + for (i = 0; chunk_node; i++) { + chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node); + + /* Obtain this chunk's address */ + if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address") + + local_info_array[i].chunk_info = *chunk_info; + local_info_array[i].old_chunk = local_info_array[i].new_chunk = udata.chunk_block; + local_info_array[i].num_writers = 0; + local_info_array[i].owner = mpi_rank; + local_info_array[i].buf = NULL; + + if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + local_info_array[i].io_size = (size_t) select_npoints * type_info->src_type_size; + + if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + local_info_array[i].full_overwrite = + (local_info_array[i].io_size >= (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE; + + chunk_node = H5SL_next(chunk_node); + } /* end for */ + } /* end if */ + + /* Redistribute shared chunks to new owners as necessary */ + if (io_info->op_type == H5D_IO_OP_WRITE) { + if (num_chunks_selected) + if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer") + + if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") + + if (H5D__mpio_array_gatherv(local_info_array, num_chunks_selected, sizeof(*local_info_array), + (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size, + true, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array") + + for (i = 0, num_chunks_selected = 0, num_send_requests = 0; i < shared_chunks_info_array_num_entries;) { + H5D_filtered_collective_io_info_t chunk_entry; + haddr_t chunk_addr = shared_chunks_info_array[i].old_chunk.offset; + size_t total_io_size = 0; + size_t num_writers = 0; + size_t max_bytes = 0; + int new_owner = 0; + + /* Set the chunk entry's file dataspace to NULL as a sentinel value. + * Any process which is contributing modifications to this chunk will + * obtain a valid file space while processing duplicates below. Any + * process which still has a NULL file space after processing all of + * the duplicate entries for a shared chunk are assumed to not be + * contributing to the chunk and so will not try to access an invalid + * dataspace when processes are sending chunk data to new owners */ + chunk_entry.chunk_info.fspace = NULL; + + /* Process duplicate entries caused by another process writing + * to the same chunk + */ + do { + /* Store the correct chunk entry information in case this process + * becomes the new chunk's owner. The chunk entry that this process + * contributed will be the only one with a valid dataspace selection + * on that particular process + */ + if (mpi_rank == shared_chunks_info_array[i].owner) + chunk_entry = shared_chunks_info_array[i]; + + /* Add this chunk entry's IO size to the running total */ + total_io_size += shared_chunks_info_array[i].io_size; + + /* New owner of the chunk is determined by the process + * which is writing the most data to the chunk + */ + if (shared_chunks_info_array[i].io_size > max_bytes) { + max_bytes = shared_chunks_info_array[i].io_size; + new_owner = shared_chunks_info_array[i].owner; + } + + num_writers++; + } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].old_chunk.offset == chunk_addr); + + if (mpi_rank == new_owner) { + hssize_t chunk_npoints; + + /* Make sure the new owner will know how many other processes will + * be sending chunk modification data to it + */ + chunk_entry.num_writers = num_writers; + + /* Set the full chunk overwrite status. It is assumed that this is a full + * overwrite of the chunk if the total IO size is equal to the size of the + * chunk. If the IO size is greater than the size of the chunk, there is an + * overlapping write between processes, meaning there is no guarantee on + * the integrity of data in the write operation. However, this still + * represents a full overwrite of the chunk. + */ + if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_entry.chunk_info.fspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + chunk_entry.full_overwrite = (total_io_size >= (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE; + + /* New owner takes possession of the chunk */ + shared_chunks_info_array[num_chunks_selected++] = chunk_entry; + } /* end if */ + else if (chunk_entry.chunk_info.fspace) { + unsigned char *mod_data_p = NULL; /* Use second pointer since H5S_encode advances pointer */ + hssize_t iter_nelmts; /* Number of points to iterate over for the send operation */ + size_t mod_data_size; + + /* Not the new owner of this chunk, encode the file space selection and + * modification data into a buffer and send it to the new chunk owner */ + + /* Determine size of serialized chunk memory dataspace plus the size + * of the data being written + */ + if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data, &mod_data_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size") + + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + mod_data_size += (size_t) iter_nelmts * type_info->src_type_size; + + if (NULL == (mod_data = (unsigned char *) H5MM_malloc(mod_data_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer") + + /* Serialize the chunk's file dataspace into the buffer */ + mod_data_p = mod_data; + if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data_p, &mod_data_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace") + + /* Initialize iterator for memory selection */ + if (H5S_select_iter_init(mem_iter, chunk_entry.chunk_info.mspace, type_info->src_type_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") + mem_iter_init = TRUE; + + /* Collect the modification data into the buffer */ + if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry.chunk_info.mspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p)) + HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer") + + /* Send modification data to new owner */ + H5_CHECK_OVERFLOW(mod_data_size, size_t, int) + H5_CHECK_OVERFLOW(chunk_entry.chunk_info.index, hsize_t, int) + if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, new_owner, + (int) chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code) + + if (mod_data) { + H5MM_free(mod_data); + mod_data = NULL; + } + if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + mem_iter_init = FALSE; + } /* end else */ + } /* end for */ + + /* Rank 0 redistributes any shared chunks to new owners as necessary */ + if (mpi_rank == 0) { + + } + + /* Release old list */ + if (local_info_array) + H5MM_free(local_info_array); + + /* Local info list becomes modified (redistributed) chunk list */ + local_info_array = shared_chunks_info_array; + + /* Now that the chunks have been redistributed, each process must send its modification data + * to the new owners of any of the chunks it previously possessed + */ + for (i = 0; i < num_chunks_selected; i++) { + if (mpi_rank != local_info_array[i].owner) { + + } + } /* end for */ + + /* Wait for all async send requests to complete before returning */ + if (num_send_requests) { + if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer") + + H5_CHECK_OVERFLOW(num_send_requests, size_t, int); + if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses))) + HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code) + } + } /* end if */ + + *chunk_list = local_info_array; + *num_entries = num_chunks_selected; + +done: + if (send_requests) + H5MM_free(send_requests); + if (send_statuses) + H5MM_free(send_statuses); + if (mod_data) + H5MM_free(mod_data); + if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + if (mem_iter) + H5MM_free(mem_iter); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__construct_filtered_io_info_list() */ + + +/*------------------------------------------------------------------------- + * Function: H5D__mpio_filtered_collective_write_type + * + * Purpose: Constructs a MPI derived datatype for both the memory and + * the file for a collective write of filtered chunks. The + * datatype contains the offsets in the file and the locations + * of the filtered chunk data buffers. + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Tuesday, November 22, 2016 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chunk_list, + size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_derived, + MPI_Datatype *new_file_type, hbool_t *file_type_derived) +{ + MPI_Aint *write_buf_array = NULL; /* Relative displacements of filtered chunk data buffers */ + MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */ + int *length_array = NULL; /* Filtered Chunk lengths */ + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(chunk_list); + HDassert(new_mem_type); + HDassert(mem_type_derived); + HDassert(new_file_type); + HDassert(file_type_derived); + + if (num_entries > 0) { + size_t i; + int mpi_code; + void *base_buf; + + H5_CHECK_OVERFLOW(num_entries, size_t, int); + + /* Allocate arrays */ + if (NULL == (length_array = (int *) H5MM_malloc((size_t) num_entries * sizeof(int)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write length array") + if (NULL == (write_buf_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for filtered collective write buf length array") + if (NULL == (file_offset_array = (MPI_Aint *) H5MM_malloc((size_t) num_entries * sizeof(MPI_Aint)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "memory allocation failed for collective write offset array") + + /* Ensure the list is sorted in ascending order of offset in the file */ + HDqsort(chunk_list, num_entries, sizeof(*chunk_list), H5D__cmp_filtered_collective_io_info_entry); + + base_buf = chunk_list[0].buf; + for (i = 0; i < num_entries; i++) { + /* Set up the offset in the file, the length of the chunk data, and the relative + * displacement of the chunk data write buffer + */ + file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset; + length_array[i] = (int) chunk_list[i].new_chunk.length; + write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf; + } /* end for */ + + /* Create memory MPI type */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, write_buf_array, MPI_BYTE, new_mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + *mem_type_derived = TRUE; + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + + /* Create file MPI type */ + if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, file_offset_array, MPI_BYTE, new_file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + *file_type_derived = TRUE; + if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + } /* end if */ + +done: + if (write_buf_array) + H5MM_free(write_buf_array); + if (file_offset_array) + H5MM_free(file_offset_array); + if (length_array) + H5MM_free(length_array); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__mpio_filtered_collective_write_type() */ + + +/*------------------------------------------------------------------------- + * Function: H5D__filtered_collective_chunk_entry_io + * + * Purpose: Given an entry for a filtered chunk, performs the necessary + * steps for updating the chunk data during a collective + * write, or for reading the chunk from file during a + * collective read. + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Wednesday, January 18, 2017 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry, + const H5D_io_info_t *io_info, const H5D_type_info_t *type_info) +{ + H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__scatter_mem/H5D__gather_mem */ + unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */ + unsigned filter_mask = 0; + hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ + hbool_t mem_iter_init = FALSE; + size_t buf_size; + size_t mod_data_alloced_bytes = 0; + H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */ + void *tmp_gath_buf = NULL; /* Temporary gather buffer for owner of the chunk to gather into from + application write buffer before scattering out to the chunk data buffer */ + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + HDassert(chunk_entry); + HDassert(io_info); + HDassert(type_info); + + /* If this is a read operation or a write operation where the chunk is not being fully + * overwritten, enough memory must be allocated to read the filtered chunk from the file. + * If this is a write operation where the chunk is being fully overwritten, enough memory + * must be allocated for the size of the unfiltered chunk. + */ + if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) { + buf_size = chunk_entry->old_chunk.length; + } + else { + hssize_t extent_npoints; + + if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + buf_size = (hsize_t) extent_npoints * type_info->src_type_size; + } + + chunk_entry->new_chunk.length = buf_size; + + if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer") + + /* If this is not a full chunk overwrite or this is a read operation, the chunk must be + * read from the file and unfiltered. + */ + if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) { + /* XXX: Test with MPI types and collective read to improve performance */ + if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->old_chunk.offset, + buf_size, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0) + HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "unable to read raw data chunk") + + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask, + io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, + (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0) + HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying") + } /* end if */ + + /* Initialize iterator for memory selection */ + if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") + + if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.mspace, type_info->src_type_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") + mem_iter_init = TRUE; + + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry->chunk_info.mspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + /* If this is a read operation, scatter the read chunk data to the user's buffer. + * + * If this is a write operation, update the chunk data buffer with the modifications + * from the current process, then apply any modifications from other processes. Finally, + * filter the newly-updated chunk. + */ + switch (io_info->op_type) { + case H5D_IO_OP_READ: + if (H5D__scatter_mem(chunk_entry->buf, chunk_entry->chunk_info.mspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, io_info->u.rbuf) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to read buffer") + break; + + case H5D_IO_OP_WRITE: + if (NULL == (tmp_gath_buf = H5MM_malloc((hsize_t) iter_nelmts * type_info->src_type_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate temporary gather buffer") + + /* Gather modification data from the application write buffer into a temporary buffer */ + if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry->chunk_info.mspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, tmp_gath_buf)) + HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer") + + if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + mem_iter_init = FALSE; + + /* Initialize iterator for file selection */ + if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.fspace, type_info->dst_type_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize file selection information") + mem_iter_init = TRUE; + + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry->chunk_info.fspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + /* Scatter the owner's modification data into the chunk data buffer according to + * the file space. + */ + if (H5D__scatter_mem(tmp_gath_buf, chunk_entry->chunk_info.fspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to chunk data buffer") + + if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + mem_iter_init = FALSE; + + /* Update the chunk data with any modifications from other processes */ + while (chunk_entry->num_writers > 1) { + const unsigned char *mod_data_p; /* Use second pointer since H5S_decode advances pointer */ + MPI_Status status; + int count; + int mpi_code; + + /* Probe for the incoming message from another process */ + H5_CHECK_OVERFLOW(chunk_entry->chunk_info.index, hsize_t, int) + if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->chunk_info.index, + io_info->comm, &status))) + HMPI_GOTO_ERROR(FAIL, "MPI_Probe failed", mpi_code) + + /* Retrieve the message size */ + if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count))) + HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code) + + if ((size_t) count > mod_data_alloced_bytes) { + if (NULL == (mod_data = (unsigned char *) H5MM_realloc(mod_data, (size_t) count))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate modification data receive buffer") + + mod_data_alloced_bytes = (size_t) count; + } + + if (MPI_SUCCESS != (mpi_code = MPI_Recv(mod_data, count, MPI_BYTE, MPI_ANY_SOURCE, + (int) chunk_entry->chunk_info.index, io_info->comm, &status))) + HMPI_GOTO_ERROR(FAIL, "MPI_Recv failed", mpi_code) + + /* Decode the process' chunk file dataspace */ + mod_data_p = mod_data; + if (NULL == (dataspace = H5S_decode(&mod_data_p))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace") + + if (H5S_select_iter_init(mem_iter, dataspace, type_info->dst_type_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") + mem_iter_init = TRUE; + + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(dataspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + /* Update the chunk data with the received modification data */ + if (H5D__scatter_mem(mod_data_p, dataspace, mem_iter, (size_t) iter_nelmts, + io_info->dxpl_cache, chunk_entry->buf) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't scatter to write buffer") + + chunk_entry->num_writers--; + + if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + mem_iter_init = FALSE; + if (dataspace) { + if (H5S_close(dataspace) < 0) + HGOTO_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace") + dataspace = NULL; + } + } /* end while */ + + /* Filter the chunk */ + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask, + io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, + (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0) + HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed") + +#if H5_SIZEOF_SIZE_T > 4 + /* Check for the chunk expanding too much to encode in a 32-bit value */ + if (chunk_entry->new_chunk.length > ((size_t) 0xffffffff)) + HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length") +#endif + + break; + default: + HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "invalid I/O operation") + } /* end switch */ + +done: + if (mod_data) + H5MM_free(mod_data); + if (tmp_gath_buf) + H5MM_free(tmp_gath_buf); + if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") + if (mem_iter) + H5MM_free(mem_iter); + if (dataspace) + if (H5S_close(dataspace) < 0) + HDONE_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace") + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__filtered_collective_chunk_entry_io() */ #endif /* H5_HAVE_PARALLEL */ |