author    | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-18 22:48:27 (GMT)
committer | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-18 22:48:27 (GMT)
commit    | 4a83ceaf7d7ec17779589864645d39436c0f5227 (patch)
tree      | 50f4a228765c3221c484e5a3fd5a31af7c4ed7cc /src/H5Dmpio.c
parent    | 349b3634f73954eb17fd8f2ee730e43606959467 (diff)
Major cleanup
Separate the update phase for a chunk entry into its own function, since
the code for multi-chunk IO and link-chunk IO is exactly the same
Remove the last-IO-mode code from multi-chunk IO, since filtered
collective writes cannot break to independent IO mode
Fix collective overlapping IO for multi-chunk IO by iterating up to the
max number of chunks on any process, so that every process stays in each
collective call (a sketch of this pattern follows this message)
Make a hard separation between collective read and write, since trying to
mix the two in one loop becomes messy
Add preliminary code for asynchronously sending chunk modification data
to the new owning process when redistributing chunks (a sketch of the
send/wait pattern also follows)
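
The multi-chunk fix rests on a standard MPI idiom: agree on the largest
per-rank chunk count with an MPI_MAX reduction, then have every rank enter
each collective call on every iteration, contributing nothing once it has
run out of chunks. A minimal standalone sketch of that pattern (the chunk
assignment and the inner MPI_Allreduce are illustrative stand-ins, not
HDF5 code):

    /* Sketch: iterate to the max per-rank chunk count so that every rank
     * participates in each collective call; ranks that have run out of
     * chunks contribute a zero count instead of skipping the call. */
    #include <mpi.h>
    #include <stdio.h>

    int main(int argc, char **argv)
    {
        unsigned long long n_local_chunks, max_chunks, i;
        int                mpi_rank, mpi_size;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
        MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

        /* Stand-in chunk assignment: lower ranks own more chunks */
        n_local_chunks = (unsigned long long) (mpi_size - mpi_rank);

        /* Agree on the largest per-rank chunk count */
        MPI_Allreduce(&n_local_chunks, &max_chunks, 1, MPI_UNSIGNED_LONG_LONG,
                      MPI_MAX, MPI_COMM_WORLD);

        for (i = 0; i < max_chunks; i++) {
            int have_chunk = (i < n_local_chunks) ? 1 : 0;
            int total = 0;

            /* Stand-in for the collective re-allocation/write step: a rank
             * with no chunk left still makes the call, with an empty
             * contribution, so the collective cannot deadlock. */
            MPI_Allreduce(&have_chunk, &total, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

            if (mpi_rank == 0)
                printf("iteration %llu: %d ranks contributed a chunk\n", i, total);
        }

        MPI_Finalize();
        return 0;
    }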
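
The asynchronous-send work is only stubbed out in this commit (the
MPI_Isend in H5D__construct_filtered_io_info_list is still commented
out). Below is a generic sketch of the post-now, wait-later pattern the
new send_requests/MPI_Waitall code is heading toward; the rank-0-as-new-
owner setup, chunk-id tags, and buffer names are assumptions for
illustration, not the commit's actual protocol:

    /* Sketch: post one MPI_Isend per re-assigned chunk, keep working,
     * then MPI_Waitall before the send buffers are reused or freed. */
    #include <mpi.h>
    #include <stdlib.h>
    #include <string.h>

    #define NUM_CHUNKS  4
    #define CHUNK_BYTES 1024

    int main(int argc, char **argv)
    {
        char        *bufs[NUM_CHUNKS];
        MPI_Request  reqs[NUM_CHUNKS];
        MPI_Status   stats[NUM_CHUNKS];
        int          mpi_rank, mpi_size, n_sends = 0;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
        MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

        if (mpi_size > 1 && mpi_rank > 0) {
            /* Losing rank: send each chunk's modification data to the new
             * owner (rank 0 here) without blocking; the chunk index is
             * used as the message tag. */
            for (int i = 0; i < NUM_CHUNKS; i++) {
                bufs[i] = malloc(CHUNK_BYTES);
                memset(bufs[i], mpi_rank, CHUNK_BYTES);
                MPI_Isend(bufs[i], CHUNK_BYTES, MPI_BYTE, 0 /* new owner */,
                          i /* tag: chunk id */, MPI_COMM_WORLD, &reqs[n_sends++]);
            }

            /* ... other work can overlap the sends here ... */

            /* Complete all outstanding sends before freeing the buffers */
            MPI_Waitall(n_sends, reqs, stats);
            for (int i = 0; i < NUM_CHUNKS; i++)
                free(bufs[i]);
        }
        else if (mpi_rank == 0 && mpi_size > 1) {
            /* New owner: receive one message per chunk per sender */
            char recv_buf[CHUNK_BYTES];
            for (int src = 1; src < mpi_size; src++)
                for (int i = 0; i < NUM_CHUNKS; i++)
                    MPI_Recv(recv_buf, CHUNK_BYTES, MPI_BYTE, src, i,
                             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }

        MPI_Finalize();
        return 0;
    }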
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r-- | src/H5Dmpio.c | 778
1 file changed, 303 insertions(+), 475 deletions(-)
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 49f25ae..aa95a3a 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -158,6 +158,9 @@ static herr_t H5D__mpio_filtered_collective_write_type(
     H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries,
     MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
     MPI_Datatype *new_file_type, hbool_t *file_type_derived);
+static herr_t H5D__update_filtered_collective_chunk_entry(
+    H5D_filtered_collective_io_info_t *chunk_entry, H5D_io_info_t *io_info,
+    H5D_type_info_t *type_info);
 static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2);
 static int H5D__cmp_filtered_collective_io_entry(const void *filtered_collective_io_entry1,
     const void *filtered_collective_io_entry2);
@@ -1251,6 +1254,8 @@ if(H5DEBUG(D))
  * Purpose:     Routine for one collective IO with one MPI derived datatype
  *              to link with all filtered chunks
  *
+ *              XXX: Update later to reflect changes in structure
+ *
  *              1. Construct a list of selected chunks in the collective IO
  *                 operation
  *                 A. If any chunk is being written to by more than 1
@@ -1297,22 +1302,17 @@ static herr_t
 H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
     H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist)
 {
-    H5D_filtered_collective_io_info_t *chunk_list = NULL;
-    H5D_filtered_collective_io_info_t *total_array = NULL;
+    H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */
+    H5D_filtered_collective_io_info_t *collective_chunk_list = NULL;
     H5D_mpio_actual_chunk_opt_mode_t   actual_chunk_opt_mode = H5D_MPIO_LINK_CHUNK;
     H5D_mpio_actual_io_mode_t          actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE;
-    H5D_chk_idx_info_t                 index_info;
-    H5S_sel_iter_t                    *mem_iter = NULL;
     H5D_storage_t                      ctg_store;
     MPI_Datatype                       mem_type = MPI_BYTE;
     MPI_Datatype                       file_type = MPI_BYTE;
     hbool_t                            mem_type_is_derived = FALSE;
     hbool_t                            file_type_is_derived = FALSE;
-    hbool_t                            mem_iter_init = FALSE;
-    hsize_t                            mpi_buf_count; /* Number of MPI types */
-    haddr_t                           *total_chunk_addr_array = NULL;
     size_t                             chunk_list_num_entries;
-    size_t                             total_array_num_entries;
+    size_t                             collective_chunk_list_num_entries;
     size_t                            *num_chunks_selected_array = NULL;
     size_t                             i;
     int                                mpi_rank, mpi_size, mpi_code;
@@ -1335,13 +1335,6 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
     if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0)
         HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property")
 
-    /* Construct chunked index info */
-    index_info.f = io_info->dset->oloc.file;
-    index_info.dxpl_id = io_info->md_dxpl_id;
-    index_info.pline = &(io_info->dset->shared->dcpl_cache.pline);
-    index_info.layout = &(io_info->dset->shared->layout.u.chunk);
-    index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk);
-
     /* Build a list of selected chunks in the collective io operation */
     /* XXX: Not sure about correct minor error code */
     if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list,
             &chunk_list_num_entries, &num_chunks_selected_array) < 0)
@@ -1364,10 +1357,24 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
     HDfprintf(debug_file, "-----------------------------------------\n\n");
 #endif
 
-    if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
-        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
-
     if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */
+        H5D_chk_idx_info_t index_info;
+        H5D_chunk_ud_t     udata;
+        hsize_t            mpi_buf_count;
+
+        /* Construct chunked index info */
+        index_info.f = io_info->dset->oloc.file;
+        index_info.dxpl_id = io_info->md_dxpl_id;
+        index_info.pline = &(io_info->dset->shared->dcpl_cache.pline);
+        index_info.layout = &(io_info->dset->shared->layout.u.chunk);
+        index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk);
+
+        /* Set up chunk information for insertion to chunk index */
+        udata.common.layout = index_info.layout;
+        udata.common.storage = index_info.storage;
+        udata.filter_mask = 0;
+
+
 #ifdef PARALLEL_COMPRESS_DEBUG
         HDfprintf(debug_file, "Processing chunks:\n");
         HDfprintf(debug_file, "---------------------------------------------------\n");
@@ -1376,107 +1383,10 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
         /* Iterate through all the chunks in the collective write operation,
          * updating each chunk with the data modifications from other processes,
          * then re-filtering the chunk. */
-        for (i = 0; i < chunk_list_num_entries; i++) {
-            unsigned filter_mask = 0;
-            hbool_t  full_overwrite = TRUE;
-            size_t   buf_size;
-            hssize_t iter_nelmts;
-
-#ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_list[i].old_chunk.offset);
-#endif
-
-            /* XXX: Determine if a chunk is being fully overwritten by looking at the total selection
-             * in the dataspace */
-
-            /* If this is a full overwrite of this chunk, enough memory must be allocated for
-             * the size of the unfiltered chunk. Otherwise, enough memory must be allocated
-             * to read the filtered chunk from the file. */
-            /* XXX: Return value of macro should be checked instead */
-            buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_list[i].chunk_info.fspace) * type_info->src_type_size
-                                        : chunk_list[i].old_chunk.length;
-            chunk_list[i].new_chunk.length = buf_size;
-
-#ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size,
-                    full_overwrite ? "full" : "non-full");
-#endif
-
-            if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
-
-            /* Owner of this chunk, receive modification data from other processes */
-
-            if (!full_overwrite) {
-                /* Read the chunk from the file */
-                if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset,
-                        buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0)
-                    HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk")
-
-                /* Unfilter the chunk before modifying it */
-                if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
-                        io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
-                        (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
-                    HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
-
-#ifdef PARALLEL_COMPRESS_DEBUG
-                HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_list[i].new_chunk.length, buf_size);
-
-                HDfprintf(debug_file, "| - Read buf:\n| - [");
-                for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) {
-                    if (j > 0) HDfprintf(debug_file, ", ");
-                    HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]);
-                }
-                HDfprintf(debug_file, "]\n|\n");
-#endif
-            } /* end if */
-
-            /* Update the chunk data with the modifications from the current (owning) process */
-
-            /* Initialize iterator for memory selection */
-            if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
-            mem_iter_init = TRUE;
-
-            if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0)
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
-
-            if (!H5D__gather_mem(io_info->u.wbuf, chunk_list[i].chunk_info.mspace, mem_iter,
-                    (size_t) iter_nelmts, io_info->dxpl_cache, chunk_list[i].buf))
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather from write buffer")
-
-            /* Update the chunk data with any modifications from other processes */
-
-
-#ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "| - Write Buffer:\n");
-            HDfprintf(debug_file, "| - [");
-            for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) {
-                if (j > 0) HDfprintf(debug_file, ", ");
-                HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]);
-            }
-            HDfprintf(debug_file, "]\n|\n");
-
-            HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_list[i].buf);
-
-            HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_list[i].new_chunk.length, buf_size);
-#endif
-
-            /* Filter the chunk */
-            if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
-                    io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
-                    (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
-                HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
-
-#if H5_SIZEOF_SIZE_T > 4
-            /* Check for the chunk expanding too much to encode in a 32-bit value */
-            if (chunk_list[i].new_chunk.length > ((size_t) 0xffffffff))
-                HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
-#endif
-
-            if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
-        } /* end for */
+        /* XXX: Not sure about minor error code */
+        for (i = 0; i < chunk_list_num_entries; i++)
+            if (H5D__update_filtered_collective_chunk_entry(&chunk_list[i], io_info, type_info) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't update filtered chunk entry")
 
 #ifdef PARALLEL_COMPRESS_DEBUG
         HDfprintf(debug_file, "---------------------------------------------------\n\n");
@@ -1485,7 +1395,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
         /* Gather the new chunk sizes to all processes for a collective reallocation
          * of the chunks in the file */
         if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
-                (void **) &total_array, &total_array_num_entries, NULL) < 0)
+                (void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0)
             HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes")
 
 #ifdef PARALLEL_COMPRESS_DEBUG
@@ -1494,19 +1404,19 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
 #endif
 
         /* Collectively re-allocate the modified chunks (from each process) in the file */
-        for (i = 0; i < total_array_num_entries; i++) {
+        for (i = 0; i < collective_chunk_list_num_entries; i++) {
             hbool_t insert;
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "| Reallocing chunk at address %a with new length of %zd.\n", total_array[i].new_chunk.offset, total_array[i].new_chunk.length);
+            HDfprintf(debug_file, "| Reallocing chunk at address %a with new length of %zd.\n", collective_chunk_list[i].new_chunk.offset, collective_chunk_list[i].new_chunk.length);
 #endif
 
-            if (H5D__chunk_file_alloc(&index_info, &total_array[i].old_chunk, &total_array[i].new_chunk,
-                    &insert, total_array[i].chunk_info.scaled) < 0)
+            if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[i].old_chunk, &collective_chunk_list[i].new_chunk,
+                    &insert, collective_chunk_list[i].chunk_info.scaled) < 0)
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "| - Chunk now at address %a.\n|\n", total_array[i].new_chunk);
+            HDfprintf(debug_file, "| - Chunk now at address %a.\n|\n", collective_chunk_list[i].new_chunk);
 #endif
         } /* end for */
 
@@ -1514,74 +1424,61 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
         HDfprintf(debug_file, "------------------------------\n\n");
 #endif
 
-    /* XXX: During the collective re-allocation of chunks in the file, the record for each
-     * chunk is only update in the total array, not in the local copy of chunks on each
-     * process. However, each process needs the updated chunk records so that they can create
-     * a MPI type for the collective write that will write to the chunk's new locations instead
-     * of the old ones. This ugly hack seems to be the best solution to copy the information
-     * back to the local array and avoid having to modify the collective write type function
-     * in an ugly way so that it will accept the total array instead of the local array.
-     * This works correctly because the array gather function guarantees that the chunk
-     * data in the total array is ordered in blocks by rank.
-     */
-    {
+        /* If this process has any chunks selected, create a MPI type for collectively
+         * writing out the chunks to file. Otherwise, the process contributes to the
+         * collective write with a none type. */
+        if (chunk_list_num_entries) {
            size_t offset;
 
+            /* XXX: During the collective re-allocation of chunks in the file, the record for each
+             * chunk is only update in the total array, not in the local copy of chunks on each
+             * process. However, each process needs the updated chunk records so that they can create
+             * a MPI type for the collective write that will write to the chunk's new locations instead
+             * of the old ones. This ugly hack seems to be the best solution to copy the information
+             * back to the local array and avoid having to modify the collective write type function
+             * in an ugly way so that it will accept the total array instead of the local array.
+             * This works correctly because the array gather function guarantees that the chunk
+             * data in the total array is ordered in blocks by rank.
+             */
             /* XXX: No need to use bytes here, should be able to simply find offset in
              * terms of H5D_filtered_collective_io_info_t's */
-
             offset = 0;
             for (i = 0; i < (size_t) mpi_rank; i++)
                 offset += num_chunks_selected_array[i] * sizeof(H5D_filtered_collective_io_info_t);
 
-        HDmemcpy(chunk_list, &((char *) total_array)[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t));
-    }
+            HDmemcpy(chunk_list, &((char *) collective_chunk_list)[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t));
 
-    /* Create single MPI type encompassing each selection in the dataspace */
-    if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries,
-            &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
-        HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type")
+            /* Create single MPI type encompassing each selection in the dataspace */
+            if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries,
+                    &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type")
 
-    /* Override the write buffer to point to the address of the first
-     * chunk data buffer */
-    /* XXX: Find a better solution, but processes with no chunks on them
-     * are a special case so they have a NULL buf */
-    if (mem_type_is_derived && file_type_is_derived)
+            /* Override the write buffer to point to the address of the first
+             * chunk data buffer */
             io_info->u.wbuf = chunk_list[0].buf;
-    } /* end if */
-    else { /* Filtered collective read */
-
-    } /* end else */
-
-    /* We have a single, complicated MPI datatype for both memory & file */
-    mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? (hsize_t) 1 : (hsize_t) 0;
-
-    /* Set up the base storage address for this operation */
-    ctg_store.contig.dset_addr = 0;
-    io_info->store = &ctg_store;
+        } /* end if */
 
-    /* Perform I/O */
-    if (H5D__final_collective_io(io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0)
-        HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
+        /* We have a single, complicated MPI datatype for both memory & file */
+        mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? (hsize_t) 1 : (hsize_t) 0;
 
-    /* Collectively insert each chunk into the chunk index if this
-     * is a filtered collective write */
-    if (io_info->op_type == H5D_IO_OP_WRITE) {
-        H5D_chunk_ud_t udata;
+        /* Set up the base storage address for this operation */
+        ctg_store.contig.dset_addr = 0;
+        io_info->store = &ctg_store;
 
-        /* Set up chunk information for insertion to chunk index */
-        udata.common.layout = index_info.layout;
-        udata.common.storage = index_info.storage;
-        udata.filter_mask = 0;
+        /* Perform I/O */
+        if (H5D__final_collective_io(io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0)
+            HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
 
+        /* Participate in the collective re-insertion of all chunks modified
+         * in this iteration into the chunk index */
 #ifdef PARALLEL_COMPRESS_DEBUG
         HDfprintf(debug_file, "Reinserting chunks into chunk index.\n");
         HDfprintf(debug_file, "---------------------------------------\n");
 #endif
 
-        for (i = 0; i < total_array_num_entries; i++) {
-            udata.chunk_block = total_array[i].new_chunk;
-            udata.common.scaled = total_array[i].chunk_info.scaled;
+        for (i = 0; i < collective_chunk_list_num_entries; i++) {
+            udata.chunk_block = collective_chunk_list[i].new_chunk;
+            udata.common.scaled = collective_chunk_list[i].chunk_info.scaled;
 
             if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
@@ -1595,12 +1492,11 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
         HDfprintf(debug_file, "---------------------------------------\n");
 #endif
     } /* end if */
+    else { /* Filtered collective read */
 
-done:
-    /* Free resources used by a process which had no selection at all */
-    if (total_chunk_addr_array)
-        H5MM_free(total_chunk_addr_array);
+    } /* end else */
 
+done:
     /* Free resources used by a process which had some selection */
     if (chunk_list) {
         for (i = 0; i < chunk_list_num_entries; i++)
@@ -1612,10 +1508,8 @@ done:
     if (num_chunks_selected_array)
         H5MM_free(num_chunks_selected_array);
-    if (total_array)
-        H5MM_free(total_array);
-    if (mem_iter)
-        H5MM_free(mem_iter);
+    if (collective_chunk_list)
+        H5MM_free(collective_chunk_list);
 
     /* Free the MPI buf and file types, if they were derived */
     if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
@@ -1846,6 +1740,7 @@ done:
  * Purpose:     To do filtered collective IO per chunk to save on memory,
  *              as opposed to collective IO of every chunk at once
  *
+ *              XXX: Update later to reflect changes in structure
  *              XXX: Add read operation description
  *
  *              1. Construct a list of selected chunks in the collective IO
@@ -1894,28 +1789,19 @@ static herr_t
 H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
     H5D_chunk_map_t *fm, H5P_genplist_t *dx_plist)
 {
-    H5D_filtered_collective_io_info_t *chunk_list = NULL;
-    H5D_filtered_collective_io_info_t *gathered_array = NULL;
+    H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */
+    H5D_filtered_collective_io_info_t *collective_chunk_list = NULL;
     H5D_mpio_actual_chunk_opt_mode_t   actual_chunk_opt_mode = H5D_MPIO_MULTI_CHUNK; /* actual chunk optimization mode */
-    H5D_mpio_actual_io_mode_t          actual_io_mode = H5D_MPIO_NO_COLLECTIVE; /* Local variable for tracking the I/O mode used. */
-    H5FD_mpio_collective_opt_t         last_coll_opt_mode = H5FD_MPIO_COLLECTIVE_IO; /* Last parallel transfer with independent IO or collective IO with this mode */
-    H5FD_mpio_xfer_t                   last_xfer_mode = H5FD_MPIO_COLLECTIVE; /* Last parallel transfer for this request (H5D_XFER_IO_XFER_MODE_NAME) */
-    H5D_chk_idx_info_t                 index_info;
-    H5S_sel_iter_t                    *mem_iter = NULL;
+    H5D_mpio_actual_io_mode_t          actual_io_mode = H5D_MPIO_CHUNK_COLLECTIVE; /* Local variable for tracking the I/O mode used. */
     H5D_storage_t                      store; /* union of EFL and chunk pointer in file space */
     H5D_io_info_t                      ctg_io_info; /* Contiguous I/O info object */
     H5D_storage_t                      ctg_store; /* Chunk storage information as contiguous dataset */
-    MPI_Datatype                       file_type;
-    MPI_Datatype                       mem_type;
-    uint8_t                           *chunk_io_option = NULL;
-    haddr_t                           *chunk_addr = NULL;
+    MPI_Datatype                       file_type = MPI_BYTE;
+    MPI_Datatype                       mem_type = MPI_BYTE;
     hbool_t                            file_type_is_derived = FALSE;
     hbool_t                            mem_type_is_derived = FALSE;
-    hbool_t                            mem_iter_init = FALSE;
-    hsize_t                            mpi_buf_count;
-    size_t                             total_chunk; /* Total # of chunks in dataset */
     size_t                             chunk_list_num_entries;
-    size_t                             gathered_array_num_entries;
+    size_t                             collective_chunk_list_num_entries;
     size_t                            *num_chunks_selected_array = NULL;
     size_t                             i, j; /* Local index variable */
     int                                mpi_rank, mpi_size, mpi_code;
@@ -1933,30 +1819,15 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
     if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_CHUNK_OPT_MODE_NAME, &actual_chunk_opt_mode) < 0)
         HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk opt mode property")
 
-    /* Construct chunked index info */
-    index_info.f = io_info->dset->oloc.file;
-    index_info.dxpl_id = io_info->md_dxpl_id;
-    index_info.pline = &(io_info->dset->shared->dcpl_cache.pline);
-    index_info.layout = &(io_info->dset->shared->layout.u.chunk);
-    index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk);
-
+    /* Set the actual_io_mode property. Filtered collective writes can't break
+     * to independent, so set actual_io_mode right away */
+    if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0)
+        HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual chunk io mode property")
 
     /* Build a list of selected chunks in the collective IO operation */
     if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list,
             &chunk_list_num_entries, &num_chunks_selected_array) < 0)
         HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't construct filtered I/O info list")
 
-    /* Allocate memories */
-    /* chunk_io_option = (uint8_t *) H5MM_calloc(total_chunk);
-    chunk_addr = (haddr_t *) H5MM_calloc(total_chunk * sizeof(haddr_t)); */
-#ifdef H5D_DEBUG
-if(H5DEBUG(D))
-    HDfprintf(H5DEBUG(D), "total_chunk %Zu\n", total_chunk);
-#endif
-
-    /* Obtain IO option for each chunk */
-    /* if (H5D__obtain_mpio_mode(io_info, fm, dx_plist, chunk_io_option, chunk_addr) < 0)
-        HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "unable to obtain MPIO mode") */
-
     /* Set up contiguous I/O info object */
     HDmemcpy(&ctg_io_info, io_info, sizeof(ctg_io_info));
     ctg_io_info.store = &ctg_store;
@@ -1964,18 +1835,23 @@ if(H5DEBUG(D))
 
     /* Initialize temporary contiguous storage info */
     ctg_store.contig.dset_size = (hsize_t) io_info->dset->shared->layout.u.chunk.size;
+    ctg_store.contig.dset_addr = 0;
 
     /* Set dataset storage for I/O info */
     io_info->store = &store;
 
-    if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
-        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
-
-    /* Loop over all the chunks in the collective IO operation */
-    /* XXX: Multi-chunk needs to loop over all chunks and check for write/read inside
-     * loop, not the other way around */
-
     if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */
-        H5D_chunk_ud_t udata;
+        H5D_chk_idx_info_t index_info;
+        H5D_chunk_ud_t     udata;
+        size_t             max_num_chunks;
+        hsize_t            mpi_buf_count;
+
+        /* Construct chunked index info */
+        index_info.f = io_info->dset->oloc.file;
+        index_info.dxpl_id = io_info->md_dxpl_id;
+        index_info.pline = &(io_info->dset->shared->dcpl_cache.pline);
+        index_info.layout = &(io_info->dset->shared->layout.u.chunk);
+        index_info.storage = &(io_info->dset->shared->layout.storage.u.chunk);
 
         /* Set up chunk information for insertion to chunk index */
         udata.common.layout = index_info.layout;
@@ -1995,109 +1871,27 @@ if(H5DEBUG(D))
         HDfprintf(debug_file, "---------------------------------------------------\n");
 #endif
 
+        if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list_num_entries, &max_num_chunks,
+                1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm)))
+            HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
+
         /* XXX: Iteration should be for the max number among processes, since a process could
          * have no chunks assigned to it */
-        for (i = 0; i < chunk_list_num_entries; i++) {
-            unsigned filter_mask = 0;
-            hbool_t  full_overwrite = TRUE;
-            size_t   buf_size;
-            hssize_t iter_nelmts;
-
-#ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_list[i].old_chunk.offset);
-#endif
-
-            /* XXX: Determine if a chunk is being fully overwritten by looking at the total selection
-             * in the dataspace */
-
-            /* If this is a full overwrite of this chunk, enough memory must be allocated for
-             * the size of the unfiltered chunk. Otherwise, enough memory must be allocated
-             * to read the filtered chunk from the file. */
-            /* XXX: Return value of macro should be checked instead */
-            buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_list[i].chunk_info.fspace) * type_info->src_type_size
-                                        : chunk_list[i].old_chunk.length;
-            chunk_list[i].new_chunk.length = buf_size;
-
-#ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size,
-                    full_overwrite ? "full" : "non-full");
-#endif
-
-            if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
-
-            /* Owner of this chunk, receive modification data from other processes */
-
-            if (!full_overwrite) {
-                /* Read the chunk from the file */
-                if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset,
-                        buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read raw data chunk")
-
-                /* Unfilter the chunk before modifying it */
-                if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
-                        io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
-                        (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
-                    HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
-
-#ifdef PARALLEL_COMPRESS_DEBUG
-                HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_list[i].new_chunk.length, buf_size);
-
-                HDfprintf(debug_file, "| - Read buf:\n| - [");
-                for (size_t k = 0; k < chunk_list[i].new_chunk.length / type_info->src_type_size; k++) {
-                    if (k > 0) HDfprintf(debug_file, ", ");
-                    HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[k]);
-                }
-                HDfprintf(debug_file, "]\n|\n");
-#endif
-            } /* end if */
-
-            /* Update the chunk data with the modifications from the current (owning) process */
-
-            /* Initialize iterator for memory selection */
-            if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
-            mem_iter_init = TRUE;
-
-            if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0)
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
-
-            if (!H5D__gather_mem(io_info->u.wbuf, chunk_list[i].chunk_info.mspace, mem_iter,
-                    (size_t) iter_nelmts, io_info->dxpl_cache, chunk_list[i].buf))
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather from write buffer")
-
-            /* Update the chunk data with any modifications from other processes */
-
-#ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "| - Write Buffer:\n");
-            HDfprintf(debug_file, "| - [");
-            for (size_t k = 0; k < chunk_list[i].new_chunk.length / type_info->src_type_size; k++) {
-                if (k > 0) HDfprintf(debug_file, ", ");
-                HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[k]);
-            }
-            HDfprintf(debug_file, "]\n|\n");
-
-            HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_list[i].buf);
-
-            HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_list[i].new_chunk.length, buf_size);
-#endif
-
-            /* Filter the chunk */
-            if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
-                    io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
-                    (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
-                HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
-
-#if H5_SIZEOF_SIZE_T > 4
-            /* Check for the chunk expanding too much to encode in a 32-bit value */
-            if (chunk_list[i].new_chunk.length > ((size_t) 0xffffffff))
-                HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
-#endif
-
-            /* Gather the new chunk sizes to all processes for a collective reallocation
-             * of the chunks in the file */
-            if (H5D__mpio_array_gather(io_info, &chunk_list[i], 1, sizeof(*chunk_list),
-                    (void **) &gathered_array, &gathered_array_num_entries, NULL) < 0)
+        for (i = 0; i < max_num_chunks; i++) {
+            /* Check if this process has a chunk to work on for this iteration */
+            hbool_t have_chunk_to_process = i < chunk_list_num_entries;
+
+            /* XXX: Not sure about minor error code */
+            if (have_chunk_to_process)
+                if (H5D__update_filtered_collective_chunk_entry(&chunk_list[i], io_info, type_info) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't update filtered chunk entry")
+
+            /* Participate in the collective re-allocation of all chunks modified
+             * in this iteration. Gather the new chunk sizes to all processes for
+             * the collective re-allocation. */
+            /* XXX: May access unavailable memory on processes with no selection */
+            if (H5D__mpio_array_gather(io_info, &chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(*chunk_list),
+                    (void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0)
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes")
 
 #ifdef PARALLEL_COMPRESS_DEBUG
@@ -2106,21 +1900,21 @@ if(H5DEBUG(D))
 #endif
 
             /* Collectively re-allocate the modified chunks (from each process) in the file */
-            for (j = 0; j < gathered_array_num_entries; j++) {
+            for (j = 0; j < collective_chunk_list_num_entries; j++) {
                 hbool_t insert = FALSE;
 
#ifdef PARALLEL_COMPRESS_DEBUG
                 HDfprintf(debug_file, "| Re-allocing chunk at address %a with new length of %llu bytes.\n",
-                        gathered_array[j].new_chunk.offset, gathered_array[j].new_chunk.length);
+                        collective_chunk_list[j].new_chunk.offset, collective_chunk_list[j].new_chunk.length);
 #endif
 
                 /* Collectively re-allocate the chunk in the file */
-                if (H5D__chunk_file_alloc(&index_info, &gathered_array[j].old_chunk, &gathered_array[j].new_chunk,
+                if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].old_chunk, &collective_chunk_list[j].new_chunk,
                         &insert, chunk_list[j].chunk_info.scaled) < 0)
                     HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-                HDfprintf(debug_file, "| Chunk now at address %a.\n|\n", gathered_array[j].new_chunk);
+                HDfprintf(debug_file, "| Chunk now at address %a.\n|\n", collective_chunk_list[j].new_chunk);
 #endif
             } /* end for */
 
@@ -2128,58 +1922,59 @@ if(H5DEBUG(D))
             HDfprintf(debug_file, "------------------------------\n\n");
 #endif
 
-            /* Collect the new chunk info back to the local copy */
-            /* XXX: This may encounter a problem if there is a process with no selection */
-            HDmemcpy(&chunk_list[i].new_chunk, &gathered_array[mpi_rank].new_chunk, sizeof(chunk_list[i].new_chunk));
+            /* If this process has a chunk to work on, create a MPI type for the
+             * memory and file for writing out the chunk */
+            if (have_chunk_to_process) {
+                int mpi_type_count;
+
+                /* Collect the new chunk info back to the local copy */
+                HDmemcpy(&chunk_list[i].new_chunk, &collective_chunk_list[mpi_rank].new_chunk, sizeof(chunk_list[i].new_chunk));
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "New chunk record after memcpy back to local:\n");
-            HDfprintf(debug_file, " - Chunk offset: %a, Chunk length: %lld\n", chunk_list[i].new_chunk.offset, chunk_list[i].new_chunk.length);
+                HDfprintf(debug_file, "New chunk record after memcpy back to local:\n");
+                HDfprintf(debug_file, " - Chunk offset: %a, Chunk length: %lld\n", chunk_list[i].new_chunk.offset, chunk_list[i].new_chunk.length);
 #endif
 
-            {
-                int count;
-
-                H5_CHECKED_ASSIGN(count, int, chunk_list[i].new_chunk.length, hsize_t);
+                H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].new_chunk.length, hsize_t);
 
                 /* Create MPI memory type for writing to chunk */
-                if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(count, MPI_BYTE, &mem_type)))
+                if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type)))
                     HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code)
                 if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
                     HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
                 mem_type_is_derived = TRUE;
 
                 /* Create MPI file type for writing to chunk */
-                if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(count, MPI_BYTE, &file_type)))
+                if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &file_type)))
                     HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code)
                 if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
                     HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
                 file_type_is_derived = TRUE;
 
-                mpi_buf_count = 1;
-            }
-
+                /* Set up the base storage address for this operation */
+                ctg_store.contig.dset_addr = chunk_list[i].new_chunk.offset;
 
-            /* Set up the base storage address for this operation */
-            ctg_store.contig.dset_addr = chunk_list[i].new_chunk.offset;
+                /* Override the write buffer to point to the address of the
+                 * chunk data buffer */
+                ctg_io_info.u.wbuf = chunk_list[i].buf;
+            } /* end if */
 
-            /* Override the write buffer to point to the address of the
-             * chunk data buffer */
-            ctg_io_info.u.wbuf = chunk_list[i].buf;
+            mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? 1 : 0;
 
             /* Perform the I/O */
             if (H5D__final_collective_io(&ctg_io_info, type_info, mpi_buf_count, &file_type, &mem_type) < 0)
                 HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
 
+            /* Participate in the collective re-insertion of all chunks modified
+             * in this iteration into the chunk index */
 #ifdef PARALLEL_COMPRESS_DEBUG
             HDfprintf(debug_file, "Reinserting chunks into chunk index.\n");
             HDfprintf(debug_file, "---------------------------------------\n");
 #endif
 
-            /* Re-insert the modified chunks (from each process) into the chunk index */
-            for (j = 0; j < gathered_array_num_entries; j++) {
-                udata.chunk_block = gathered_array[j].new_chunk;
-                udata.common.scaled = gathered_array[j].chunk_info.scaled;
+            for (j = 0; j < collective_chunk_list_num_entries; j++) {
+                udata.chunk_block = collective_chunk_list[j].new_chunk;
+                udata.common.scaled = collective_chunk_list[j].chunk_info.scaled;
 
                 if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
                     HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
@@ -2203,11 +1998,8 @@ if(H5DEBUG(D))
             mem_type_is_derived = FALSE;
             file_type_is_derived = FALSE;
 
-            if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
-
-            if (gathered_array)
-                gathered_array = (H5D_filtered_collective_io_info_t *) H5MM_free(gathered_array);
+            if (collective_chunk_list)
+                collective_chunk_list = (H5D_filtered_collective_io_info_t *) H5MM_free(collective_chunk_list);
         } /* end for */
     } /* end if */
     else { /* Filtered collective read */
@@ -2245,117 +2037,7 @@ if(H5DEBUG(D))
         } /* end for */
     } /* end else */
 
-#if 0
-    /* Loop over _all_ the chunks */
-    for (u = 0; u < total_chunk; u++) {
-        H5D_chunk_info_t *chunk_info; /* Chunk info for current chunk */
-        H5S_t            *fspace;     /* Dataspace describing chunk & selection in it */
-        H5S_t            *mspace;     /* Dataspace describing selection in memory corresponding to this chunk */
-        hbool_t           insert = FALSE;
-
-        /* Initialize temporary contiguous storage address */
-        ctg_store.contig.dset_addr = chunk_addr[u];
-
-#ifdef H5D_DEBUG
-if(H5DEBUG(D))
-    HDfprintf(H5DEBUG(D),"mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u);
-#endif
-        /* Get the chunk info for this chunk, if there are elements selected */
-        chunk_info = &filtered_io_info_array[u].chunk_info;
-
-        /* Set the storage information for chunks with selections */
-        if (chunk_info) {
-            /* HDassert(chunk_info->index == u); */
-
-            /* Pass in chunk's coordinates in a union. */
-            store.chunk.scaled = chunk_info->scaled;
-        } /* end if */
-
-        /* Collective IO for this chunk,
-         * Note: even there is no selection for this process, the process still
-         * needs to contribute MPI NONE TYPE.
-         */
-        if (chunk_io_option[u] == H5D_CHUNK_IO_MODE_COL) {
-#ifdef H5D_DEBUG
-if(H5DEBUG(D))
-    HDfprintf(H5DEBUG(D),"inside collective chunk IO mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u);
-#endif
-
-            /* Set the file & memory dataspaces */
-            if (chunk_info) {
-                fspace = chunk_info->fspace;
-                mspace = chunk_info->mspace;
-
-                /* Update the local variable tracking the dxpl's actual io mode property.
-                 *
-                 * Note: H5D_MPIO_COLLECTIVE_MULTI | H5D_MPIO_INDEPENDENT = H5D_MPIO_MIXED
-                 * to ease switching between to mixed I/O without checking the current
-                 * value of the property. You can see the definition in H5Ppublic.h
-                 */
-                actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_COLLECTIVE;
-
-            } /* end if */
-            else {
-                fspace = mspace = NULL;
-            } /* end else */
-
-            /* Switch back to collective I/O */
-            if (last_xfer_mode != H5FD_MPIO_COLLECTIVE) {
-                if (H5D__ioinfo_xfer_mode(io_info, dx_plist, H5FD_MPIO_COLLECTIVE) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't switch to collective I/O")
-                last_xfer_mode = H5FD_MPIO_COLLECTIVE;
-            } /* end if */
-            if (last_coll_opt_mode != H5FD_MPIO_COLLECTIVE_IO) {
-                if (H5D__ioinfo_coll_opt_mode(io_info, dx_plist, H5FD_MPIO_COLLECTIVE_IO) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't switch to collective I/O")
-                last_coll_opt_mode = H5FD_MPIO_COLLECTIVE_IO;
-            } /* end if */
-        } /* end if */
-        else { /* possible independent IO for this chunk */
-#ifdef H5D_DEBUG
-if(H5DEBUG(D))
-    HDfprintf(H5DEBUG(D),"inside independent IO mpi_rank = %d, chunk index = %Zu\n", mpi_rank, u);
-#endif
-
-            HDassert(chunk_io_option[u] == 0);
-
-            /* Set the file & memory dataspaces */
-            if (chunk_info) {
-                fspace = chunk_info->fspace;
-                mspace = chunk_info->mspace;
-
-                /* Update the local variable tracking the dxpl's actual io mode. */
-                actual_io_mode = actual_io_mode | H5D_MPIO_CHUNK_INDEPENDENT;
-            } /* end if */
-            else {
-                fspace = mspace = NULL;
-            } /* end else */
-
-            /* Using independent I/O with file setview.*/
-            if (last_coll_opt_mode != H5FD_MPIO_INDIVIDUAL_IO) {
-                if (H5D__ioinfo_coll_opt_mode(io_info, dx_plist, H5FD_MPIO_INDIVIDUAL_IO) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't switch to individual I/O")
-                last_coll_opt_mode = H5FD_MPIO_INDIVIDUAL_IO;
-            } /* end if */
-        } /* end else */
-
-#ifdef H5D_DEBUG
-    if(H5DEBUG(D))
-        HDfprintf(H5DEBUG(D),"after inter collective IO\n");
-#endif
-    } /* end for */
-#endif
-
-    /* Write the local value of actual io mode to the DXPL. */
-    if (H5P_set(dx_plist, H5D_MPIO_ACTUAL_IO_MODE_NAME, &actual_io_mode) < 0)
-        HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "couldn't set actual io mode property")
-
 done:
-    if (chunk_io_option)
-        H5MM_free(chunk_io_option);
-    if (chunk_addr)
-        H5MM_free(chunk_addr);
-
     if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
         HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
     if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
@@ -2371,10 +2053,8 @@ done:
     if (num_chunks_selected_array)
         H5MM_free(num_chunks_selected_array);
-    if (gathered_array)
-        H5MM_free(gathered_array);
-    if (mem_iter)
-        H5MM_free(mem_iter);
+    if (collective_chunk_list)
+        H5MM_free(collective_chunk_list);
 
     FUNC_LEAVE_NOAPI(ret_value)
 } /* end H5D__multi_chunk_filtered_collective_io() */
@@ -2968,7 +2648,10 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     H5D_filtered_collective_io_info_t *local_info_array = NULL;
     H5D_filtered_collective_io_info_t *overlap_array = NULL;
     H5SL_node_t                       *chunk_node;
+    MPI_Request                       *send_requests = NULL;
+    MPI_Status                        *send_statuses = NULL;
     hbool_t                            no_overlap = FALSE;
+    size_t                             num_send_requests;
     size_t                             num_chunks_selected;
     size_t                             overlap_array_num_entries;
     size_t                            *num_chunks_selected_array = NULL;
@@ -3050,14 +2733,17 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
 
     /* Redistribute chunks to new owners as necessary */
     if (!no_overlap && (io_info->op_type == H5D_IO_OP_WRITE)) {
-        size_t i;
+        size_t i;
+
+        if (NULL == (send_requests = H5MM_malloc(num_chunks_selected * sizeof(*send_requests))))
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
 
         if (H5D__mpio_array_gather(io_info, local_info_array, num_chunks_selected, sizeof(*local_info_array),
                 &overlap_array, &overlap_array_num_entries, H5D__cmp_filtered_collective_io_entry) < 0)
             HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't ")
 
-        for (i = 0, num_chunks_selected = 0; i < overlap_array_num_entries;) {
+        for (i = 0, num_chunks_selected = 0, num_send_requests = 0; i < overlap_array_num_entries;) {
             H5D_filtered_collective_io_info_t chunk_entry;
             haddr_t chunk_addr = overlap_array[i].old_chunk.offset;
             size_t  num_writers = 0;
@@ -3068,7 +2754,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
              * to the same chunk */
             do {
                 /* Store the correct chunk entry information in case this process
-                 * become's the new chunk's owner */
+                 * becomes the new chunk's owner. The chunk entry that this process
+                 * contributed will be the only one with a valid dataspace selection
+                 * on this particular process */
                 if (mpi_rank == overlap_array[i].owner)
                     chunk_entry = overlap_array[i];
@@ -3092,16 +2780,19 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
 
                 /* New owner takes possession of the chunk */
                 overlap_array[num_chunks_selected++] = chunk_entry;
-            }
+            } /* end if */
             else {
                 /* Send modification data to new owner */
 
-            }
+                /* if (MPI_SUCCESS != (mpi_code= MPI_Isend(, , , new_owner,
+                        chunk_entry.old_chunk.offset, io_info->comm, &send_requests[num_send_requests++])))
+                    HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code) */
+            } /* end else */
 
 #ifdef PARALLEL_COMPRESS_DEBUG
             HDfprintf(debug_file, "Chunk at address %a re-assigned to process %d.\n", chunk_addr, new_owner);
 #endif
-        }
+        } /* end for */
 
         /* Release old list */
         if (local_info_array)
@@ -3147,7 +2838,22 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     *num_entries = num_chunks_selected;
     *_num_chunks_selected_array = num_chunks_selected_array;
 
+    /* Wait for all async send requests to complete before returning */
+    if (!no_overlap && num_send_requests) {
+        if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses))))
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
+
+        if (MPI_SUCCESS != (mpi_code = MPI_Waitall(num_send_requests, send_requests, send_statuses)))
+            HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
+    }
+
 done:
+    if (send_requests)
+        H5MM_free(send_requests);
+    if (send_statuses)
+        H5MM_free(send_statuses);
+
     FUNC_LEAVE_NOAPI(ret_value)
 } /* end H5D__construct_filtered_io_info_list() */
@@ -3255,5 +2961,127 @@ done:
     FUNC_LEAVE_NOAPI(ret_value)
 } /* end H5D__mpio_filtered_collective_write_type() */
+
+
+static herr_t
+H5D__update_filtered_collective_chunk_entry(H5D_filtered_collective_io_info_t *chunk_entry,
+    H5D_io_info_t *io_info, H5D_type_info_t *type_info)
+{
+    H5S_sel_iter_t *mem_iter = NULL;
+    unsigned        filter_mask;
+    hssize_t        iter_nelmts;
+    hbool_t         full_overwrite = TRUE;
+    hbool_t         mem_iter_init = FALSE;
+    size_t          buf_size;
+    herr_t          ret_value = SUCCEED;
+
+    FUNC_ENTER_STATIC
+
+    HDassert(chunk_entry);
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+    HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_entry->old_chunk.offset);
+#endif
+
+    /* XXX: Determine if a chunk is being fully overwritten by looking at the total selection
+     * in the dataspace */
+
+    /* If this is a full overwrite of this chunk, enough memory must be allocated for
+     * the size of the unfiltered chunk. Otherwise, enough memory must be allocated
+     * to read the filtered chunk from the file. */
+    /* XXX: Return value of macro should be checked instead */
+    buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace) * type_info->src_type_size
+                                : chunk_entry->old_chunk.length;
+    chunk_entry->new_chunk.length = buf_size;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+    HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size,
+            full_overwrite ? "full" : "non-full");
+#endif
+
+    if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size)))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
+
+    if (!full_overwrite) {
+        /* Read the chunk from the file */
+        if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->old_chunk.offset,
+                buf_size, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0)
+            HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk")
+
+        /* Unfilter the chunk before modifying it */
+        if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
+                io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+                (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
+            HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+        HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_entry->new_chunk.length, buf_size);
+
+        HDfprintf(debug_file, "| - Read buf:\n| - [");
+        for (size_t j = 0; j < chunk_entry->new_chunk.length / type_info->src_type_size; j++) {
+            if (j > 0) HDfprintf(debug_file, ", ");
+            HDfprintf(debug_file, "%lld", ((long *) chunk_entry->buf)[j]);
+        }
+        HDfprintf(debug_file, "]\n|\n");
+#endif
+    } /* end if */
+
+    /* Owner of this chunk, receive modification data from other processes */
+
+    /* Update the chunk data with the modifications from the current (owning) process */
+
+    if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
+
+    /* Initialize iterator for memory selection */
+    if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.mspace, type_info->dst_type_size) < 0)
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+    mem_iter_init = TRUE;
+
+    if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry->chunk_info.mspace)) < 0)
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+    if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry->chunk_info.mspace, mem_iter,
+            (size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather from write buffer")
+
+    /* Update the chunk data with any modifications from other processes */
+
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+    HDfprintf(debug_file, "| - Write Buffer:\n");
+    HDfprintf(debug_file, "| - [");
+    for (size_t j = 0; j < chunk_entry->new_chunk.length / type_info->src_type_size; j++) {
+        if (j > 0) HDfprintf(debug_file, ", ");
+        HDfprintf(debug_file, "%lld", ((long *) chunk_entry->buf)[j]);
+    }
+    HDfprintf(debug_file, "]\n|\n");
+
+    HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_entry->buf);
+
+    HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_entry->new_chunk.length, buf_size);
+#endif
+
+    /* Filter the chunk */
+    filter_mask = 0;
+    if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
+            io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+            (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
+        HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
+
+#if H5_SIZEOF_SIZE_T > 4
+    /* Check for the chunk expanding too much to encode in a 32-bit value */
+    if (chunk_entry->new_chunk.length > ((size_t) 0xffffffff))
+        HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
+#endif
+
+done:
+    if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+    if (mem_iter)
+        H5MM_free(mem_iter)
+
+    FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__update_filtered_collective_chunk_entry() */
 #endif /* H5_HAVE_PARALLEL */