Diffstat (limited to 'src/H5FDsubfiling/H5FDioc_int.c')
-rw-r--r-- | src/H5FDsubfiling/H5FDioc_int.c | 295 |
1 file changed, 176 insertions, 119 deletions
diff --git a/src/H5FDsubfiling/H5FDioc_int.c b/src/H5FDsubfiling/H5FDioc_int.c
index 71afef4..e2ba95a 100644
--- a/src/H5FDsubfiling/H5FDioc_int.c
+++ b/src/H5FDsubfiling/H5FDioc_int.c
@@ -16,31 +16,36 @@
 
 #include "H5FDioc_priv.h"
 
-static int async_completion(void *arg);
-
 /*
- * Given a file offset, the stripe size and
- * the number of IOCs, calculate the target
- * IOC for I/O and the file offset for the
- * subfile that IOC controls
+ * Given a file offset, the stripe size, the
+ * number of IOCs and the number of subfiles,
+ * calculate the target IOC for I/O, the index
+ * of the target subfile out of the subfiles
+ * that the IOC controls and the file offset
+ * into that subfile
  */
 static inline void
-calculate_target_ioc(int64_t file_offset, int64_t stripe_size, int n_io_concentrators, int64_t *target_ioc,
-                     int64_t *ioc_file_offset)
+calculate_target_ioc(int64_t file_offset, int64_t stripe_size, int num_io_concentrators, int num_subfiles,
+                     int64_t *target_ioc, int64_t *ioc_file_offset, int64_t *ioc_subfile_idx)
 {
     int64_t stripe_idx;
     int64_t subfile_row;
+    int64_t subfile_idx;
 
+    HDassert(stripe_size > 0);
+    HDassert(num_io_concentrators > 0);
+    HDassert(num_subfiles > 0);
     HDassert(target_ioc);
     HDassert(ioc_file_offset);
-    HDassert(stripe_size > 0);
-    HDassert(n_io_concentrators > 0);
+    HDassert(ioc_subfile_idx);
 
     stripe_idx  = file_offset / stripe_size;
-    subfile_row = stripe_idx / n_io_concentrators;
+    subfile_row = stripe_idx / num_subfiles;
+    subfile_idx = (stripe_idx % num_subfiles) / num_io_concentrators;
 
-    *target_ioc      = stripe_idx % n_io_concentrators;
+    *target_ioc      = (stripe_idx % num_subfiles) % num_io_concentrators;
     *ioc_file_offset = (subfile_row * stripe_size) + (file_offset % stripe_size);
+    *ioc_subfile_idx = subfile_idx;
 }
 
 /*
@@ -90,17 +95,20 @@ cast_to_void(const void *data)
  *-------------------------------------------------------------------------
  */
 herr_t
-ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset, int64_t elements,
-                             const void *data, io_req_t **io_req)
+ioc__write_independent_async(int64_t context_id, int64_t offset, int64_t elements, const void *data,
+                             io_req_t **io_req)
 {
     subfiling_context_t *sf_context    = NULL;
     MPI_Request          ack_request   = MPI_REQUEST_NULL;
     io_req_t            *sf_io_request = NULL;
     int64_t              ioc_start;
     int64_t              ioc_offset;
+    int64_t              ioc_subfile_idx;
     int64_t              msg[3]           = {0};
     int                 *io_concentrators = NULL;
-    int                  data_tag         = 0;
+    int                  num_io_concentrators;
+    int                  num_subfiles;
+    int                  data_tag = 0;
     int                  mpi_code;
     herr_t               ret_value = SUCCEED;
 
@@ -111,13 +119,16 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
     HDassert(sf_context->topology);
     HDassert(sf_context->topology->io_concentrators);
 
-    io_concentrators = sf_context->topology->io_concentrators;
+    io_concentrators     = sf_context->topology->io_concentrators;
+    num_io_concentrators = sf_context->topology->n_io_concentrators;
+    num_subfiles         = sf_context->sf_num_subfiles;
 
     /*
      * Calculate the IOC that we'll send the I/O request to
      * and the offset within that IOC's subfile
      */
-    calculate_target_ioc(offset, sf_context->sf_stripe_size, n_io_concentrators, &ioc_start, &ioc_offset);
+    calculate_target_ioc(offset, sf_context->sf_stripe_size, num_io_concentrators, num_subfiles, &ioc_start,
+                         &ioc_offset, &ioc_subfile_idx);
 
     /*
      * Wait for memory to be allocated on the target IOC before
@@ -141,37 +152,43 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
      */
     msg[0] = elements;
     msg[1] = ioc_offset;
-    msg[2] = context_id;
-    if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 3, MPI_INT64_T, io_concentrators[ioc_start], WRITE_INDEP,
-                                            sf_context->sf_msg_comm)))
+    msg[2] = ioc_subfile_idx;
+    if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, io_concentrators[ioc_start],
+                                            WRITE_INDEP, sf_context->sf_msg_comm)))
         H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
 
-    /* Wait to receive data tag */
+    /* Wait to receive the data tag from the IOC */
     if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
         H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Wait failed", mpi_code);
 
     if (data_tag == 0)
         H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "received NACK from IOC");
 
-    /* At this point in the new implementation, we should queue
-     * the async write so that when the top level VFD tells us
-     * to complete all pending IO requests, we have all the info
-     * we need to accomplish that.
+    /*
+     * Allocate the I/O request object that will
+     * be returned to the caller
      */
     if (NULL == (sf_io_request = HDmalloc(sizeof(io_req_t))))
         H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_WRITEERROR, FAIL, "couldn't allocate I/O request");
 
     H5_CHECK_OVERFLOW(ioc_start, int64_t, int);
-    sf_io_request->completion_func.io_args.ioc        = (int)ioc_start;
-    sf_io_request->completion_func.io_args.context_id = context_id;
-    sf_io_request->completion_func.io_args.offset     = offset;
-    sf_io_request->completion_func.io_args.elements   = elements;
-    sf_io_request->completion_func.io_args.data       = cast_to_void(data);
-    sf_io_request->completion_func.io_args.io_req     = MPI_REQUEST_NULL;
-    sf_io_request->completion_func.io_function        = async_completion;
-    sf_io_request->completion_func.pending            = 0;
+    sf_io_request->ioc             = (int)ioc_start;
+    sf_io_request->context_id      = context_id;
+    sf_io_request->offset          = offset;
+    sf_io_request->elements        = elements;
+    sf_io_request->data            = cast_to_void(data);
+    sf_io_request->io_transfer_req = MPI_REQUEST_NULL;
+    sf_io_request->io_comp_req     = MPI_REQUEST_NULL;
+    sf_io_request->io_comp_tag     = -1;
 
-    sf_io_request->prev = sf_io_request->next = NULL;
+    /*
+     * Start a non-blocking receive from the IOC that signifies
+     * when the actual write is complete
+     */
+    if (MPI_SUCCESS !=
+        (mpi_code = MPI_Irecv(&sf_io_request->io_comp_tag, 1, MPI_INT, io_concentrators[ioc_start],
+                              WRITE_DATA_DONE, sf_context->sf_data_comm, &sf_io_request->io_comp_req)))
+        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
 
     /*
      * Start the actual data transfer using the ack received
@@ -180,7 +197,7 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
     H5_CHECK_OVERFLOW(elements, int64_t, int);
     if (MPI_SUCCESS !=
         (mpi_code = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], data_tag,
-                              sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req)))
+                              sf_context->sf_data_comm, &sf_io_request->io_transfer_req)))
         H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code);
 
     /*
@@ -193,14 +210,23 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
      * to the caller.
      */
-    sf_io_request->completion_func.pending = 1;
-    *io_req                                = sf_io_request;
+    *io_req = sf_io_request;
 
 done:
     if (ret_value < 0) {
         if (ack_request != MPI_REQUEST_NULL) {
-            if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&ack_request)))
-                H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel failed", mpi_code);
+            if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
+                H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+        }
+        if (sf_io_request) {
+            if (sf_io_request->io_transfer_req != MPI_REQUEST_NULL) {
+                if (MPI_SUCCESS != (mpi_code = MPI_Wait(&sf_io_request->io_transfer_req, MPI_STATUS_IGNORE)))
+                    H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+            }
+            if (sf_io_request->io_comp_req != MPI_REQUEST_NULL) {
+                if (MPI_SUCCESS != (mpi_code = MPI_Wait(&sf_io_request->io_comp_req, MPI_STATUS_IGNORE)))
+                    H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+            }
         }
 
         HDfree(sf_io_request);
@@ -241,81 +267,141 @@ done:
  *-------------------------------------------------------------------------
  */
 herr_t
-ioc__read_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset, int64_t elements,
-                            void *data, io_req_t **io_req)
+ioc__read_independent_async(int64_t context_id, int64_t offset, int64_t elements, void *data,
+                            io_req_t **io_req)
 {
     subfiling_context_t *sf_context    = NULL;
+    MPI_Request          ack_request   = MPI_REQUEST_NULL;
     io_req_t            *sf_io_request = NULL;
+    hbool_t              need_data_tag = FALSE;
     int64_t              ioc_start;
     int64_t              ioc_offset;
+    int64_t              ioc_subfile_idx;
     int64_t              msg[3]           = {0};
     int                 *io_concentrators = NULL;
+    int                  num_io_concentrators;
+    int                  num_subfiles;
+    int                  data_tag = 0;
     int                  mpi_code;
     herr_t               ret_value = SUCCEED;
 
     HDassert(io_req);
 
+    H5_CHECK_OVERFLOW(elements, int64_t, int);
+
     if (NULL == (sf_context = H5_get_subfiling_object(context_id)))
         H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "can't get subfiling context from ID");
 
     HDassert(sf_context->topology);
     HDassert(sf_context->topology->io_concentrators);
 
-    io_concentrators = sf_context->topology->io_concentrators;
+    io_concentrators     = sf_context->topology->io_concentrators;
+    num_io_concentrators = sf_context->topology->n_io_concentrators;
+    num_subfiles         = sf_context->sf_num_subfiles;
+
+    /*
+     * If we are using 1 subfile per IOC, we can optimize reads
+     * a little since each read will go to a separate IOC and we
+     * won't be in danger of data being received in an
+     * unpredictable order. However, if some IOCs own more than
+     * 1 subfile, we need to associate each read with a unique
+     * message tag to make sure the data is received in the
+     * correct order.
+     */
+    need_data_tag = num_subfiles != num_io_concentrators;
+    if (!need_data_tag)
+        data_tag = READ_INDEP_DATA;
 
     /*
      * Calculate the IOC that we'll send the I/O request to
      * and the offset within that IOC's subfile
      */
-    calculate_target_ioc(offset, sf_context->sf_stripe_size, n_io_concentrators, &ioc_start, &ioc_offset);
+    calculate_target_ioc(offset, sf_context->sf_stripe_size, num_io_concentrators, num_subfiles, &ioc_start,
+                         &ioc_offset, &ioc_subfile_idx);
 
     /*
-     * At this point in the new implementation, we should queue
-     * the non-blocking recv so that when the top level VFD tells
-     * us to complete all pending IO requests, we have all the info
-     * we need to accomplish that.
-     *
-     * Post the early non-blocking receive here.
+     * Allocate the I/O request object that will
+     * be returned to the caller
      */
     if (NULL == (sf_io_request = HDmalloc(sizeof(io_req_t))))
         H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_READERROR, FAIL, "couldn't allocate I/O request");
 
     H5_CHECK_OVERFLOW(ioc_start, int64_t, int);
-    sf_io_request->completion_func.io_args.ioc        = (int)ioc_start;
-    sf_io_request->completion_func.io_args.context_id = context_id;
-    sf_io_request->completion_func.io_args.offset     = offset;
-    sf_io_request->completion_func.io_args.elements   = elements;
-    sf_io_request->completion_func.io_args.data       = data;
-    sf_io_request->completion_func.io_args.io_req     = MPI_REQUEST_NULL;
-    sf_io_request->completion_func.io_function        = async_completion;
-    sf_io_request->completion_func.pending            = 0;
-
-    sf_io_request->prev = sf_io_request->next = NULL;
+    sf_io_request->ioc             = (int)ioc_start;
+    sf_io_request->context_id      = context_id;
+    sf_io_request->offset          = offset;
+    sf_io_request->elements        = elements;
+    sf_io_request->data            = data;
+    sf_io_request->io_transfer_req = MPI_REQUEST_NULL;
+    sf_io_request->io_comp_req     = MPI_REQUEST_NULL;
+    sf_io_request->io_comp_tag     = -1;
+
+    if (need_data_tag) {
+        /*
+         * Post an early non-blocking receive for IOC to send an ACK
+         * (or NACK) message with a data tag that we will use for
+         * receiving data
+         */
+        if (MPI_SUCCESS != (mpi_code = MPI_Irecv(&data_tag, 1, MPI_INT, io_concentrators[ioc_start],
+                                                 READ_INDEP_ACK, sf_context->sf_data_comm, &ack_request)))
+            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
+
+        /*
+         * Prepare and send an I/O request to the IOC identified
+         * by the file offset
+         */
+        msg[0] = elements;
+        msg[1] = ioc_offset;
+        msg[2] = ioc_subfile_idx;
+        if (MPI_SUCCESS !=
+            (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, io_concentrators[ioc_start], READ_INDEP,
+                                 sf_context->sf_msg_comm)))
+            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
+
+        /* Wait to receive the data tag from the IOC */
+        if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
+            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+
+        if (data_tag == 0)
+            H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "received NACK from IOC");
+    }
 
-    H5_CHECK_OVERFLOW(elements, int64_t, int);
+    /*
+     * Post a non-blocking receive for the data from the IOC
+     * using the selected data tag (either the one received
+     * from the IOC or the static READ_INDEP_DATA tag)
+     */
     if (MPI_SUCCESS !=
-        (mpi_code = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], READ_INDEP_DATA,
-                              sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req)))
+        (mpi_code = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], data_tag,
+                              sf_context->sf_data_comm, &sf_io_request->io_transfer_req)))
         H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
 
-    sf_io_request->completion_func.pending = 1;
-    *io_req                                = sf_io_request;
+    if (!need_data_tag) {
+        /*
+         * Prepare and send an I/O request to the IOC identified
+         * by the file offset
+         */
+        msg[0] = elements;
+        msg[1] = ioc_offset;
+        msg[2] = ioc_subfile_idx;
+        if (MPI_SUCCESS !=
+            (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, io_concentrators[ioc_start], READ_INDEP,
+                                 sf_context->sf_msg_comm)))
+            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
+    }
 
-    /*
-     * Prepare and send an I/O request to the IOC identified
-     * by the file offset
-     */
-    msg[0] = elements;
-    msg[1] = ioc_offset;
-    msg[2] = context_id;
-    if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 3, MPI_INT64_T, io_concentrators[ioc_start], READ_INDEP,
-                                            sf_context->sf_msg_comm)))
-        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
+    *io_req = sf_io_request;
 
 done:
     if (ret_value < 0) {
-        if (sf_io_request && sf_io_request->completion_func.io_args.io_req != MPI_REQUEST_NULL) {
-            if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&sf_io_request->completion_func.io_args.io_req)))
-                H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel failed", mpi_code);
+        if (ack_request != MPI_REQUEST_NULL) {
+            if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
+                H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+        }
+        if (sf_io_request) {
+            if (sf_io_request->io_transfer_req != MPI_REQUEST_NULL) {
+                if (MPI_SUCCESS != (mpi_code = MPI_Wait(&sf_io_request->io_transfer_req, MPI_STATUS_IGNORE)))
+                    H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+            }
         }
 
         HDfree(sf_io_request);
@@ -326,56 +412,27 @@ done:
 } /* end ioc__read_independent_async() */
 
 /*-------------------------------------------------------------------------
- * Function:    async_completion
+ * Function:    ioc__async_completion
  *
- * Purpose:     Given a single io_func_t structure containing the function
- *              pointer and it's input arguments and a single MPI_Request
- *              argument which needs to be completed, we make progress
- *              by calling MPI_Test. In this initial example, we loop
- *              until the request is completed as indicated by a non-zero
- *              flag variable.
+ * Purpose:     IOC function to complete outstanding I/O requests.
+ *              Currently just a wrapper around MPI_Waitall on the given
+ *              MPI_Request array.
  *
- *              As we go further with the implementation, we anticipate that
- *              rather than testing a single request variable, we will
- *              deal with a collection of all pending IO requests (on
- *              this rank).
+ * Return:      Non-negative on success/Negative on failure
 *
- * Return:      an integer status. Zero(0) indicates success. Negative
- *              values (-1) indicates an error.
 *-------------------------------------------------------------------------
 */
-static int
-async_completion(void *arg)
+herr_t
+ioc__async_completion(MPI_Request *mpi_reqs, size_t num_reqs)
 {
-    int          n_reqs;
-    int          mpi_code;
-    int          ret_value = 0;
-    struct async_arg {
-        int          n_reqs;
-        MPI_Request *sf_reqs;
-    } *in_progress = (struct async_arg *)arg;
-
-    HDassert(arg);
-
-    n_reqs = in_progress->n_reqs;
+    herr_t ret_value = SUCCEED;
+    int    mpi_code;
 
-    if (n_reqs < 0) {
-#ifdef H5FD_IOC_DEBUG
-        HDprintf("%s: invalid number of in progress I/O requests\n", __func__);
-#endif
+    HDassert(mpi_reqs);
 
-        ret_value = -1;
-        goto done;
-    }
-
-    if (MPI_SUCCESS != (mpi_code = MPI_Waitall(n_reqs, in_progress->sf_reqs, MPI_STATUSES_IGNORE))) {
-#ifdef H5FD_IOC_DEBUG
-        HDprintf("%s: MPI_Waitall failed with rc %d\n", __func__, mpi_code);
-#endif
-
-        ret_value = -1;
-        goto done;
-    }
+    H5_CHECK_OVERFLOW(num_reqs, size_t, int);
+    if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int)num_reqs, mpi_reqs, MPI_STATUSES_IGNORE)))
+        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code);
 
 done:
     H5_SUBFILING_FUNC_LEAVE;
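The new mapping in calculate_target_ioc() is easier to follow with concrete numbers. The sketch below restates the patch's arithmetic as a small standalone C program, for illustration only: map_offset() is a made-up name, and the stripe size, IOC count and subfile count in the example are arbitrary values, not defaults taken from the library.

#include <assert.h>
#include <stdint.h>

/* Standalone restatement of the mapping logic from the patch (illustration only).
 * Stripes cycle across all subfiles; subfiles are dealt out round-robin across
 * the IOCs, so one IOC may own several subfiles. */
static void
map_offset(int64_t file_offset, int64_t stripe_size, int num_io_concentrators, int num_subfiles,
           int64_t *target_ioc, int64_t *ioc_file_offset, int64_t *ioc_subfile_idx)
{
    int64_t stripe_idx  = file_offset / stripe_size;
    int64_t subfile_row = stripe_idx / num_subfiles;

    *target_ioc      = (stripe_idx % num_subfiles) % num_io_concentrators;
    *ioc_subfile_idx = (stripe_idx % num_subfiles) / num_io_concentrators;
    *ioc_file_offset = (subfile_row * stripe_size) + (file_offset % stripe_size);
}

int
main(void)
{
    int64_t ioc, subfile_offset, subfile_idx;

    /* Example values: 1 MiB stripes, 2 IOCs, 4 subfiles. A file offset of
     * 5 MiB + 10 lands in stripe 5; stripe 5 maps to overall subfile 1,
     * which is IOC 1's subfile 0, and to row 1 of that subfile, i.e. an
     * offset of 1 MiB + 10 within it. */
    map_offset((int64_t)5 * 1048576 + 10, 1048576, 2, 4, &ioc, &subfile_offset, &subfile_idx);
    assert(ioc == 1);
    assert(subfile_idx == 0);
    assert(subfile_offset == 1048576 + 10);

    return 0;
}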
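With async_completion() gone, finishing an I/O request becomes the caller's job: each io_req_t returned by ioc__write_independent_async() or ioc__read_independent_async() now carries its pending MPI requests (io_transfer_req for the data transfer and, for writes, io_comp_req for the WRITE_DATA_DONE message), and ioc__async_completion() simply calls MPI_Waitall on whatever request array it is given. The helper below is a hypothetical sketch of how a caller could gather and finish a batch of such requests; the helper name and overall flow are assumptions, while the io_req_t fields and the ioc__async_completion() signature come from this diff.

/* Hypothetical caller-side helper (illustration only, not repository code):
 * collect the MPI requests still pending in a set of io_req_t objects and
 * finish them all at once with ioc__async_completion(). */
static herr_t
complete_io_requests(io_req_t **io_reqs, size_t num_io_reqs)
{
    MPI_Request *mpi_reqs     = NULL;
    size_t       num_mpi_reqs = 0;
    herr_t       ret_value    = SUCCEED;

    if (num_io_reqs == 0)
        return SUCCEED;

    /* Each io_req_t carries at most two pending requests: the data
     * transfer and, for writes, the WRITE_DATA_DONE completion message */
    if (NULL == (mpi_reqs = HDmalloc(2 * num_io_reqs * sizeof(*mpi_reqs))))
        return FAIL;

    for (size_t i = 0; i < num_io_reqs; i++) {
        if (io_reqs[i]->io_transfer_req != MPI_REQUEST_NULL)
            mpi_reqs[num_mpi_reqs++] = io_reqs[i]->io_transfer_req;
        if (io_reqs[i]->io_comp_req != MPI_REQUEST_NULL)
            mpi_reqs[num_mpi_reqs++] = io_reqs[i]->io_comp_req;
    }

    /* ioc__async_completion() is just an MPI_Waitall over the array */
    ret_value = ioc__async_completion(mpi_reqs, num_mpi_reqs);

    HDfree(mpi_reqs);

    return ret_value;
}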