author    | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-24 21:01:31 (GMT)
committer | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-01-24 21:01:31 (GMT)
commit    | aab742c9a2faa5d2136245b1370b250022804861 (patch)
tree      | b9ae7f64b1cf81ea5c6ec0293a9c18999678a9f4 /src
parent    | b19b0ea67d5636504e4b7ebd6241d52a5a4f09df (diff)
Test code for sending chunk modification data around
Diffstat (limited to 'src')
-rw-r--r-- | src/H5Dmpio.c | 200
1 file changed, 190 insertions, 10 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 68d2752..70a859e 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -758,7 +758,7 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
 
     /* step 1: choose an IO option */
     /* If the average number of chunk per process is greater than a threshold, we will do one link chunked IO. */
-    if((unsigned)sum_chunk / mpi_size >= one_link_chunk_io_threshold)
+    if((unsigned)sum_chunk / (unsigned)mpi_size >= one_link_chunk_io_threshold)
         io_option = H5D_ONE_LINK_CHUNK_IO_MORE_OPT;
 #ifdef H5_HAVE_INSTRUMENTED_LIBRARY
     else
@@ -2660,10 +2660,13 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
 {
     H5D_filtered_collective_io_info_t *local_info_array = NULL;
     H5D_filtered_collective_io_info_t *overlap_info_array = NULL;
+    H5S_sel_iter_t                    *mem_iter = NULL;
     H5SL_node_t                       *chunk_node;
     MPI_Request                       *send_requests = NULL;
     MPI_Status                        *send_statuses = NULL;
     hbool_t                            no_overlap = FALSE;
+    hbool_t                            mem_iter_init = FALSE;
+    uint8_t                           *mod_data = NULL;
     size_t                             num_send_requests;
     size_t                             num_chunks_selected;
     size_t                             overlap_info_array_num_entries;
@@ -2751,6 +2754,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
     if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests))))
         HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
 
+    if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
+
     /* XXX: Change minor error code */
     if (H5D__mpio_array_gather(io_info, local_info_array, num_chunks_selected, sizeof(*local_info_array),
             (void **) &overlap_info_array, &overlap_info_array_num_entries,
@@ -2771,7 +2777,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
                 /* Store the correct chunk entry information in case this process
                  * becomes the new chunk's owner. The chunk entry that this process
                  * contributed will be the only one with a valid dataspace selection
-                 * on this particular process
+                 * on that particular process
                  */
                 if (mpi_rank == overlap_info_array[i].owner)
                     chunk_entry = overlap_info_array[i];
@@ -2785,9 +2791,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
                 }
 
                 num_writers++;
 
-                i++;
-                if (i == overlap_info_array_num_entries) break;
+                if (++i == overlap_info_array_num_entries) break;
             } while (overlap_info_array[i].old_chunk.offset == chunk_addr);
 
             if (mpi_rank == new_owner) {
@@ -2800,11 +2805,80 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
                 overlap_info_array[num_chunks_selected++] = chunk_entry;
             } /* end if */
             else {
+                hssize_t iter_nelmts;
+                hssize_t mod_data_size;
+                uint8_t *mod_data_p = NULL;
+
+                /* XXX: Need some way of checking chunk entry to validate that this process
+                 * is actually contributing some data for this chunk update
+                 */
+
+                /* Determine size of serialized chunk selection plus the size
+                 * of the data being written
+                 */
+                if ((mod_data_size = H5S_SELECT_SERIAL_SIZE(chunk_entry.chunk_info.mspace)) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to check dataspace selection size")
+
+                if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+                /* XXX: For now, make sure enough memory is allocated by just adding the chunk
+                 * size
+                 */
+                mod_data_size += iter_nelmts * type_info->src_type_size;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "Allocing %zd bytes for mod. data buffer.\n", (size_t) mod_data_size);
+#endif
+
+                if (NULL == (mod_data = (uint8_t *) H5MM_malloc(mod_data_size)))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
+
+                /* Serialize the current selection into the buffer */
+                mod_data_p = mod_data;
+                if (H5S_SELECT_SERIALIZE(chunk_entry.chunk_info.mspace, &mod_data_p) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to serialize dataspace selection")
+
+                /* Initialize iterator for memory selection */
+                if (H5S_select_iter_init(mem_iter, chunk_entry.chunk_info.mspace, type_info->src_type_size) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+                mem_iter_init = TRUE;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "Iterating over %lld elements.\n", iter_nelmts);
+#endif
+
+                /* Collect the modification data into the buffer */
+                if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry.chunk_info.mspace, mem_iter,
+                        (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
+                    HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "| - Mod. Data Buffer:\n");
+                HDfprintf(debug_file, "| - [");
+                for (size_t j = 0; j < (size_t) iter_nelmts; j++) {
+                    if (j > 0) HDfprintf(debug_file, ", ");
+                    HDfprintf(debug_file, "%lld", ((long *) mod_data_p)[j]);
+                }
+                HDfprintf(debug_file, "]\n|\n");
+
+                HDfprintf(debug_file, "Sending modification data for chunk at address %a to process %d.\n", chunk_entry.old_chunk.offset, new_owner);
+#endif
+
+                /* Send modification data to new owner */
+                H5_CHECK_OVERFLOW(mod_data_size, hssize_t, int)
+                if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, new_owner,
+                        chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++])))
+                    HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
-                /* if (MPI_SUCCESS != (mpi_code= MPI_Isend(, , , new_owner,
-                        chunk_entry.old_chunk.offset, io_info->comm, &send_requests[num_send_requests++])))
-                    HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code) */
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "Mod. data sent.\n");
+#endif
+
+                if (mod_data)
+                    mod_data = (uint8_t *) H5MM_free(mod_data);
+                if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
             } /* end else */
 
 #ifdef PARALLEL_COMPRESS_DEBUG
@@ -2866,12 +2940,15 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
             HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
     }
 
-
 done:
     if (send_requests)
         H5MM_free(send_requests);
     if (send_statuses)
         H5MM_free(send_statuses);
+    if (mod_data)
+        H5MM_free(mod_data);
+    if (mem_iter)
+        H5MM_free(mem_iter);
 
     FUNC_LEAVE_NOAPI(ret_value)
 } /* end H5D__construct_filtered_io_info_list() */
@@ -3001,7 +3078,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
     H5S_sel_iter_t *mem_iter = NULL;
     unsigned        filter_mask = 0;
     hssize_t        iter_nelmts;
-    hbool_t         full_overwrite = TRUE;
+    hbool_t         full_overwrite = FALSE;
     hbool_t         mem_iter_init = FALSE;
     size_t          buf_size;
    herr_t          ret_value = SUCCEED;
@@ -3066,10 +3143,10 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
 
             /* Owner of this chunk, receive modification data from other processes */
 
+            /* Initialize iterator for memory selection */
             if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
 
-            /* Initialize iterator for memory selection */
             /* XXX: dst_type_size may need to be src_type_size depending on operation */
             if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.mspace, type_info->dst_type_size) < 0)
                 HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
@@ -3097,8 +3174,111 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
                     (size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf))
                 HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
 
+            if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+            /* Update the chunk data with any modifications from other processes */
+            while (chunk_entry->num_writers > 1) {
+                MPI_Status status;
+                uint8_t   *buf = NULL;
+                uint8_t   *buf_p = NULL;
+                H5S_t     *selection = NULL;
+                int        count;
+                int        mpi_code;
+
+                /* XXX: Since the receive tag needs to be an int, it is possible that a chunk's index
+                 * may fall outside the range of an int and cause an overflow problem when casting down
+                 * here
+                 */
+                H5_CHECK_OVERFLOW(chunk_entry->chunk_info.index, hsize_t, int)
+                if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->chunk_info.index,
+                        io_info->comm, &status)))
+                    HMPI_GOTO_ERROR(FAIL, "MPI_Probe failed", mpi_code)
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "Found message from source %d with tag %d.\n", status.MPI_SOURCE, status.MPI_TAG);
+#endif
+
+                /* Retrieve the message size */
+                if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
+                    HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "Message size is %d bytes.\n", count);
+#endif
+                if (NULL == (buf = (uint8_t *) H5MM_malloc(count)))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate modification data receive buffer")
+                buf_p = buf;
+
+                if (MPI_SUCCESS != (mpi_code = MPI_Recv(buf, count, MPI_BYTE, MPI_ANY_SOURCE,
+                        chunk_entry->chunk_info.index, io_info->comm, &status)))
+                    HMPI_GOTO_ERROR(FAIL, "MPI_Recv failed", mpi_code)
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "Received the message.\n");
+#endif
+
+                /* Deserialize the selection in the chunk's dataspace */
+                if (H5S_SELECT_DESERIALIZE(&selection, &buf_p) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to deserialize dataspace selection")
+
+                /* H5S_extent_copy(selection, chunk_entry->chunk_info.mspace); */
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "Deserialized selection info:\n");
+                HDfprintf(debug_file, "| Mem Space:\n");
+                HDfprintf(debug_file, "| - Extent Num elements: %zd\n", H5S_GET_EXTENT_NPOINTS(selection));
+                HDfprintf(debug_file, "| - Extent type: %d\n", H5S_GET_EXTENT_TYPE(selection));
+                HDfprintf(debug_file, "| - Selection Num elements: %zd\n", H5S_GET_SELECT_NPOINTS(selection));
+                HDfprintf(debug_file, "| - Selection type: %d\n", H5S_GET_SELECT_TYPE(selection));
+#endif
+
+                /* XXX: After receiving the selection information, the extent is not
+                 * set/valid. Possibly copy the selection information directly into
+                 * chunk entry mem space, or copy extent into received selection H5S type
+                 */
+
+
+                if (H5S_select_iter_init(mem_iter, selection, type_info->dst_type_size) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+                mem_iter_init = TRUE;
+
+                if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(selection)) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "Contents of message:\n[");
+                for (size_t j = 0; j < iter_nelmts; j++) {
+                    if (j > 0) HDfprintf(debug_file, ", ");
+                    HDfprintf(debug_file, "%lld", ((long *) buf_p)[j]);
+                }
+                HDfprintf(debug_file, "]\n");
+#endif
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+                HDfprintf(debug_file, "Iter nelmts=%lld.\n", iter_nelmts);
+                HDfprintf(debug_file, "Mem space selected points: %zd.\n", H5S_GET_SELECT_NPOINTS(selection));
+#endif
+
+                /* Update the chunk data with the received modification data */
+                /* if (!H5D__gather_mem(buf_p, selection, mem_iter, (size_t) iter_nelmts,
+                        io_info->dxpl_cache, chunk_entry->buf))
+                    HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't gather to write buffer") */
+                if (H5D__scatter_mem(buf_p, selection, mem_iter, (size_t) iter_nelmts,
+                        io_info->dxpl_cache, chunk_entry->buf) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't scatter to write buffer")
+
+                chunk_entry->num_writers--;
+
+                if (buf)
+                    H5MM_free(buf);
+                if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+                if (selection)
+                    if (H5S_close(selection) < 0)
+                        HGOTO_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace")
+            }
 
 #ifdef PARALLEL_COMPRESS_DEBUG
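Note on the message exchange used above: the receiver does not know the size of an incoming chunk-modification message in advance, so the code probes for it first. Below is a minimal, self-contained sketch (not part of the commit) of that MPI pattern: one rank posts an MPI_Isend() tagged with a chunk index, and the owner rank uses MPI_Probe() and MPI_Get_count() to size its receive buffer before calling MPI_Recv(). The payload, tag value, and rank roles are made up purely for illustration.

/* probe_recv_sketch.c -- illustrative only; compile with mpicc, run with mpiexec -n 2 */
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int
main(int argc, char *argv[])
{
    int rank, nprocs;
    int chunk_tag = 42;              /* stands in for chunk_entry.chunk_info.index */

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    if (nprocs < 2) {                /* the pattern needs a sender and a receiver */
        MPI_Finalize();
        return 0;
    }

    if (rank == 1) {
        /* "Writer" rank: send a variable-sized modification buffer to rank 0 */
        char        msg[] = "serialized selection + element data";
        MPI_Request req;

        MPI_Isend(msg, (int) sizeof(msg), MPI_BYTE, 0, chunk_tag, MPI_COMM_WORLD, &req);
        MPI_Wait(&req, MPI_STATUS_IGNORE);
    }
    else if (rank == 0) {
        /* "Owner" rank: the message size is unknown, so probe before receiving */
        MPI_Status status;
        int        count;
        char      *buf;

        MPI_Probe(MPI_ANY_SOURCE, chunk_tag, MPI_COMM_WORLD, &status);
        MPI_Get_count(&status, MPI_BYTE, &count);

        buf = (char *) malloc((size_t) count);
        MPI_Recv(buf, count, MPI_BYTE, status.MPI_SOURCE, chunk_tag, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);

        printf("rank 0 received %d bytes from rank %d: \"%s\"\n",
               count, status.MPI_SOURCE, buf);
        free(buf);
    }

    MPI_Finalize();
    return 0;
}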
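The commit serializes each contributing rank's dataspace selection with the private H5S_SELECT_SERIAL_SIZE()/H5S_SELECT_SERIALIZE() routines and rebuilds it on the owner with H5S_SELECT_DESERIALIZE(). Those internals are not callable from user code; the sketch below (not part of the commit, assuming the HDF5 1.10-era public API) shows the closest public analogue of the same round trip using H5Sencode()/H5Sdecode(). The dataspace shape and hyperslab selection are arbitrary examples.

/* selection_roundtrip_sketch.c -- illustrative only; link against the HDF5 library */
#include <stdio.h>
#include <stdlib.h>
#include "hdf5.h"

int
main(void)
{
    hsize_t dims[1]  = {100};
    hsize_t start[1] = {10};
    hsize_t count[1] = {20};
    hid_t   space_id, decoded_id;
    size_t  buf_size = 0;
    void   *buf      = NULL;

    /* Build a simple 1-D dataspace with a hyperslab selection */
    space_id = H5Screate_simple(1, dims, NULL);
    H5Sselect_hyperslab(space_id, H5S_SELECT_SET, start, NULL, count, NULL);

    /* First call with a NULL buffer just reports the encoded size */
    H5Sencode(space_id, NULL, &buf_size);
    buf = malloc(buf_size);

    /* Second call serializes the dataspace extent and its selection */
    H5Sencode(space_id, buf, &buf_size);

    /* ...this buffer is what would travel to the owner rank via MPI_Isend()... */

    /* Reconstruct the dataspace (and selection) from the raw bytes */
    decoded_id = H5Sdecode(buf);
    printf("decoded selection has %lld points\n",
           (long long) H5Sget_select_npoints(decoded_id));

    H5Sclose(decoded_id);
    H5Sclose(space_id);
    free(buf);
    return 0;
}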