author | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-19 17:54:07 (GMT)
committer | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-19 17:54:07 (GMT)
commit | 740e85a82dc01ce6ea62d86bef557e31012a0695 (patch)
tree | fa686af21e134a04bc8eec63e4879c29b534f689 /src/H5Dmpio.c
parent | 2c8bddb4ab08427baafd2d205e6157a163da1fd5 (diff)
Code refactoring
Modify the single chunk entry function to handle both the read and write cases
Store an array of MPI derived types in Multi-chunk IO so that all freeing
can be done at the end instead of during processing (a sketch of this pattern follows below)
Add read support (currently for Multi-chunk IO only)
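The second item above (collecting each chunk's MPI derived types into arrays and freeing them only after the processing loop) follows a common deferred-cleanup pattern. Below is a minimal, self-contained sketch of that pattern, not the actual H5Dmpio.c code; the names (write_chunks_deferred_free, n_chunks, chunk_len) are illustrative.

```c
#include <mpi.h>
#include <stdlib.h>

/* Build one derived MPI type per chunk, do the per-chunk work, and defer
 * all MPI_Type_free calls until after the loop. Illustrative names only. */
static int write_chunks_deferred_free(int n_chunks, const int *chunk_len)
{
    MPI_Datatype *file_types   = malloc((size_t)n_chunks * sizeof(*file_types));
    int          *type_derived = calloc((size_t)n_chunks, sizeof(*type_derived));
    int           i, ret = 0;

    if (!file_types || !type_derived) { ret = -1; goto done; }

    for (i = 0; i < n_chunks; i++) {
        /* One contiguous byte type per chunk; remember that it needs freeing */
        if (MPI_Type_contiguous(chunk_len[i], MPI_BYTE, &file_types[i]) != MPI_SUCCESS ||
            MPI_Type_commit(&file_types[i]) != MPI_SUCCESS) {
            ret = -1;
            goto done;
        }
        type_derived[i] = 1;

        /* ... per-chunk collective I/O using file_types[i] would happen here ... */
    }

done:
    /* All freeing happens in one place, after the processing loop */
    for (i = 0; i < n_chunks; i++)
        if (type_derived[i])
            MPI_Type_free(&file_types[i]);
    free(file_types);
    free(type_derived);
    return ret;
}
```

Keeping the frees out of the per-chunk loop means every process runs the same sequence of collective calls inside the loop, with cleanup concentrated at a single point.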
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r-- | src/H5Dmpio.c | 354
1 file changed, 197 insertions, 157 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index b223203..6d97cb1 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -158,9 +158,8 @@ static herr_t H5D__mpio_filtered_collective_write_type( H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_derived, MPI_Datatype *new_file_type, hbool_t *file_type_derived); -static herr_t H5D__update_filtered_collective_chunk_entry( - H5D_filtered_collective_io_info_t *chunk_entry, const H5D_io_info_t *io_info, - const H5D_type_info_t *type_info); +static herr_t H5D__filtered_collective_chunk_io(H5D_filtered_collective_io_info_t *chunk_entry, + const H5D_io_info_t *io_info, const H5D_type_info_t *type_info); static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2); static int H5D__cmp_filtered_collective_io_entry(const void *filtered_collective_io_entry1, const void *filtered_collective_io_entry2); @@ -390,10 +389,10 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array, HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code) /* Multiply each receive count by the size of the array entry, - * since the data is sent in a count of bytes */ - /* XXX: Check to make sure array_entry_size doesn't overflow an int */ + * since the data is sent in a count of bytes + */ for (i = 0; i < (size_t) mpi_size; i++) - receive_counts_array[i] *= array_entry_size; + H5_CHECKED_ASSIGN(receive_counts_array[i], int, (size_t) receive_counts_array[i] * array_entry_size, size_t); /* Set receive buffer offsets for MPI_Allgatherv */ displacements_array[0] = 0; @@ -572,9 +571,6 @@ H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, const H5D_chunk_map_t *fm, H5_CHECKED_ASSIGN(num_chunkf, int, ori_num_chunkf, size_t); /* Determine the summation of number of chunks for all processes */ - /* XXX: In the case that more than one process is writing to the - * same chunk, the summation of H5SL_count calls is incorrect. 
- JTH - */ if(MPI_SUCCESS != (mpi_code = MPI_Allreduce(&num_chunkf, sum_chunkf, 1, MPI_INT, MPI_SUM, io_info->comm))) HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) @@ -1305,18 +1301,18 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist) { H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ - H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; - H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK; - H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; - H5D_storage_t ctg_store; + H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; /* The list of chunks used during collective operations */ + H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK; /* The actual chunk IO optimization mode */ + H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* The chunk IO mode used (Independent vs Collective) */ + H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ MPI_Datatype mem_type = MPI_BYTE; MPI_Datatype file_type = MPI_BYTE; hbool_t mem_type_is_derived = FALSE; hbool_t file_type_is_derived = FALSE; size_t chunk_list_num_entries; size_t collective_chunk_list_num_entries; - size_t *num_chunks_selected_array = NULL; - size_t i; + size_t *num_chunks_selected_array = NULL; /* Array of number of chunks selected on each process */ + size_t i; /* Local index variable */ int mpi_rank, mpi_size, mpi_code; herr_t ret_value = SUCCEED; @@ -1333,7 +1329,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property") /* Set the actual-io-mode property. - * Link chunk I/O does not break to independent, so can set right away */ + * Link chunk filtered I/O does not break to independent, so can set right away + */ if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0) HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property") @@ -1377,18 +1374,20 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in /* Iterate through all the chunks in the collective write operation, * updating each chunk with the data modifications from other processes, - * then re-filtering the chunk. */ + * then re-filtering the chunk. + */ /* XXX: Not sure about minor error code */ for (i = 0; i < chunk_list_num_entries; i++) - if (H5D__update_filtered_collective_chunk_entry(&chunk_list[i], io_info, type_info) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't update filtered chunk entry") + if (H5D__filtered_collective_chunk_io(&chunk_list[i], io_info, type_info) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't process chunk entry") #ifdef PARALLEL_COMPRESS_DEBUG HDfprintf(debug_file, "---------------------------------------------------\n\n"); #endif /* Gather the new chunk sizes to all processes for a collective reallocation - * of the chunks in the file */ + * of the chunks in the file. 
+ */ /* XXX: change minor error code */ if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list), (void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0) @@ -1422,7 +1421,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in /* If this process has any chunks selected, create a MPI type for collectively * writing out the chunks to file. Otherwise, the process contributes to the - * collective write with a none type. */ + * collective write with a none type. + */ if (chunk_list_num_entries) { size_t offset; @@ -1436,13 +1436,10 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in * This works correctly because the array gather function guarantees that the chunk * data in the total array is ordered in blocks by rank. */ - /* XXX: No need to use bytes here, should be able to simply find offset in - * terms of H5D_filtered_collective_io_info_t's */ - offset = 0; - for (i = 0; i < (size_t) mpi_rank; i++) - offset += num_chunks_selected_array[i] * sizeof(H5D_filtered_collective_io_info_t); + for (i = 0, offset = 0; i < (size_t) mpi_rank; i++) + offset += num_chunks_selected_array[i]; - HDmemcpy(chunk_list, &((char *) collective_chunk_list)[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t)); + HDmemcpy(chunk_list, &collective_chunk_list[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t)); /* Create single MPI type encompassing each selection in the dataspace */ /* XXX: change minor error code */ @@ -1451,7 +1448,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type") /* Override the write buffer to point to the address of the first - * chunk data buffer */ + * chunk data buffer + */ io_info->u.wbuf = chunk_list[0].buf; } /* end if */ @@ -1467,7 +1465,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") /* Participate in the collective re-insertion of all chunks modified - * in this iteration into the chunk index */ + * in this iteration into the chunk index + */ #ifdef PARALLEL_COMPRESS_DEBUG HDfprintf(debug_file, "Reinserting chunks into chunk index.\n"); HDfprintf(debug_file, "---------------------------------------\n"); @@ -1747,7 +1746,9 @@ done: * chunk will take ownership of the chunk (the first * process seen that is writing the most data becomes * the new owner in the case of ties) - * 2. If the operation is a write operation + * 2. If the operation is a read operation + * A. + * 3. If the operation is a write operation * A. Loop through each chunk in the operation * I. If this is not a full overwrite of the chunk * a) Read the chunk from file and pass the chunk @@ -1773,8 +1774,6 @@ done: * V. All processes collectively re-insert each * chunk from the gathered array into the chunk * index - * 3. If the operation is a read operation - * I. 
* * Return: Non-negative on success/Negative on failure * @@ -1788,19 +1787,19 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist) { H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */ - H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; - H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_MULTI_CHUNK; /* actual chunk optimization mode */ - H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* Local variable for tracking the I/O mode used. */ + H5D_filtered_collective_io_info_t *collective_chunk_list = NULL; /* The list of chunks used during collective operations */ + H5D_mpio_actual_chunk_opt_mode_t actual_chunk_opt_mode = H5D_MPIO_MULTI_CHUNK; /* The actual chunk IO optimization mode */ + H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* The chunk IO mode used (Independent vs Collective) */ H5D_storage_t store; /* union of EFL and chunk pointer in file space */ H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */ H5D_storage_t ctg_store; /* Chunk storage information as contiguous dataset */ - MPI_Datatype file_type = MPI_BYTE; - MPI_Datatype mem_type = MPI_BYTE; - hbool_t file_type_is_derived = FALSE; - hbool_t mem_type_is_derived = FALSE; + MPI_Datatype *file_type_array = NULL; + MPI_Datatype *mem_type_array = NULL; + hbool_t *file_type_is_derived_array = NULL; + hbool_t *mem_type_is_derived_array = NULL; size_t chunk_list_num_entries; size_t collective_chunk_list_num_entries; - size_t *num_chunks_selected_array = NULL; + size_t *num_chunks_selected_array = NULL; /* Array of number of chunks selected on each process */ size_t i, j; /* Local index variable */ int mpi_rank, mpi_size, mpi_code; herr_t ret_value = SUCCEED; @@ -1817,8 +1816,9 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0) HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property") - /* Set the actual_io_mode property. Filtered collective writes can't break - * to independent, so set actual_io_mode right away */ + /* Set the actual_io_mode property. 
+ * Multi chunk I/O does not break to independent, so can set right away + */ if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0) HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk io mode property") @@ -1839,7 +1839,13 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i /* Set dataset storage for I/O info */ io_info->store = &store; - if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */ + if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */ + /* XXX: Change minor error code */ + for (i = 0; i < chunk_list_num_entries; i++) + if (H5D__filtered_collective_chunk_io(&chunk_list[i], io_info, type_info) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't process chunk entry") + } /* end if */ + else { /* Filtered collective write */ H5D_chk_idx_info_t index_info; H5D_chunk_ud_t udata; size_t max_num_chunks; @@ -1870,24 +1876,42 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i HDfprintf(debug_file, "---------------------------------------------------\n"); #endif + /* Retrieve the maximum number of chunks being written among all processes */ if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list_num_entries, &max_num_chunks, 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm))) HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code) - /* XXX: Iteration should be for the max number among processes, since a process could - * have no chunks assigned to it */ + /* Allocate arrays for storing MPI file and mem types and whether or not the + * types were derived. + */ + if (NULL == (file_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(*file_type_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type array") + + if (NULL == (file_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(*file_type_is_derived_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate file type is derived array") + + if (NULL == (mem_type_array = (MPI_Datatype *) H5MM_malloc(max_num_chunks * sizeof(*mem_type_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type array") + + if (NULL == (mem_type_is_derived_array = (hbool_t *) H5MM_calloc(max_num_chunks * sizeof(*mem_type_is_derived_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate mem type is derived array") + + /* Iterate over the max number of chunks among all processes, as this process could + * have no chunks left to work on, but it still needs to participate in the collective + * re-allocation and re-insertion of chunks modified by other processes. + */ for (i = 0; i < max_num_chunks; i++) { /* Check if this process has a chunk to work on for this iteration */ hbool_t have_chunk_to_process = i < chunk_list_num_entries; /* XXX: Not sure about minor error code */ if (have_chunk_to_process) - if (H5D__update_filtered_collective_chunk_entry(&chunk_list[i], io_info, type_info) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't update filtered chunk entry") + if (H5D__filtered_collective_chunk_io(&chunk_list[i], io_info, type_info) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't process chunk entry") - /* Participate in the collective re-allocation of all chunks modified - * in this iteration. Gather the new chunk sizes to all processes for - * the collective re-allocation. 
*/ + /* Gather the new chunk sizes to all processes for a collective re-allocation + * of the chunks in the file + */ /* XXX: May access unavailable memory on processes with no selection */ /* XXX: change minor error code */ if (H5D__mpio_array_gather(io_info, &chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(*chunk_list), @@ -1899,16 +1923,17 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i HDfprintf(debug_file, "------------------------------\n"); #endif - /* Collectively re-allocate the modified chunks (from each process) in the file */ + /* Participate in the collective re-allocation of all chunks modified + * in this iteration. + */ for (j = 0; j < collective_chunk_list_num_entries; j++) { hbool_t insert = FALSE; #ifdef PARALLEL_COMPRESS_DEBUG HDfprintf(debug_file, "| Re-allocing chunk at address %a with new length of %llu bytes.\n", - collective_chunk_list[j].new_chunk.offset, collective_chunk_list[j].new_chunk.length); + collective_chunk_list[j].new_chunk.offset, collective_chunk_list[j].new_chunk.length); #endif - /* Collectively re-allocate the chunk in the file */ if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].old_chunk, &collective_chunk_list[j].new_chunk, &insert, chunk_list[j].chunk_info.scaled) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") @@ -1923,7 +1948,8 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i #endif /* If this process has a chunk to work on, create a MPI type for the - * memory and file for writing out the chunk */ + * memory and file for writing out the chunk + */ if (have_chunk_to_process) { int mpi_type_count; @@ -1938,35 +1964,37 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].new_chunk.length, hsize_t); /* Create MPI memory type for writing to chunk */ - if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type))) + if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type_array[i]))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) - if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type))) + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type_array[i]))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - mem_type_is_derived = TRUE; + mem_type_is_derived_array[i] = TRUE; /* Create MPI file type for writing to chunk */ - if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &file_type))) + if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &file_type_array[i]))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) - if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type))) + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type_array[i]))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) - file_type_is_derived = TRUE; + file_type_is_derived_array[i] = TRUE; /* Set up the base storage address for this operation */ ctg_store.contig.dset_addr = chunk_list[i].new_chunk.offset; /* Override the write buffer to point to the address of the - * chunk data buffer */ + * chunk data buffer + */ ctg_io_info.u.wbuf = chunk_list[i].buf; } /* end if */ - mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? 1 : 0; + mpi_buf_count = (mem_type_is_derived_array[i] && file_type_is_derived_array[i]) ? 
1 : 0; /* Perform the I/O */ - if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0) + if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type_array[i], &mem_type_array[i]) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO") /* Participate in the collective re-insertion of all chunks modified - * in this iteration into the chunk index */ + * in this iteration into the chunk index + */ #ifdef PARALLEL_COMPRESS_DEBUG HDfprintf(debug_file, "Reinserting chunks into chunk index.\n"); HDfprintf(debug_file, "---------------------------------------\n"); @@ -1988,61 +2016,23 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i HDfprintf(debug_file, "---------------------------------------\n"); #endif - /* Free the MPI memory and file type, if they were derived */ - /* XXX: For performance, collect each type into an array and free at end */ - if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - - mem_type_is_derived = FALSE; - file_type_is_derived = FALSE; - if (collective_chunk_list) collective_chunk_list = (H5D_filtered_collective_io_info_t *) H5MM_free(collective_chunk_list); } /* end for */ - } /* end if */ - else { /* Filtered collective read */ - unsigned filter_mask = 0; - size_t buf_size; - - for (i = 0; i < chunk_list_num_entries; i++) { - buf_size = chunk_list[i].old_chunk.length; - -#ifdef PARALLEL_COMPRESS_DEBUG - HDfprintf(debug_file, "Allocing %zd bytes for chunk read buffer.\n", buf_size); -#endif - - if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk read buffer") - - /* Read the chunk from the file */ - if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset, - buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read raw data chunk") - /* Unfilter the chunk before modifying it */ - if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask, - io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, - (size_t *) &chunk_list[i].old_chunk.length, &buf_size, &chunk_list[i].buf) < 0) - HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying") - -#ifdef PARALLEL_COMPRESS_DEBUG - for (size_t k = 0; k < chunk_list[i].old_chunk.length / type_info->src_type_size; k++) - HDfprintf(debug_file, "Read buf entry %d is %lld.\n", k, ((long *) chunk_list[i].buf)[k]); -#endif - - /* Scatter the unfiltered chunk data into the user's read buffer */ + /* Free the MPI file and memory types, if they were derived */ + for (i = 0; i < max_num_chunks; i++) { + if (file_type_is_derived_array[i]) + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type_array[i]))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if (mem_type_is_derived_array[i]) + if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type_array[i]))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) } /* end for */ } /* end else */ done: - if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - if (file_type_is_derived && MPI_SUCCESS != (mpi_code = 
MPI_Type_free(&file_type))) - HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) - if (chunk_list) { for (i = 0; i < chunk_list_num_entries; i++) if (chunk_list[i].buf) @@ -2051,10 +2041,18 @@ done: H5MM_free(chunk_list); } - if (num_chunks_selected_array) - H5MM_free(num_chunks_selected_array); if (collective_chunk_list) H5MM_free(collective_chunk_list); + if (file_type_array) + H5MM_free(file_type_array); + if (mem_type_array) + H5MM_free(mem_type_array); + if (file_type_is_derived_array) + H5MM_free(file_type_is_derived_array); + if (mem_type_is_derived_array) + H5MM_free(mem_type_is_derived_array); + if (num_chunks_selected_array) + H5MM_free(num_chunks_selected_array); FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__multi_chunk_filtered_collective_io() */ @@ -2752,17 +2750,20 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ int new_owner = 0; /* Process duplicate entries caused by another process writing - * to the same chunk */ + * to the same chunk + */ do { /* Store the correct chunk entry information in case this process * becomes the new chunk's owner. The chunk entry that this process * contributed will be the only one with a valid dataspace selection - * on this particular process */ + * on this particular process + */ if (mpi_rank == overlap_info_array[i].owner) chunk_entry = overlap_info_array[i]; /* New owner of the chunk is determined by the process - * which is writing the most data to the chunk */ + * which is writing the most data to the chunk + */ if (overlap_info_array[i].io_size > max_bytes) { max_bytes = overlap_info_array[i].io_size; new_owner = overlap_info_array[i].owner; @@ -2776,7 +2777,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ if (mpi_rank == new_owner) { /* Make sure the new owner will know how many other processes will - * be sending chunk modification data to it */ + * be sending chunk modification data to it + */ chunk_entry.num_writers = num_writers; /* New owner takes possession of the chunk */ @@ -2844,7 +2846,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses)))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer") - if (MPI_SUCCESS != (mpi_code = MPI_Waitall(num_send_requests, send_requests, send_statuses))) + H5_CHECK_OVERFLOW(num_send_requests, size_t, int); + if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses))) HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code) } @@ -2964,12 +2967,24 @@ done: } /* end H5D__mpio_filtered_collective_write_type() */ +/*------------------------------------------------------------------------- + * Function: H5D__filtered_collective_chunk_io + * + * Purpose: XXX: description + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Jordan Henderson + * Wednesday, January 18, 2017 + * + *------------------------------------------------------------------------- + */ static herr_t -H5D__update_filtered_collective_chunk_entry(H5D_filtered_collective_io_info_t *chunk_entry, +H5D__filtered_collective_chunk_io(H5D_filtered_collective_io_info_t *chunk_entry, const H5D_io_info_t *io_info, const H5D_type_info_t *type_info) { H5S_sel_iter_t *mem_iter = NULL; - unsigned filter_mask; + unsigned filter_mask = 0; hssize_t iter_nelmts; hbool_t full_overwrite = TRUE; hbool_t mem_iter_init = FALSE; @@ -2979,37 +2994,44 @@ 
H5D__update_filtered_collective_chunk_entry(H5D_filtered_collective_io_info_t *c FUNC_ENTER_STATIC HDassert(chunk_entry); + HDassert(io_info); + HDassert(type_info); #ifdef PARALLEL_COMPRESS_DEBUG HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_entry->old_chunk.offset); #endif /* XXX: Determine if a chunk is being fully overwritten by looking at the total selection - * in the dataspace */ + * in the dataspace + */ - /* If this is a full overwrite of this chunk, enough memory must be allocated for - * the size of the unfiltered chunk. Otherwise, enough memory must be allocated - * to read the filtered chunk from the file. */ + /* If this is a read operation or a write operation where the chunk is not being fully + * overwritten, enough memory must be allocated to read the filtered chunk from the file. + * If this is a write operation where the chunk is being fully overwritten, enough memory + * must be allocated for the size of the unfiltered chunk. + */ /* XXX: Return value of macro should be checked instead */ - buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace) * type_info->src_type_size - : chunk_entry->old_chunk.length; + buf_size = (!full_overwrite || io_info->op_type == H5D_IO_OP_READ) ? chunk_entry->old_chunk.length + : (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace) * type_info->src_type_size; chunk_entry->new_chunk.length = buf_size; #ifdef PARALLEL_COMPRESS_DEBUG - HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size, - full_overwrite ? "full" : "non-full"); + HDfprintf(debug_file, "| - Allocing %zd bytes for chunk data buffer.\n", buf_size); + if (io_info->op_type == H5D_IO_OP_WRITE) + HDfprintf(debug_file, "| - Write type is: %s.\n", (full_overwrite) ? "overwrite" : "non-overwrite"); #endif if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer") + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer") - if (!full_overwrite) { - /* Read the chunk from the file */ + /* If this is not a full chunk overwrite or this is a read operation, the chunk must be + * read from the file and unfiltered. 
+ */ + if (!full_overwrite || io_info->op_type == H5D_IO_OP_READ) { if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->old_chunk.offset, buf_size, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0) HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk") - /* Unfilter the chunk before modifying it */ if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask, io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0) @@ -3029,12 +3051,11 @@ H5D__update_filtered_collective_chunk_entry(H5D_filtered_collective_io_info_t *c /* Owner of this chunk, receive modification data from other processes */ - /* Update the chunk data with the modifications from the current (owning) process */ - if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter)))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") /* Initialize iterator for memory selection */ + /* XXX: dst_type_size may need to be src_type_size depending on operation */ if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.mspace, type_info->dst_type_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") mem_iter_init = TRUE; @@ -3042,40 +3063,59 @@ H5D__update_filtered_collective_chunk_entry(H5D_filtered_collective_io_info_t *c if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry->chunk_info.mspace)) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") - if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry->chunk_info.mspace, mem_iter, - (size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf)) - HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer") + /* If this is a read operation, scatter the read chunk data to the user's buffer. + * + * If this is a write operation, update the chunk data buffer with the modifications + * from the current process, then apply any modifications from other processes. Finally, + * filter the newly-update chunk. 
+ */ + switch (io_info->op_type) { + case H5D_IO_OP_READ: + if (H5D__scatter_mem(chunk_entry->buf, chunk_entry->chunk_info.mspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, io_info->u.rbuf) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to read buffer") + break; - /* Update the chunk data with any modifications from other processes */ + case H5D_IO_OP_WRITE: + /* Update the chunk data with the modifications from the current (owning) process */ + if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry->chunk_info.mspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf)) + HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer") + /* Update the chunk data with any modifications from other processes */ -#ifdef PARALLEL_COMPRESS_DEBUG - HDfprintf(debug_file, "| - Write Buffer:\n"); - HDfprintf(debug_file, "| - ["); - for (size_t j = 0; j < chunk_entry->new_chunk.length / type_info->src_type_size; j++) { - if (j > 0) HDfprintf(debug_file, ", "); - HDfprintf(debug_file, "%lld", ((long *) chunk_entry->buf)[j]); - } - HDfprintf(debug_file, "]\n|\n"); - HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_entry->buf); - HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_entry->new_chunk.length, buf_size); +#ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| - Chunk Data Buffer:\n"); + HDfprintf(debug_file, "| - ["); + for (size_t j = 0; j < chunk_entry->new_chunk.length / type_info->src_type_size; j++) { + if (j > 0) HDfprintf(debug_file, ", "); + HDfprintf(debug_file, "%lld", ((long *) chunk_entry->buf)[j]); + } + HDfprintf(debug_file, "]\n|\n"); + + HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_entry->new_chunk.length, buf_size); #endif - /* Filter the chunk */ - filter_mask = 0; - if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask, - io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, - (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0) - HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed") + /* Filter the chunk */ + filter_mask = 0; + if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask, + io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, + (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0) + HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed") #if H5_SIZEOF_SIZE_T > 4 - /* Check for the chunk expanding too much to encode in a 32-bit value */ - if (chunk_entry->new_chunk.length > ((size_t) 0xffffffff)) - HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length") + /* Check for the chunk expanding too much to encode in a 32-bit value */ + if (chunk_entry->new_chunk.length > ((size_t) 0xffffffff)) + HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length") #endif + break; + default: + HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "unknown I/O operation") + } + done: if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") @@ -3083,6 +3123,6 @@ done: H5MM_free(mem_iter) FUNC_LEAVE_NOAPI(ret_value) -} /* end H5D__update_filtered_collective_chunk_entry() */ +} /* end H5D__filtered_collective_chunk_io() */ #endif /* H5_HAVE_PARALLEL */ |
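The refactored H5D__filtered_collective_chunk_io() routine at the end of the diff branches on io_info->op_type so that one function covers both directions of I/O. The sketch below mirrors that control flow in a simplified, self-contained form: the memset/memcpy calls stand in for the real read-and-unfilter, scatter/gather, and re-filter steps (H5F_block_read, H5Z_pipeline, H5D__scatter_mem, H5D__gather_mem), and all names here are illustrative rather than the library's own.

```c
#include <string.h>

typedef enum { OP_READ, OP_WRITE } io_op_t;   /* stand-in for H5D_IO_OP_* */

/* Process a single chunk for either a read or a write, mirroring the
 * branch structure of the refactored routine. Illustrative only. */
static int process_chunk(io_op_t op, int full_overwrite,
                         unsigned char *chunk_buf, size_t chunk_size,
                         unsigned char *user_buf)
{
    /* A read, or a write that does not fully overwrite the chunk, must first
     * bring the existing chunk contents into chunk_buf (read + unfilter). */
    if (op == OP_READ || !full_overwrite)
        memset(chunk_buf, 0, chunk_size);     /* stand-in for read + unfilter */

    if (op == OP_READ) {
        /* stand-in for scattering the unfiltered chunk into the user's buffer */
        memcpy(user_buf, chunk_buf, chunk_size);
    }
    else {
        /* stand-in for gathering the user's modifications into the chunk,
         * then re-filtering it before the collective write */
        memcpy(chunk_buf, user_buf, chunk_size);
    }

    return 0;
}
```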