author    | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-17 20:39:41 (GMT)
committer | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-17 20:39:41 (GMT)
commit    | 349b3634f73954eb17fd8f2ee730e43606959467 (patch)
tree      | bc9be36ff1234be8344678dcda89c16cb8f582d6
parent    | 2292a8520df0738826e22e2e097495761f3e5d28 (diff)
Add chunk redistribution for shared chunks
Remove check for a process having a selection, as this needs to be reworked
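
(Illustration, not part of the commit: the redistribution pass added here gathers every process's chunk entries, orders them by chunk file offset, and collapses each run of duplicate entries so that the rank writing the most bytes becomes the chunk's new owner. The standalone C sketch below mirrors only that owner-selection scan; the struct and sample data are simplified stand-ins for H5D_filtered_collective_io_info_t and the gathered overlap_array, and the names redistribute/chunk_entry_t are hypothetical.)

/* Minimal sketch of the shared-chunk redistribution scan. */
#include <stdio.h>
#include <stddef.h>

typedef struct {
    unsigned long chunk_offset; /* stand-in for old_chunk.offset */
    size_t        io_size;      /* bytes this rank writes into the chunk */
    int           owner;        /* rank that contributed this entry */
} chunk_entry_t;

/* Walk runs of entries targeting the same chunk and pick, per run, the rank
 * writing the most bytes as the new owner, counting the writers as we go. */
static void
redistribute(const chunk_entry_t *entries, size_t n)
{
    size_t i = 0;

    while (i < n) {
        unsigned long chunk_addr  = entries[i].chunk_offset;
        size_t        num_writers = 0;
        size_t        max_bytes   = 0;
        int           new_owner   = 0;

        /* Process duplicate entries caused by several ranks writing
         * to the same chunk */
        do {
            if (entries[i].io_size > max_bytes) {
                max_bytes = entries[i].io_size;
                new_owner = entries[i].owner;
            }
            num_writers++;
            i++;
        } while (i < n && entries[i].chunk_offset == chunk_addr);

        printf("chunk at 0x%lx: new owner = rank %d, %zu writer(s)\n",
               chunk_addr, new_owner, num_writers);
    }
}

int
main(void)
{
    /* Ranks 0 and 1 both touch the chunk at 0x1000; rank 1 writes more
     * bytes and therefore takes ownership. Ties keep the first rank seen,
     * matching the strict > comparison in the patch. */
    chunk_entry_t entries[] = {
        { 0x1000,  512, 0 },
        { 0x1000, 2048, 1 },
        { 0x2000, 1024, 0 },
    };

    redistribute(entries, sizeof(entries) / sizeof(entries[0]));
    return 0;
}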
-rw-r--r-- | src/H5Dmpio.c | 438
1 file changed, 248 insertions(+), 190 deletions(-)
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 6ae4ef8..49f25ae 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -105,7 +105,9 @@ typedef struct H5D_filtered_collective_io_info_t {
     H5D_chunk_info_t chunk_info;
     H5F_block_t old_chunk;
     H5F_block_t new_chunk;
-    int num_writers;
+    size_t io_size;
+    size_t num_writers;
+    int owner;
     void *buf;
 } H5D_filtered_collective_io_info_t;
 
@@ -424,7 +426,7 @@ done:
     H5MM_free(displacements_array);
 
     FUNC_LEAVE_NOAPI(ret_value)
-} /* end H5D__filtered_collective_io_info_arraygather() */
+} /* end H5D__mpio_array_gather() */
 
 
 /*-------------------------------------------------------------------------
@@ -1302,8 +1304,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
     H5D_chk_idx_info_t index_info;
     H5S_sel_iter_t *mem_iter = NULL;
     H5D_storage_t ctg_store;
-    MPI_Datatype mem_type;
-    MPI_Datatype file_type;
+    MPI_Datatype mem_type = MPI_BYTE;
+    MPI_Datatype file_type = MPI_BYTE;
     hbool_t mem_type_is_derived = FALSE;
     hbool_t file_type_is_derived = FALSE;
     hbool_t mem_iter_init = FALSE;
@@ -1352,228 +1354,208 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
      * still have to do the re-allocation in the file. Get rid of else case
      * and instead change mpi_buf_count to 0 if they have no selection */
-    if (H5SL_count(fm->sel_chunks)) {
 #ifdef PARALLEL_COMPRESS_DEBUG
-        HDfprintf(debug_file, "Incoming messages from other processes:\n");
-        HDfprintf(debug_file, "-----------------------------------------\n");
-        for (size_t j = 0; j < chunk_list_num_entries; j++) {
-            HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n",
-                    chunk_list[j].old_chunk.offset, chunk_list[j].num_writers);
-        }
-        HDfprintf(debug_file, "-----------------------------------------\n\n");
+    HDfprintf(debug_file, "Incoming messages from other processes:\n");
+    HDfprintf(debug_file, "-----------------------------------------\n");
+    for (size_t j = 0; j < chunk_list_num_entries; j++) {
+        HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n",
+                chunk_list[j].old_chunk.offset, chunk_list[j].num_writers - 1);
+    }
+    HDfprintf(debug_file, "-----------------------------------------\n\n");
 #endif
 
-        if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
-            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
+    if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
 
-        if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */
+    if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */
 #ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "Processing chunks:\n");
-            HDfprintf(debug_file, "---------------------------------------------------\n");
+        HDfprintf(debug_file, "Processing chunks:\n");
+        HDfprintf(debug_file, "---------------------------------------------------\n");
 #endif
 
-            /* Iterate through all the chunks in the collective write operation,
-             * updating each chunk with the data modifications from other processes,
-             * then re-filtering the chunk. */
-            for (i = 0; i < chunk_list_num_entries; i++) {
-                unsigned filter_mask = 0;
-                hbool_t full_overwrite = TRUE;
-                size_t buf_size;
-                hssize_t iter_nelmts;
+        /* Iterate through all the chunks in the collective write operation,
+         * updating each chunk with the data modifications from other processes,
+         * then re-filtering the chunk. */
+        for (i = 0; i < chunk_list_num_entries; i++) {
+            unsigned filter_mask = 0;
+            hbool_t full_overwrite = TRUE;
+            size_t buf_size;
+            hssize_t iter_nelmts;
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-                HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_list[i].old_chunk.offset);
+            HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_list[i].old_chunk.offset);
 #endif
 
-                /* If this is a full overwrite of this chunk, enough memory must be allocated for
-                 * the size of the unfiltered chunk. Otherwise, enough memory must be allocated
-                 * to read the filtered chunk from the file. */
-                /* XXX: Return value of macro should be checked instead */
-                buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_list[i].chunk_info.fspace) * type_info->src_type_size
-                        : chunk_list[i].old_chunk.length;
-                chunk_list[i].new_chunk.length = buf_size;
+            /* XXX: Determine if a chunk is being fully overwritten by looking at the total selection
+             * in the dataspace */
+
+            /* If this is a full overwrite of this chunk, enough memory must be allocated for
+             * the size of the unfiltered chunk. Otherwise, enough memory must be allocated
+             * to read the filtered chunk from the file. */
+            /* XXX: Return value of macro should be checked instead */
+            buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_list[i].chunk_info.fspace) * type_info->src_type_size
+                    : chunk_list[i].old_chunk.length;
+            chunk_list[i].new_chunk.length = buf_size;
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-                HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size,
-                        full_overwrite ? "full" : "non-full");
+            HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size,
+                    full_overwrite ? "full" : "non-full");
 #endif
 
-                if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
-
-                /* Initialize iterator for memory selection */
-                if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
-                mem_iter_init = TRUE;
+            if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
 
-                /* Owner of this chunk, receive modification data from other processes */
+            /* Owner of this chunk, receive modification data from other processes */
 
-                if (!full_overwrite) {
-                    /* Read the chunk from the file */
-                    if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset,
-                            buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0)
-                        HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk")
+            if (!full_overwrite) {
+                /* Read the chunk from the file */
+                if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset,
+                        buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0)
+                    HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk")
 
-                    /* Unfilter the chunk before modifying it */
-                    if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
-                            io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
-                            (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
-                        HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
+                /* Unfilter the chunk before modifying it */
+                if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
+                        io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+                        (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
+                    HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-                    HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_list[i].new_chunk.length, buf_size);
+                HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_list[i].new_chunk.length, buf_size);
 
-                    HDfprintf(debug_file, "| - Read buf:\n| - [");
-                    for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) {
-                        if (j > 0) HDfprintf(debug_file, ", ");
-                        HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]);
-                    }
-                    HDfprintf(debug_file, "]\n|\n");
+                HDfprintf(debug_file, "| - Read buf:\n| - [");
+                for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) {
+                    if (j > 0) HDfprintf(debug_file, ", ");
+                    HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]);
+                }
+                HDfprintf(debug_file, "]\n|\n");
 #endif
-                } /* end if */
+            } /* end if */
 
-                /* Update the chunk data with the modifications from the current (owning) process */
-                if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+            /* Update the chunk data with the modifications from the current (owning) process */
+
+            /* Initialize iterator for memory selection */
+            if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+            mem_iter_init = TRUE;
 
-                if (!H5D__gather_mem(io_info->u.wbuf, chunk_list[i].chunk_info.mspace, mem_iter,
-                        (size_t) iter_nelmts, io_info->dxpl_cache, chunk_list[i].buf))
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather from write buffer")
+            if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+            if (!H5D__gather_mem(io_info->u.wbuf, chunk_list[i].chunk_info.mspace, mem_iter,
+                    (size_t) iter_nelmts, io_info->dxpl_cache, chunk_list[i].buf))
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather from write buffer")
 
-                /* Update the chunk data with any modifications from other processes */
+            /* Update the chunk data with any modifications from other processes */
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-                HDfprintf(debug_file, "| - Write Buffer:\n");
-                HDfprintf(debug_file, "| - [");
-                for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) {
-                    if (j > 0) HDfprintf(debug_file, ", ");
-                    HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]);
-                }
-                HDfprintf(debug_file, "]\n|\n");
+            HDfprintf(debug_file, "| - Write Buffer:\n");
+            HDfprintf(debug_file, "| - [");
+            for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) {
+                if (j > 0) HDfprintf(debug_file, ", ");
+                HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]);
+            }
+            HDfprintf(debug_file, "]\n|\n");
 
-                HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_list[i].buf);
+            HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_list[i].buf);
 
-                HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_list[i].new_chunk.length, buf_size);
+            HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_list[i].new_chunk.length, buf_size);
 #endif
 
-                /* Filter the chunk */
-                if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
-                        io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
-                        (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
-                    HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
+            /* Filter the chunk */
+            if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
+                    io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+                    (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
+                HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
 
 #if H5_SIZEOF_SIZE_T > 4
-                /* Check for the chunk expanding too much to encode in a 32-bit value */
-                if (chunk_list[i].new_chunk.length > ((size_t) 0xffffffff))
-                    HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
+            /* Check for the chunk expanding too much to encode in a 32-bit value */
+            if (chunk_list[i].new_chunk.length > ((size_t) 0xffffffff))
+                HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
 #endif
 
-                if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
-            } /* end for */
+            if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+        } /* end for */
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "---------------------------------------------------\n\n");
+        HDfprintf(debug_file, "---------------------------------------------------\n\n");
 #endif
 
-            /* Gather the new chunk sizes to all processes for a collective reallocation
-             * of the chunks in the file */
-            if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
-                    (void **) &total_array, &total_array_num_entries, NULL) < 0)
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes")
+        /* Gather the new chunk sizes to all processes for a collective reallocation
+         * of the chunks in the file */
+        if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
+                (void **) &total_array, &total_array_num_entries, NULL) < 0)
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes")
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "Reallocing chunks:\n");
-            HDfprintf(debug_file, "------------------------------\n");
+        HDfprintf(debug_file, "Reallocing chunks:\n");
+        HDfprintf(debug_file, "------------------------------\n");
 #endif
 
-            /* Collectively re-allocate the modified chunks (from each process) in the file */
-            for (i = 0; i < total_array_num_entries; i++) {
-                hbool_t insert;
+        /* Collectively re-allocate the modified chunks (from each process) in the file */
+        for (i = 0; i < total_array_num_entries; i++) {
+            hbool_t insert;
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-                HDfprintf(debug_file, "| Reallocing chunk at address %a with new length of %zd.\n", total_array[i].new_chunk.offset, total_array[i].new_chunk.length);
+            HDfprintf(debug_file, "| Reallocing chunk at address %a with new length of %zd.\n", total_array[i].new_chunk.offset, total_array[i].new_chunk.length);
 #endif
 
-                if (H5D__chunk_file_alloc(&index_info, &total_array[i].old_chunk, &total_array[i].new_chunk,
-                        &insert, total_array[i].chunk_info.scaled) < 0)
-                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
+            if (H5D__chunk_file_alloc(&index_info, &total_array[i].old_chunk, &total_array[i].new_chunk,
+                    &insert, total_array[i].chunk_info.scaled) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-                HDfprintf(debug_file, "| - Chunk now at address %a.\n|\n", total_array[i].new_chunk);
+            HDfprintf(debug_file, "| - Chunk now at address %a.\n|\n", total_array[i].new_chunk);
 #endif
-            } /* end for */
+        } /* end for */
 
 #ifdef PARALLEL_COMPRESS_DEBUG
-            HDfprintf(debug_file, "------------------------------\n\n");
-#endif
+        HDfprintf(debug_file, "------------------------------\n\n");
+#endif
+
+        /* XXX: During the collective re-allocation of chunks in the file, the record for each
+         * chunk is only update in the total array, not in the local copy of chunks on each
+         * process. However, each process needs the updated chunk records so that they can create
+         * a MPI type for the collective write that will write to the chunk's new locations instead
+         * of the old ones. This ugly hack seems to be the best solution to copy the information
+         * back to the local array and avoid having to modify the collective write type function
+         * in an ugly way so that it will accept the total array instead of the local array.
+         * This works correctly because the array gather function guarantees that the chunk
+         * data in the total array is ordered in blocks by rank.
+         */
+        {
+            size_t offset;
 
-            /* XXX: During the collective re-allocation of chunks in the file, the record for each
-             * chunk is only update in the total array, not in the local copy of chunks on each
-             * process. However, each process needs the updated chunk records so that they can create
-             * a MPI type for the collective write that will write to the chunk's new locations instead
-             * of the old ones. This ugly hack seems to be the best solution to copy the information
-             * back to the local array and avoid having to modify the collective write type function
-             * in an ugly way so that it will accept the total array instead of the local array.
-             * This works correctly because the array gather function guarantees that the chunk
-             * data in the total array is ordered in blocks by rank.
-             */
-            {
-                size_t offset;
+            /* XXX: No need to use bytes here, should be able to simply find offset in
+             * terms of H5D_filtered_collective_io_info_t's */
 
-                offset = 0;
-                for (i = 0; i < (size_t) mpi_rank; i++)
-                    offset += num_chunks_selected_array[i] * sizeof(H5D_filtered_collective_io_info_t);
+            offset = 0;
+            for (i = 0; i < (size_t) mpi_rank; i++)
+                offset += num_chunks_selected_array[i] * sizeof(H5D_filtered_collective_io_info_t);
 
-                HDmemcpy(chunk_list, &((char *) total_array)[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t));
-            }
+            HDmemcpy(chunk_list, &((char *) total_array)[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t));
+        }
 
-            /* Create single MPI type encompassing each selection in the dataspace */
-            if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries,
-                    &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type")
+        /* Create single MPI type encompassing each selection in the dataspace */
+        if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries,
+                &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type")
 
-            /* Override the write buffer to point to the address of the first
-             * chunk data buffer */
+        /* Override the write buffer to point to the address of the first
+         * chunk data buffer */
+        /* XXX: Find a better solution, but processes with no chunks on them
+         * are a special case so they have a NULL buf */
+        if (mem_type_is_derived && file_type_is_derived)
             io_info->u.wbuf = chunk_list[0].buf;
-        } /* end if */
-        else { /* Filtered collective read */
-
-        } /* end else */
-
-        /* We have a single, complicated MPI datatype for both memory & file */
-        mpi_buf_count = (hsize_t) 1;
-    }
-    else { /* No selection at all for this process, contribute none type */
-        size_t dataset_num_chunks;
-
-        /* Retrieve total # of chunks in dataset */
-        H5_CHECKED_ASSIGN(dataset_num_chunks, size_t, fm->layout->u.chunk.nchunks, hsize_t);
-
-        /* Allocate chunking information */
-        if (NULL == (total_chunk_addr_array = (haddr_t *) H5MM_malloc(sizeof(haddr_t) * dataset_num_chunks)))
-            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate total chunk address arraybuffer")
-
-        /* Retrieve chunk address map */
-        if (H5D__chunk_addrmap(io_info, total_chunk_addr_array) < 0)
-            HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get chunk address")
-
-        /* Get chunk with lowest address */
-        ctg_store.contig.dset_addr = HADDR_MAX;
-        for (i = 0; i < dataset_num_chunks; i++)
-            if (total_chunk_addr_array[i] < ctg_store.contig.dset_addr)
-                ctg_store.contig.dset_addr = total_chunk_addr_array[i];
-        HDassert(ctg_store.contig.dset_addr != HADDR_MAX);
-
-        /* Set the MPI datatype */
-        file_type = MPI_BYTE;
-        mem_type = MPI_BYTE;
+    } /* end if */
+    else { /* Filtered collective read */
 
-        /* No chunks selected for this process */
-        mpi_buf_count = (hsize_t) 0;
     } /* end else */
 
+    /* We have a single, complicated MPI datatype for both memory & file */
+    mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? (hsize_t) 1 : (hsize_t) 0;
+
     /* Set up the base storage address for this operation */
     ctg_store.contig.dset_addr = 0;
     io_info->store = &ctg_store;
@@ -2005,7 +1987,7 @@ if(H5DEBUG(D))
         HDfprintf(debug_file, "-----------------------------------------\n");
         for (size_t k = 0; k < chunk_list_num_entries; k++) {
             HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n",
-                    chunk_list[k].old_chunk.offset, chunk_list[k].num_writers);
+                    chunk_list[k].old_chunk.offset, chunk_list[k].num_writers - 1);
         }
         HDfprintf(debug_file, "-----------------------------------------\n\n");
 
@@ -2013,6 +1995,8 @@ if(H5DEBUG(D))
         HDfprintf(debug_file, "---------------------------------------------------\n");
 #endif
 
+        /* XXX: Iteration should be for the max number among processes, since a process could
+         * have no chunks assigned to it */
        for (i = 0; i < chunk_list_num_entries; i++) {
             unsigned filter_mask = 0;
             hbool_t full_overwrite = TRUE;
@@ -2042,11 +2026,6 @@ if(H5DEBUG(D))
             if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
 
-            /* Initialize iterator for memory selection */
-            if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
-                HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
-            mem_iter_init = TRUE;
-
             /* Owner of this chunk, receive modification data from other processes */
 
             if (!full_overwrite) {
@@ -2074,6 +2053,12 @@ if(H5DEBUG(D))
             } /* end if */
 
             /* Update the chunk data with the modifications from the current (owning) process */
+
+            /* Initialize iterator for memory selection */
+            if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+            mem_iter_init = TRUE;
+
             if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0)
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
 
@@ -2981,9 +2966,11 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     size_t **_num_chunks_selected_array)
 {
     H5D_filtered_collective_io_info_t *local_info_array = NULL;
+    H5D_filtered_collective_io_info_t *overlap_array = NULL;
     H5SL_node_t *chunk_node;
     hbool_t no_overlap = FALSE;
     size_t num_chunks_selected;
+    size_t overlap_array_num_entries;
     size_t *num_chunks_selected_array = NULL;
     int mpi_rank, mpi_size, mpi_code;
     herr_t ret_value = SUCCEED;
@@ -3003,17 +2990,11 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
 
     /* Get the no overlap property */
 
-    /* Redistribute chunks to new owners as necessary */
-    if (!no_overlap) {
-
-    }
-
     if (NULL == (local_info_array = (H5D_filtered_collective_io_info_t *) H5MM_malloc(H5SL_count(fm->sel_chunks) * sizeof(*local_info_array))))
         HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer")
 
-    num_chunks_selected = 0;
     chunk_node = H5SL_first(fm->sel_chunks);
-    while (chunk_node) {
+    for (num_chunks_selected = 0; chunk_node; num_chunks_selected++) {
         H5D_chunk_info_t *chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node);
         H5D_chunk_ud_t udata;
 
@@ -3021,17 +3002,15 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
         if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0)
             HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address")
 
-#ifdef PARALLEL_COMPRESS_DEBUG
-        local_info_array[num_chunks_selected].num_writers = 0;
-#endif
-
-        local_info_array[num_chunks_selected].old_chunk = local_info_array[num_chunks_selected].new_chunk = udata.chunk_block;
         local_info_array[num_chunks_selected].chunk_info = *chunk_info;
+        local_info_array[num_chunks_selected].old_chunk = local_info_array[num_chunks_selected].new_chunk = udata.chunk_block;
+        local_info_array[num_chunks_selected].io_size = H5S_GET_SELECT_NPOINTS(chunk_info->mspace) * type_info->src_type_size;
+        local_info_array[num_chunks_selected].num_writers = 0;
+        local_info_array[num_chunks_selected].owner = mpi_rank;
         local_info_array[num_chunks_selected].buf = NULL;
 
-        num_chunks_selected++;
         chunk_node = H5SL_next(chunk_node);
-    } /* end while */
+    } /* end for */
 
 #ifdef PARALLEL_COMPRESS_DEBUG
     HDfprintf(debug_file, " Contents of local info array\n");
@@ -3069,6 +3048,85 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     HDfprintf(debug_file, "-----------------------------------\n\n");
 #endif
 
+    /* Redistribute chunks to new owners as necessary */
+    if (!no_overlap && (io_info->op_type == H5D_IO_OP_WRITE)) {
+        size_t i;
+
+        if (H5D__mpio_array_gather(io_info, local_info_array, num_chunks_selected,
+                sizeof(*local_info_array), &overlap_array, &overlap_array_num_entries,
+                H5D__cmp_filtered_collective_io_entry) < 0)
+            HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't ")
+
+        for (i = 0, num_chunks_selected = 0; i < overlap_array_num_entries;) {
+            H5D_filtered_collective_io_info_t chunk_entry;
+            haddr_t chunk_addr = overlap_array[i].old_chunk.offset;
+            size_t num_writers = 0;
+            size_t max_bytes = 0;
+            int new_owner = 0;
+
+            /* Process duplicate entries caused by another process writing
+             * to the same chunk */
+            do {
+                /* Store the correct chunk entry information in case this process
+                 * become's the new chunk's owner */
+                if (mpi_rank == overlap_array[i].owner)
+                    chunk_entry = overlap_array[i];
+
+                /* New owner of the chunk is determined by the process
+                 * which is writing the most data to the chunk */
+                if (overlap_array[i].io_size > max_bytes) {
+                    max_bytes = overlap_array[i].io_size;
+                    new_owner = overlap_array[i].owner;
+                }
+
+                num_writers++;
+                i++;
+
+                if (i == overlap_array_num_entries) break;
+            } while (overlap_array[i].old_chunk.offset == chunk_addr);
+
+            if (mpi_rank == new_owner) {
+                /* Make sure the new owner will know how many other processes will
+                 * be sending chunk modification data to it */
+                chunk_entry.num_writers = num_writers;
+
+                /* New owner takes possession of the chunk */
+                overlap_array[num_chunks_selected++] = chunk_entry;
+            }
+            else {
+                /* Send modification data to new owner */
+
+            }
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+            HDfprintf(debug_file, "Chunk at address %a re-assigned to process %d.\n", chunk_addr, new_owner);
+#endif
+        }
+
+        /* Release old list */
+        if (local_info_array)
+            H5MM_free(local_info_array);
+
+        /* Local info list becomes modified (redistributed) chunk list */
+        local_info_array = overlap_array;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+        HDfprintf(debug_file, "This process now has %d chunks selected after redistribution.\n\n", num_chunks_selected);
+
+        HDfprintf(debug_file, " Contents of local info array (after redistribution)\n");
+        HDfprintf(debug_file, "------------------------------\n");
+        for (size_t j = 0; j < (size_t) num_chunks_selected; j++) {
+            HDfprintf(debug_file, "| Chunk Entry %zd:\n", j);
+            HDfprintf(debug_file, "| - Chunk Address: %a\n", local_info_array[j].old_chunk.offset);
+            HDfprintf(debug_file, "| - Chunk Length: %zd\n", local_info_array[j].old_chunk.length);
+            HDfprintf(debug_file, "| - Address of mspace: %x\n", local_info_array[j].chunk_info.mspace);
+            HDfprintf(debug_file, "| - Chunk Selection Type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.mspace));
+            HDfprintf(debug_file, "| - Chunk Num Elmts Sel.: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.mspace));
+        }
+        HDfprintf(debug_file, "------------------------------\n\n");
+#endif
+    }
+
     /* Gather the number of chunks each process is writing to all processes */
     if (NULL == (num_chunks_selected_array = (size_t *) H5MM_malloc((size_t) mpi_size * sizeof(*num_chunks_selected_array))))
         HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")
@@ -3113,8 +3171,8 @@ done:
  */
 static herr_t
 H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chunk_list,
-    size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_created,
-    MPI_Datatype *new_file_type, hbool_t *file_type_created)
+    size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
+    MPI_Datatype *new_file_type, hbool_t *file_type_derived)
 {
     MPI_Aint *write_buf_array = NULL; /* Relative displacements of filtered chunk data buffers */
     MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */
@@ -3125,9 +3183,9 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
 
     HDassert(chunk_list);
     HDassert(new_mem_type);
-    HDassert(mem_type_created);
+    HDassert(mem_type_derived);
     HDassert(new_file_type);
-    HDassert(file_type_created);
+    HDassert(file_type_derived);
 
     if (num_entries > 0) {
         size_t i;
@@ -3178,14 +3236,14 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
         /* Create memory MPI type */
         if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, write_buf_array, MPI_BYTE, new_mem_type)))
             HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
-        *mem_type_created = TRUE;
+        *mem_type_derived = TRUE;
 
         if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type)))
            HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
 
         /* Create file MPI type */
         if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, file_offset_array, MPI_BYTE, new_file_type)))
            HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
-        *file_type_created = TRUE;
+        *file_type_derived = TRUE;
 
         if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type)))
            HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
     } /* end if */
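
(Illustration, not part of the commit: after redistribution, H5D__mpio_filtered_collective_write_type() builds one hindexed MPI type over the per-chunk memory buffers, with displacements relative to the first buffer, which is why the patch points io_info->u.wbuf at chunk_list[0].buf, and one over the chunks' new file offsets. The standalone sketch below shows only that type construction; the buffer sizes and file offsets are made-up stand-ins. A rank with no selected chunks would skip this and keep the MPI_BYTE defaults with an I/O count of 0, as the new mpi_buf_count logic does.)

/* Hedged sketch of the collective write type construction. */
#include <mpi.h>
#include <stdlib.h>

int
main(int argc, char **argv)
{
    enum { NUM_CHUNKS = 2 };
    int          length_array[NUM_CHUNKS]      = { 64, 128 };    /* filtered chunk sizes */
    MPI_Aint     file_offset_array[NUM_CHUNKS] = { 4096, 8192 }; /* stand-ins for new_chunk.offset */
    MPI_Aint     write_buf_array[NUM_CHUNKS];
    void        *bufs[NUM_CHUNKS];
    MPI_Datatype mem_type, file_type;
    MPI_Aint     base, addr;
    int          i;

    MPI_Init(&argc, &argv);

    for (i = 0; i < NUM_CHUNKS; i++)
        bufs[i] = malloc((size_t) length_array[i]);

    /* Memory displacements are relative to the first chunk's buffer */
    MPI_Get_address(bufs[0], &base);
    for (i = 0; i < NUM_CHUNKS; i++) {
        MPI_Get_address(bufs[i], &addr);
        write_buf_array[i] = addr - base;
    }

    /* One derived type describing all chunk buffers in memory... */
    MPI_Type_create_hindexed(NUM_CHUNKS, length_array, write_buf_array,
                             MPI_BYTE, &mem_type);
    MPI_Type_commit(&mem_type);

    /* ...and one describing the chunks' (re-allocated) locations in the file */
    MPI_Type_create_hindexed(NUM_CHUNKS, length_array, file_offset_array,
                             MPI_BYTE, &file_type);
    MPI_Type_commit(&file_type);

    MPI_Type_free(&mem_type);
    MPI_Type_free(&file_type);
    for (i = 0; i < NUM_CHUNKS; i++)
        free(bufs[i]);
    MPI_Finalize();
    return 0;
}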