author    Jordan Henderson <jhenderson@hdfgroup.org>    2017-01-24 21:01:31 (GMT)
committer Jordan Henderson <jhenderson@hdfgroup.org>    2017-01-24 21:01:31 (GMT)
commit aab742c9a2faa5d2136245b1370b250022804861 (patch)
tree   b9ae7f64b1cf81ea5c6ec0293a9c18999678a9f4 /src/H5Dmpio.c
parent b19b0ea67d5636504e4b7ebd6241d52a5a4f09df (diff)
Test code for sending chunk modification data around
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r--  src/H5Dmpio.c  200
1 files changed, 190 insertions, 10 deletions
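
At its core, the test code added here is a point-to-point exchange: a rank that modifies a chunk it does not own packs a variable-length message (serialized dataspace selection followed by the raw element data) and posts a non-blocking send to the owner, which sizes its receive buffer by probing the message envelope before receiving. Below is a minimal, self-contained sketch of that probe-then-receive idiom; the payload string and CHUNK_TAG are stand-ins for the serialized selection/data and the chunk index the real code uses:

    /* Sketch of the probe-then-receive idiom this commit exercises. */
    #include <mpi.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define CHUNK_TAG 42 /* stands in for the chunk index used as the MPI tag */

    int main(int argc, char **argv)
    {
        int rank, size;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &size);

        if (size < 2)
            MPI_Abort(MPI_COMM_WORLD, 1);

        if (rank == 1) {
            /* Sender: variable-length payload, non-blocking send */
            char        payload[] = "serialized selection + element data";
            MPI_Request req;

            MPI_Isend(payload, (int) sizeof(payload), MPI_BYTE, 0, CHUNK_TAG,
                      MPI_COMM_WORLD, &req);
            MPI_Wait(&req, MPI_STATUS_IGNORE);
        }
        else if (rank == 0) {
            /* Receiver: probe the envelope first so the buffer can be sized exactly */
            MPI_Status status;
            int        count;
            char      *buf;

            MPI_Probe(MPI_ANY_SOURCE, CHUNK_TAG, MPI_COMM_WORLD, &status);
            MPI_Get_count(&status, MPI_BYTE, &count);

            buf = (char *) malloc((size_t) count);
            MPI_Recv(buf, count, MPI_BYTE, status.MPI_SOURCE, CHUNK_TAG,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);

            printf("received %d bytes: %s\n", count, buf);
            free(buf);
        }

        MPI_Finalize();
        return 0;
    }
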
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 68d2752..70a859e 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -758,7 +758,7 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
/* step 1: choose an IO option */
/* If the average number of chunks per process is greater than a threshold, we will do one link chunked IO. */
- if((unsigned)sum_chunk / mpi_size >= one_link_chunk_io_threshold)
+ if((unsigned)sum_chunk / (unsigned)mpi_size >= one_link_chunk_io_threshold)
io_option = H5D_ONE_LINK_CHUNK_IO_MORE_OPT;
#ifdef H5_HAVE_INSTRUMENTED_LIBRARY
else
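
The change above casts mpi_size (an int) to unsigned so both operands of the division share a signedness. Under C's usual arithmetic conversions the int would be converted to unsigned implicitly anyway; writing the cast out documents the intent and quiets sign-conversion warnings. A contrived sketch of the hazard the implicit conversion can hide:

    #include <stdio.h>

    int main(void)
    {
        unsigned sum_chunk = 100;
        int      mpi_size  = -4; /* a real communicator size is positive; this just shows the trap */

        /* -4 is implicitly converted to 4294967292 (for 32-bit unsigned),
         * so the quotient is 0 rather than a "sensible" negative value.
         * The explicit cast in the patch makes this conversion visible. */
        printf("%u\n", sum_chunk / (unsigned) mpi_size); /* prints 0 */
        return 0;
    }
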
@@ -2660,10 +2660,13 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
{
H5D_filtered_collective_io_info_t *local_info_array = NULL;
H5D_filtered_collective_io_info_t *overlap_info_array = NULL;
+ H5S_sel_iter_t *mem_iter = NULL;
H5SL_node_t *chunk_node;
MPI_Request *send_requests = NULL;
MPI_Status *send_statuses = NULL;
hbool_t no_overlap = FALSE;
+ hbool_t mem_iter_init = FALSE;
+ uint8_t *mod_data = NULL;
size_t num_send_requests;
size_t num_chunks_selected;
size_t overlap_info_array_num_entries;
@@ -2751,6 +2754,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
+ if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
+
/* XXX: Change minor error code */
if (H5D__mpio_array_gather(io_info, local_info_array, num_chunks_selected,
sizeof(*local_info_array), (void **) &overlap_info_array, &overlap_info_array_num_entries,
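
The body of H5D__mpio_array_gather is not shown in this diff; from the call site it evidently concatenates every rank's local array of chunk entries into one array visible on all ranks. A plausible shape for such a routine, built on MPI_Allgatherv with per-rank counts; gather_arrays and its signature are illustrative, not HDF5 API:

    #include <mpi.h>
    #include <stdlib.h>

    /* Concatenate each rank's `local_count` fixed-size entries into one
     * array replicated on every rank. Returns 0 on success. */
    static int gather_arrays(const void *local, int local_count, size_t entry_size,
                             void **out, int *out_count, MPI_Comm comm)
    {
        int   nranks, i, total = 0;
        int  *counts, *displs;
        char *all;

        MPI_Comm_size(comm, &nranks);
        counts = (int *) malloc((size_t) nranks * sizeof(*counts));
        displs = (int *) malloc((size_t) nranks * sizeof(*displs));

        /* Exchange per-rank entry counts, then convert to byte counts/offsets */
        MPI_Allgather(&local_count, 1, MPI_INT, counts, 1, MPI_INT, comm);

        for (i = 0; i < nranks; i++) {
            total    += counts[i];                    /* total entries    */
            counts[i] = counts[i] * (int) entry_size; /* entries -> bytes */
            displs[i] = (i == 0) ? 0 : displs[i - 1] + counts[i - 1];
        }

        all = (char *) malloc((size_t) total * entry_size);
        MPI_Allgatherv(local, local_count * (int) entry_size, MPI_BYTE,
                       all, counts, displs, MPI_BYTE, comm);

        *out       = all;
        *out_count = total;

        free(counts);
        free(displs);
        return 0;
    }
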
@@ -2771,7 +2777,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* Store the correct chunk entry information in case this process
* becomes the new chunk's owner. The chunk entry that this process
* contributed will be the only one with a valid dataspace selection
- * on this particular process
+ * on that particular process
*/
if (mpi_rank == overlap_info_array[i].owner)
chunk_entry = overlap_info_array[i];
@@ -2785,9 +2791,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
}
num_writers++;
- i++;
- if (i == overlap_info_array_num_entries) break;
+ if (++i == overlap_info_array_num_entries) break;
} while (overlap_info_array[i].old_chunk.offset == chunk_addr);
if (mpi_rank == new_owner) {
@@ -2800,11 +2805,80 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
overlap_info_array[num_chunks_selected++] = chunk_entry;
} /* end if */
else {
+ hssize_t iter_nelmts;
+ hssize_t mod_data_size;
+ uint8_t *mod_data_p = NULL;
+
+ /* XXX: Need some way of checking chunk entry to validate that this process
+ * is actually contributing some data for this chunk update
+ */
+
+ /* Determine size of serialized chunk selection plus the size
+ * of the data being written
+ */
+ if ((mod_data_size = H5S_SELECT_SERIAL_SIZE(chunk_entry.chunk_info.mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to check dataspace selection size")
+
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ /* XXX: For now, make sure enough memory is allocated by just adding the chunk
+ * size
+ */
+ mod_data_size += iter_nelmts * type_info->src_type_size;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Allocating %zd bytes for mod. data buffer.\n", (size_t) mod_data_size);
+#endif
+
+ if (NULL == (mod_data = (uint8_t *) H5MM_malloc(mod_data_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
+
+ /* Serialize the current selection into the buffer */
+ mod_data_p = mod_data;
+ if (H5S_SELECT_SERIALIZE(chunk_entry.chunk_info.mspace, &mod_data_p) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to serialize dataspace selection")
+
+ /* Initialize iterator for memory selection */
+ if (H5S_select_iter_init(mem_iter, chunk_entry.chunk_info.mspace, type_info->src_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+ mem_iter_init = TRUE;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Iterating over %lld elements.\n", iter_nelmts);
+#endif
+
+ /* Collect the modification data into the buffer */
+ if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry.chunk_info.mspace, mem_iter,
+ (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
+ HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "| - Mod. Data Buffer:\n");
+ HDfprintf(debug_file, "| - [");
+ for (size_t j = 0; j < (size_t) iter_nelmts; j++) {
+ if (j > 0) HDfprintf(debug_file, ", ");
+ HDfprintf(debug_file, "%lld", ((long *) mod_data_p)[j]);
+ }
+ HDfprintf(debug_file, "]\n|\n");
+
+ HDfprintf(debug_file, "Sending modification data for chunk at address %a to process %d.\n", chunk_entry.old_chunk.offset, new_owner);
+#endif
+
/* Send modification data to new owner */
+ H5_CHECK_OVERFLOW(mod_data_size, hssize_t, int)
+ if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, new_owner,
+ (int) chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++])))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
- /* if (MPI_SUCCESS != (mpi_code= MPI_Isend(, , , new_owner,
- chunk_entry.old_chunk.offset, io_info->comm, &send_requests[num_send_requests++])))
- HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code) */
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Mod. data sent.\n");
+#endif
+
+ if (mod_data)
+ mod_data = (uint8_t *) H5MM_free(mod_data);
+ if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+ mem_iter_init = FALSE;
} /* end else */
#ifdef PARALLEL_COMPRESS_DEBUG
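
The message assembled above is self-describing: the serialized dataspace selection travels in front of the gathered element data in a single buffer, so the receiver can deserialize the selection first and know exactly where the data begins. A small sketch of that framing, with memcpy standing in for H5S_SELECT_SERIALIZE and H5D__gather_mem (pack_mod_data is a hypothetical helper, not HDF5 API):

    #include <stdint.h>
    #include <stdlib.h>
    #include <string.h>

    /* Pack one contiguous message: [serialized selection | element data]. */
    static uint8_t *pack_mod_data(const void *sel_blob, size_t sel_size,
                                  const void *elem_data, size_t data_size,
                                  size_t *msg_size /* out */)
    {
        uint8_t *msg = (uint8_t *) malloc(sel_size + data_size);
        uint8_t *p   = msg;

        if (!msg)
            return NULL;

        memcpy(p, sel_blob, sel_size);   /* selection first, so the receiver */
        p += sel_size;                   /* can deserialize it and locate    */
        memcpy(p, elem_data, data_size); /* the element data that follows    */

        *msg_size = sel_size + data_size;
        return msg; /* caller MPI_Isend()s this and frees it once the send completes */
    }
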
@@ -2866,12 +2940,15 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
}
-
done:
if (send_requests)
H5MM_free(send_requests);
if (send_statuses)
H5MM_free(send_statuses);
+ if (mod_data)
+ H5MM_free(mod_data);
+ if (mem_iter)
+ H5MM_free(mem_iter);
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__construct_filtered_io_info_list() */
@@ -3001,7 +3078,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
H5S_sel_iter_t *mem_iter = NULL;
unsigned filter_mask = 0;
hssize_t iter_nelmts;
- hbool_t full_overwrite = TRUE;
+ hbool_t full_overwrite = FALSE;
hbool_t mem_iter_init = FALSE;
size_t buf_size;
herr_t ret_value = SUCCEED;
@@ -3066,10 +3143,10 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
/* Owner of this chunk, receive modification data from other processes */
+ /* Initialize iterator for memory selection */
if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
- /* Initialize iterator for memory selection */
/* XXX: dst_type_size may need to be src_type_size depending on operation */
if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.mspace, type_info->dst_type_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
@@ -3097,8 +3174,111 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
(size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf))
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
+ if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+
/* Update the chunk data with any modifications from other processes */
+ while (chunk_entry->num_writers > 1) {
+ MPI_Status status;
+ uint8_t *buf = NULL;
+ uint8_t *buf_p = NULL;
+ H5S_t *selection = NULL;
+ int count;
+ int mpi_code;
+
+ /* XXX: Since the receive tag needs to be an int, it is possible that a chunk's index
+ * may fall outside the range of an int and cause an overflow problem when casting down
+ * here
+ */
+ H5_CHECK_OVERFLOW(chunk_entry->chunk_info.index, hsize_t, int)
+ if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->chunk_info.index,
+ io_info->comm, &status)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Probe failed", mpi_code)
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Found message from source %d with tag %d.\n", status.MPI_SOURCE, status.MPI_TAG);
+#endif
+
+ /* Retrieve the message size */
+ if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Message size is %d bytes.\n", count);
+#endif
+ if (NULL == (buf = (uint8_t *) H5MM_malloc(count)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate modification data receive buffer")
+ buf_p = buf;
+
+ /* Receive from the probed source so the message sized above is the one received */
+ if (MPI_SUCCESS != (mpi_code = MPI_Recv(buf, count, MPI_BYTE, status.MPI_SOURCE,
+ (int) chunk_entry->chunk_info.index, io_info->comm, &status)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Recv failed", mpi_code)
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Received the message.\n");
+#endif
+
+ /* Deserialize the selection in the chunk's dataspace */
+ if (H5S_SELECT_DESERIALIZE(&selection, &buf_p) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to deserialize dataspace selection")
+
+ /* H5S_extent_copy(selection, chunk_entry->chunk_info.mspace); */
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Deserialized selection info:\n");
+ HDfprintf(debug_file, "| Mem Space:\n");
+ HDfprintf(debug_file, "| - Extent Num elements: %zd\n", H5S_GET_EXTENT_NPOINTS(selection));
+ HDfprintf(debug_file, "| - Extent type: %d\n", H5S_GET_EXTENT_TYPE(selection));
+ HDfprintf(debug_file, "| - Selection Num elements: %zd\n", H5S_GET_SELECT_NPOINTS(selection));
+ HDfprintf(debug_file, "| - Selection type: %d\n", H5S_GET_SELECT_TYPE(selection));
+#endif
+
+ /* XXX: After receiving the selection information, the extent is not
+ * set/valid. Possibly copy the selection information directly into
+ * chunk entry mem space, or copy extent into received selection H5S type
+ */
+
+
+ if (H5S_select_iter_init(mem_iter, selection, type_info->dst_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+ mem_iter_init = TRUE;
+
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(selection)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Contents of message:\n[");
+ for (size_t j = 0; j < (size_t) iter_nelmts; j++) {
+ if (j > 0) HDfprintf(debug_file, ", ");
+ HDfprintf(debug_file, "%lld", ((long *) buf_p)[j]);
+ }
+ HDfprintf(debug_file, "]\n");
+#endif
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Iter nelmts=%lld.\n", iter_nelmts);
+ HDfprintf(debug_file, "Mem space selected points: %zd.\n", H5S_GET_SELECT_NPOINTS(selection));
+#endif
+
+ /* Update the chunk data with the received modification data */
+ /* if (!H5D__gather_mem(buf_p, selection, mem_iter, (size_t) iter_nelmts,
+ io_info->dxpl_cache, chunk_entry->buf))
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't gather to write buffer") */
+ if (H5D__scatter_mem(buf_p, selection, mem_iter, (size_t) iter_nelmts,
+ io_info->dxpl_cache, chunk_entry->buf) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't scatter to write buffer")
+
+ chunk_entry->num_writers--;
+
+ if (buf)
+ H5MM_free(buf);
+ if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+ mem_iter_init = FALSE;
+ if (selection)
+ if (H5S_close(selection) < 0)
+ HGOTO_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace")
+ }
#ifdef PARALLEL_COMPRESS_DEBUG
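
Condensed, the owner's receive loop above has this shape: one message per additional writer, matched by the chunk-index tag, sized by probing the envelope, then split back into selection and element data by the deserializer advancing a cursor through the buffer. All names below are stand-ins for the HDF5 internals:

    #include <mpi.h>
    #include <stdint.h>
    #include <stdlib.h>

    /* Drain one update message per extra writer of a chunk. */
    static void receive_updates(int num_writers, int chunk_tag, MPI_Comm comm)
    {
        while (num_writers > 1) {
            MPI_Status status;
            int        count;
            uint8_t   *buf;

            /* Size the buffer from the matched envelope before receiving */
            MPI_Probe(MPI_ANY_SOURCE, chunk_tag, comm, &status);
            MPI_Get_count(&status, MPI_BYTE, &count);

            buf = (uint8_t *) malloc((size_t) count);
            /* Receive from the probed source so the size is guaranteed to match */
            MPI_Recv(buf, count, MPI_BYTE, status.MPI_SOURCE, chunk_tag,
                     comm, MPI_STATUS_IGNORE);

            /* A deserializer would advance a cursor past the selection blob
             * here, leaving it pointing at the element data to scatter into
             * the owner's chunk buffer. */

            free(buf);
            num_writers--;
        }
    }
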