Diffstat (limited to 'src/H5FDsubfiling/H5FDioc_int.c')
-rw-r--r--  src/H5FDsubfiling/H5FDioc_int.c | 295
1 file changed, 176 insertions(+), 119 deletions(-)
diff --git a/src/H5FDsubfiling/H5FDioc_int.c b/src/H5FDsubfiling/H5FDioc_int.c
index 71afef4..e2ba95a 100644
--- a/src/H5FDsubfiling/H5FDioc_int.c
+++ b/src/H5FDsubfiling/H5FDioc_int.c
@@ -16,31 +16,36 @@
#include "H5FDioc_priv.h"
-static int async_completion(void *arg);
-
/*
- * Given a file offset, the stripe size and
- * the number of IOCs, calculate the target
- * IOC for I/O and the file offset for the
- * subfile that IOC controls
+ * Given a file offset, the stripe size, the
+ * number of IOCs and the number of subfiles,
+ * calculate the target IOC for I/O, the index
+ * of the target subfile out of the subfiles
+ * that the IOC controls and the file offset
+ * into that subfile
*/
static inline void
-calculate_target_ioc(int64_t file_offset, int64_t stripe_size, int n_io_concentrators, int64_t *target_ioc,
- int64_t *ioc_file_offset)
+calculate_target_ioc(int64_t file_offset, int64_t stripe_size, int num_io_concentrators, int num_subfiles,
+ int64_t *target_ioc, int64_t *ioc_file_offset, int64_t *ioc_subfile_idx)
{
int64_t stripe_idx;
int64_t subfile_row;
+ int64_t subfile_idx;
+ HDassert(stripe_size > 0);
+ HDassert(num_io_concentrators > 0);
+ HDassert(num_subfiles > 0);
HDassert(target_ioc);
HDassert(ioc_file_offset);
- HDassert(stripe_size > 0);
- HDassert(n_io_concentrators > 0);
+ HDassert(ioc_subfile_idx);
stripe_idx = file_offset / stripe_size;
- subfile_row = stripe_idx / n_io_concentrators;
+ subfile_row = stripe_idx / num_subfiles;
+ subfile_idx = (stripe_idx % num_subfiles) / num_io_concentrators;
- *target_ioc = stripe_idx % n_io_concentrators;
+ *target_ioc = (stripe_idx % num_subfiles) % num_io_concentrators;
*ioc_file_offset = (subfile_row * stripe_size) + (file_offset % stripe_size);
+ *ioc_subfile_idx = subfile_idx;
}
/*
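To make the new mapping concrete, the following standalone sketch mirrors the arithmetic of the patched calculate_target_ioc() and prints the result for one illustrative topology (2 IOCs, 4 subfiles, 1 MiB stripes); the numbers are examples only, not taken from the library:

/*
 * Standalone illustration of the patched mapping: file offset ->
 * (target IOC, subfile index within that IOC, offset within subfile).
 */
#include <stdint.h>
#include <stdio.h>

static void
demo_target_ioc(int64_t file_offset, int64_t stripe_size, int num_iocs, int num_subfiles)
{
    int64_t stripe_idx  = file_offset / stripe_size;
    int64_t subfile_row = stripe_idx / num_subfiles;
    int64_t subfile_idx = (stripe_idx % num_subfiles) / num_iocs;
    int64_t target_ioc  = (stripe_idx % num_subfiles) % num_iocs;
    int64_t ioc_offset  = (subfile_row * stripe_size) + (file_offset % stripe_size);

    printf("offset %lld -> IOC %lld, subfile %lld, subfile offset %lld\n", (long long)file_offset,
           (long long)target_ioc, (long long)subfile_idx, (long long)ioc_offset);
}

int
main(void)
{
    /* Stripe 5 of a 1 MiB-striped file, 2 IOCs each owning 2 of the 4 subfiles */
    demo_target_ioc(5 * 1048576 + 100, 1048576, 2, 4);
    return 0;
}

With these values the stripe index is 5, so the request lands on IOC 1, subfile 0 of that IOC, at subfile offset 1048676.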
@@ -90,17 +95,20 @@ cast_to_void(const void *data)
*-------------------------------------------------------------------------
*/
herr_t
-ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset, int64_t elements,
- const void *data, io_req_t **io_req)
+ioc__write_independent_async(int64_t context_id, int64_t offset, int64_t elements, const void *data,
+ io_req_t **io_req)
{
subfiling_context_t *sf_context = NULL;
MPI_Request ack_request = MPI_REQUEST_NULL;
io_req_t *sf_io_request = NULL;
int64_t ioc_start;
int64_t ioc_offset;
+ int64_t ioc_subfile_idx;
int64_t msg[3] = {0};
int *io_concentrators = NULL;
- int data_tag = 0;
+ int num_io_concentrators;
+ int num_subfiles;
+ int data_tag = 0;
int mpi_code;
herr_t ret_value = SUCCEED;
@@ -111,13 +119,16 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
HDassert(sf_context->topology);
HDassert(sf_context->topology->io_concentrators);
- io_concentrators = sf_context->topology->io_concentrators;
+ io_concentrators = sf_context->topology->io_concentrators;
+ num_io_concentrators = sf_context->topology->n_io_concentrators;
+ num_subfiles = sf_context->sf_num_subfiles;
/*
* Calculate the IOC that we'll send the I/O request to
* and the offset within that IOC's subfile
*/
- calculate_target_ioc(offset, sf_context->sf_stripe_size, n_io_concentrators, &ioc_start, &ioc_offset);
+ calculate_target_ioc(offset, sf_context->sf_stripe_size, num_io_concentrators, num_subfiles, &ioc_start,
+ &ioc_offset, &ioc_subfile_idx);
/*
* Wait for memory to be allocated on the target IOC before
@@ -141,37 +152,43 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
*/
msg[0] = elements;
msg[1] = ioc_offset;
- msg[2] = context_id;
- if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 3, MPI_INT64_T, io_concentrators[ioc_start], WRITE_INDEP,
- sf_context->sf_msg_comm)))
+ msg[2] = ioc_subfile_idx;
+ if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, io_concentrators[ioc_start],
+ WRITE_INDEP, sf_context->sf_msg_comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
- /* Wait to receive data tag */
+ /* Wait to receive the data tag from the IOC */
if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Wait failed", mpi_code);
if (data_tag == 0)
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "received NACK from IOC");
- /* At this point in the new implementation, we should queue
- * the async write so that when the top level VFD tells us
- * to complete all pending IO requests, we have all the info
- * we need to accomplish that.
+ /*
+ * Allocate the I/O request object that will
+ * be returned to the caller
*/
if (NULL == (sf_io_request = HDmalloc(sizeof(io_req_t))))
H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_WRITEERROR, FAIL, "couldn't allocate I/O request");
H5_CHECK_OVERFLOW(ioc_start, int64_t, int);
- sf_io_request->completion_func.io_args.ioc = (int)ioc_start;
- sf_io_request->completion_func.io_args.context_id = context_id;
- sf_io_request->completion_func.io_args.offset = offset;
- sf_io_request->completion_func.io_args.elements = elements;
- sf_io_request->completion_func.io_args.data = cast_to_void(data);
- sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL;
- sf_io_request->completion_func.io_function = async_completion;
- sf_io_request->completion_func.pending = 0;
+ sf_io_request->ioc = (int)ioc_start;
+ sf_io_request->context_id = context_id;
+ sf_io_request->offset = offset;
+ sf_io_request->elements = elements;
+ sf_io_request->data = cast_to_void(data);
+ sf_io_request->io_transfer_req = MPI_REQUEST_NULL;
+ sf_io_request->io_comp_req = MPI_REQUEST_NULL;
+ sf_io_request->io_comp_tag = -1;
- sf_io_request->prev = sf_io_request->next = NULL;
+ /*
+ * Start a non-blocking receive from the IOC that signifies
+ * when the actual write is complete
+ */
+ if (MPI_SUCCESS !=
+ (mpi_code = MPI_Irecv(&sf_io_request->io_comp_tag, 1, MPI_INT, io_concentrators[ioc_start],
+ WRITE_DATA_DONE, sf_context->sf_data_comm, &sf_io_request->io_comp_req)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
/*
* Start the actual data transfer using the ack received
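Note that the request message is now sent as a single element of H5_subfiling_rpc_msg_type rather than as three raw MPI_INT64_T values. The real datatype is created and committed elsewhere in the subfiling code; the sketch below merely assumes it describes the three contiguous int64_t fields carried in msg[]:

/*
 * Assumed layout only: the actual H5_subfiling_rpc_msg_type is defined
 * outside this file. This shows how a contiguous 3 x int64_t message
 * type could be built for the {elements, offset, subfile index} triple
 * sent above.
 */
#include <mpi.h>
#include <stdint.h>

static MPI_Datatype
build_rpc_msg_type(void)
{
    MPI_Datatype rpc_msg_type = MPI_DATATYPE_NULL;

    MPI_Type_contiguous(3, MPI_INT64_T, &rpc_msg_type);
    MPI_Type_commit(&rpc_msg_type);

    return rpc_msg_type; /* MPI_Send(msg, 1, rpc_msg_type, ...) then matches msg[3] */
}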
@@ -180,7 +197,7 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
H5_CHECK_OVERFLOW(elements, int64_t, int);
if (MPI_SUCCESS !=
(mpi_code = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], data_tag,
- sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req)))
+ sf_context->sf_data_comm, &sf_io_request->io_transfer_req)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code);
/*
@@ -193,14 +210,23 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
* to the caller.
*/
- sf_io_request->completion_func.pending = 1;
- *io_req = sf_io_request;
+ *io_req = sf_io_request;
done:
if (ret_value < 0) {
if (ack_request != MPI_REQUEST_NULL) {
- if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&ack_request)))
- H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel failed", mpi_code);
+ if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
+ H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+ }
+ if (sf_io_request) {
+ if (sf_io_request->io_transfer_req != MPI_REQUEST_NULL) {
+ if (MPI_SUCCESS != (mpi_code = MPI_Wait(&sf_io_request->io_transfer_req, MPI_STATUS_IGNORE)))
+ H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+ }
+ if (sf_io_request->io_comp_req != MPI_REQUEST_NULL) {
+ if (MPI_SUCCESS != (mpi_code = MPI_Wait(&sf_io_request->io_comp_req, MPI_STATUS_IGNORE)))
+ H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+ }
}
HDfree(sf_io_request);
@@ -241,81 +267,141 @@ done:
*-------------------------------------------------------------------------
*/
herr_t
-ioc__read_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset, int64_t elements,
- void *data, io_req_t **io_req)
+ioc__read_independent_async(int64_t context_id, int64_t offset, int64_t elements, void *data,
+ io_req_t **io_req)
{
subfiling_context_t *sf_context = NULL;
+ MPI_Request ack_request = MPI_REQUEST_NULL;
io_req_t *sf_io_request = NULL;
+ hbool_t need_data_tag = FALSE;
int64_t ioc_start;
int64_t ioc_offset;
+ int64_t ioc_subfile_idx;
int64_t msg[3] = {0};
int *io_concentrators = NULL;
+ int num_io_concentrators;
+ int num_subfiles;
+ int data_tag = 0;
int mpi_code;
herr_t ret_value = SUCCEED;
HDassert(io_req);
+ H5_CHECK_OVERFLOW(elements, int64_t, int);
+
if (NULL == (sf_context = H5_get_subfiling_object(context_id)))
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "can't get subfiling context from ID");
HDassert(sf_context->topology);
HDassert(sf_context->topology->io_concentrators);
- io_concentrators = sf_context->topology->io_concentrators;
+ io_concentrators = sf_context->topology->io_concentrators;
+ num_io_concentrators = sf_context->topology->n_io_concentrators;
+ num_subfiles = sf_context->sf_num_subfiles;
+
+ /*
+ * If we are using 1 subfile per IOC, we can optimize reads
+ * a little since each read will go to a separate IOC and we
+ * won't be in danger of data being received in an
+ * unpredictable order. However, if some IOCs own more than
+ * 1 subfile, we need to associate each read with a unique
+ * message tag to make sure the data is received in the
+ * correct order.
+ */
+ need_data_tag = num_subfiles != num_io_concentrators;
+ if (!need_data_tag)
+ data_tag = READ_INDEP_DATA;
/*
* Calculate the IOC that we'll send the I/O request to
* and the offset within that IOC's subfile
*/
- calculate_target_ioc(offset, sf_context->sf_stripe_size, n_io_concentrators, &ioc_start, &ioc_offset);
+ calculate_target_ioc(offset, sf_context->sf_stripe_size, num_io_concentrators, num_subfiles, &ioc_start,
+ &ioc_offset, &ioc_subfile_idx);
/*
- * At this point in the new implementation, we should queue
- * the non-blocking recv so that when the top level VFD tells
- * us to complete all pending IO requests, we have all the info
- * we need to accomplish that.
- *
- * Post the early non-blocking receive here.
+ * Allocate the I/O request object that will
+ * be returned to the caller
*/
if (NULL == (sf_io_request = HDmalloc(sizeof(io_req_t))))
H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_READERROR, FAIL, "couldn't allocate I/O request");
H5_CHECK_OVERFLOW(ioc_start, int64_t, int);
- sf_io_request->completion_func.io_args.ioc = (int)ioc_start;
- sf_io_request->completion_func.io_args.context_id = context_id;
- sf_io_request->completion_func.io_args.offset = offset;
- sf_io_request->completion_func.io_args.elements = elements;
- sf_io_request->completion_func.io_args.data = data;
- sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL;
- sf_io_request->completion_func.io_function = async_completion;
- sf_io_request->completion_func.pending = 0;
-
- sf_io_request->prev = sf_io_request->next = NULL;
+ sf_io_request->ioc = (int)ioc_start;
+ sf_io_request->context_id = context_id;
+ sf_io_request->offset = offset;
+ sf_io_request->elements = elements;
+ sf_io_request->data = data;
+ sf_io_request->io_transfer_req = MPI_REQUEST_NULL;
+ sf_io_request->io_comp_req = MPI_REQUEST_NULL;
+ sf_io_request->io_comp_tag = -1;
+
+ if (need_data_tag) {
+ /*
+ * Post an early non-blocking receive for IOC to send an ACK
+ * (or NACK) message with a data tag that we will use for
+ * receiving data
+ */
+ if (MPI_SUCCESS != (mpi_code = MPI_Irecv(&data_tag, 1, MPI_INT, io_concentrators[ioc_start],
+ READ_INDEP_ACK, sf_context->sf_data_comm, &ack_request)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
+
+ /*
+ * Prepare and send an I/O request to the IOC identified
+ * by the file offset
+ */
+ msg[0] = elements;
+ msg[1] = ioc_offset;
+ msg[2] = ioc_subfile_idx;
+ if (MPI_SUCCESS !=
+ (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, io_concentrators[ioc_start], READ_INDEP,
+ sf_context->sf_msg_comm)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
+
+ /* Wait to receive the data tag from the IOC */
+ if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+
+ if (data_tag == 0)
+ H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "received NACK from IOC");
+ }
- H5_CHECK_OVERFLOW(elements, int64_t, int);
+ /*
+ * Post a non-blocking receive for the data from the IOC
+ * using the selected data tag (either the one received
+ * from the IOC or the static READ_INDEP_DATA tag)
+ */
if (MPI_SUCCESS !=
- (mpi_code = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], READ_INDEP_DATA,
- sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req)))
+ (mpi_code = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], data_tag,
+ sf_context->sf_data_comm, &sf_io_request->io_transfer_req)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
- sf_io_request->completion_func.pending = 1;
- *io_req = sf_io_request;
+ if (!need_data_tag) {
+ /*
+ * Prepare and send an I/O request to the IOC identified
+ * by the file offset
+ */
+ msg[0] = elements;
+ msg[1] = ioc_offset;
+ msg[2] = ioc_subfile_idx;
+ if (MPI_SUCCESS !=
+ (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, io_concentrators[ioc_start], READ_INDEP,
+ sf_context->sf_msg_comm)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
+ }
- /*
- * Prepare and send an I/O request to the IOC identified
- * by the file offset
- */
- msg[0] = elements;
- msg[1] = ioc_offset;
- msg[2] = context_id;
- if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 3, MPI_INT64_T, io_concentrators[ioc_start], READ_INDEP,
- sf_context->sf_msg_comm)))
- H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
+ *io_req = sf_io_request;
done:
if (ret_value < 0) {
- if (sf_io_request && sf_io_request->completion_func.io_args.io_req != MPI_REQUEST_NULL) {
- if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&sf_io_request->completion_func.io_args.io_req)))
- H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel failed", mpi_code);
+ if (ack_request != MPI_REQUEST_NULL) {
+ if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
+ H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+ }
+ if (sf_io_request) {
+ if (sf_io_request->io_transfer_req != MPI_REQUEST_NULL) {
+ if (MPI_SUCCESS != (mpi_code = MPI_Wait(&sf_io_request->io_transfer_req, MPI_STATUS_IGNORE)))
+ H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+ }
}
HDfree(sf_io_request);
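The tag-selection logic in the read path above boils down to a single comparison; this short sketch replays it for two illustrative topologies to show when the static READ_INDEP_DATA tag suffices and when a per-read tag must be obtained from the ACK:

/*
 * Illustration of the need_data_tag decision; topology values are
 * examples only.
 */
#include <stdbool.h>
#include <stdio.h>

int
main(void)
{
    struct { int num_iocs; int num_subfiles; } cases[2] = {
        {4, 4}, /* one subfile per IOC: static READ_INDEP_DATA tag is unambiguous  */
        {2, 8}, /* four subfiles per IOC: each read needs its own tag from the ACK */
    };

    for (int i = 0; i < 2; i++) {
        bool need_data_tag = cases[i].num_subfiles != cases[i].num_iocs;
        printf("%d IOCs, %d subfiles -> %s\n", cases[i].num_iocs, cases[i].num_subfiles,
               need_data_tag ? "per-read data tag from ACK" : "static READ_INDEP_DATA tag");
    }
    return 0;
}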
@@ -326,56 +412,27 @@ done:
} /* end ioc__read_independent_async() */
/*-------------------------------------------------------------------------
- * Function: async_completion
+ * Function: ioc__async_completion
*
- * Purpose: Given a single io_func_t structure containing the function
- * pointer and it's input arguments and a single MPI_Request
- * argument which needs to be completed, we make progress
- * by calling MPI_Test. In this initial example, we loop
- * until the request is completed as indicated by a non-zero
- * flag variable.
+ * Purpose: IOC function to complete outstanding I/O requests.
+ * Currently just a wrapper around MPI_Waitall on the given
+ * MPI_Request array.
*
- * As we go further with the implementation, we anticipate that
- * rather than testing a single request variable, we will
- * deal with a collection of all pending IO requests (on
- * this rank).
+ * Return: Non-negative on success/Negative on failure
*
- * Return: an integer status. Zero(0) indicates success. Negative
- * values (-1) indicates an error.
*-------------------------------------------------------------------------
*/
-static int
-async_completion(void *arg)
+herr_t
+ioc__async_completion(MPI_Request *mpi_reqs, size_t num_reqs)
{
- int n_reqs;
- int mpi_code;
- int ret_value = 0;
- struct async_arg {
- int n_reqs;
- MPI_Request *sf_reqs;
- } *in_progress = (struct async_arg *)arg;
-
- HDassert(arg);
-
- n_reqs = in_progress->n_reqs;
+ herr_t ret_value = SUCCEED;
+ int mpi_code;
- if (n_reqs < 0) {
-#ifdef H5FD_IOC_DEBUG
- HDprintf("%s: invalid number of in progress I/O requests\n", __func__);
-#endif
+ HDassert(mpi_reqs);
- ret_value = -1;
- goto done;
- }
-
- if (MPI_SUCCESS != (mpi_code = MPI_Waitall(n_reqs, in_progress->sf_reqs, MPI_STATUSES_IGNORE))) {
-#ifdef H5FD_IOC_DEBUG
- HDprintf("%s: MPI_Waitall failed with rc %d\n", __func__, mpi_code);
-#endif
-
- ret_value = -1;
- goto done;
- }
+ H5_CHECK_OVERFLOW(num_reqs, size_t, int);
+ if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int)num_reqs, mpi_reqs, MPI_STATUSES_IGNORE)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code);
done:
H5_SUBFILING_FUNC_LEAVE;
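Taken together, a caller drives the rewritten write path roughly as sketched below: queue the asynchronous write, gather the two MPI requests stored in the returned io_req_t, and hand them to ioc__async_completion(). This is a minimal sketch based only on the signatures and fields visible in this diff; error handling and the surrounding VFD bookkeeping are omitted:

/* Caller-side sketch; assumes the subfiling private headers are available */
static herr_t
example_write_and_complete(int64_t context_id, int64_t offset, int64_t elements, const void *buf)
{
    io_req_t   *io_req  = NULL;
    MPI_Request reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};

    /* Queue the asynchronous write; the returned request object carries
     * the data-transfer request and the write-completion request */
    if (ioc__write_independent_async(context_id, offset, elements, buf, &io_req) < 0)
        return FAIL;

    reqs[0] = io_req->io_transfer_req;
    reqs[1] = io_req->io_comp_req;

    /* Block until both the data transfer and the IOC's completion
     * notification have finished */
    if (ioc__async_completion(reqs, 2) < 0) {
        HDfree(io_req);
        return FAIL;
    }

    HDfree(io_req);
    return SUCCEED;
}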