Diffstat (limited to 'src/H5FDsubfiling/H5FDioc_int.c')
-rw-r--r--   src/H5FDsubfiling/H5FDioc_int.c   382
1 file changed, 382 insertions, 0 deletions
diff --git a/src/H5FDsubfiling/H5FDioc_int.c b/src/H5FDsubfiling/H5FDioc_int.c
new file mode 100644
index 0000000..c1ce669
--- /dev/null
+++ b/src/H5FDsubfiling/H5FDioc_int.c
@@ -0,0 +1,382 @@
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ * Copyright by The HDF Group.                                               *
+ * All rights reserved.                                                      *
+ *                                                                           *
+ * This file is part of HDF5. The full HDF5 copyright notice, including      *
+ * terms governing use, modification, and redistribution, is contained in    *
+ * the COPYING file, which can be found at the root of the source code       *
+ * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases.  *
+ * If you do not have access to either file, you may request a copy from     *
+ * help@hdfgroup.org.                                                        *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+/*
+ * Purpose: This is part of an I/O concentrator driver.
+ */
+
+#include "H5FDioc_priv.h"
+
+static int async_completion(void *arg);
+
+/*
+ * Given a file offset, the stripe size and
+ * the number of IOCs, calculate the target
+ * IOC for I/O and the file offset for the
+ * subfile that IOC controls
+ */
+static inline void
+calculate_target_ioc(int64_t file_offset, int64_t stripe_size, int n_io_concentrators, int64_t *target_ioc,
+                     int64_t *ioc_file_offset)
+{
+    int64_t stripe_idx;
+    int64_t subfile_row;
+
+    HDassert(target_ioc);
+    HDassert(ioc_file_offset);
+    HDassert(stripe_size > 0);
+    HDassert(n_io_concentrators > 0);
+
+    stripe_idx  = file_offset / stripe_size;
+    subfile_row = stripe_idx / n_io_concentrators;
+
+    *target_ioc      = stripe_idx % n_io_concentrators;
+    *ioc_file_offset = (subfile_row * stripe_size) + (file_offset % stripe_size);
+}
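To make the striping arithmetic concrete, here is a small self-contained C sketch (not part of the patch; the helper name map_offset, the main() driver, and the sample values are invented for illustration). With a 1 MiB stripe and 4 IOCs, logical file offset 5 MiB + 100 lands in stripe index 5, which maps to IOC 1 (5 % 4) in subfile row 1 (5 / 4), i.e. subfile offset 1 MiB + 100:

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Same arithmetic as calculate_target_ioc() above, repeated without the
     * HDF5-internal asserts so the example compiles on its own */
    static void
    map_offset(int64_t file_offset, int64_t stripe_size, int n_iocs, int64_t *ioc, int64_t *subfile_offset)
    {
        int64_t stripe_idx  = file_offset / stripe_size;
        int64_t subfile_row = stripe_idx / n_iocs;

        *ioc            = stripe_idx % n_iocs;
        *subfile_offset = (subfile_row * stripe_size) + (file_offset % stripe_size);
    }

    int
    main(void)
    {
        int64_t ioc, sf_off;

        map_offset((5 * 1048576) + 100, 1048576, 4, &ioc, &sf_off);
        /* Prints: IOC 1, subfile offset 1048676 */
        printf("IOC %" PRId64 ", subfile offset %" PRId64 "\n", ioc, sf_off);
        return 0;
    }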
+
+/*
+ * Utility routine to hack around casting away const
+ */
+static inline void *
+cast_to_void(const void *data)
+{
+    union {
+        const void *const_ptr_to_data;
+        void       *ptr_to_data;
+    } eliminate_const_warning;
+    eliminate_const_warning.const_ptr_to_data = data;
+    return eliminate_const_warning.ptr_to_data;
+}
+
+/*-------------------------------------------------------------------------
+ * Function:    ioc__write_independent_async
+ *
+ * Purpose:     The I/O operations can be striped across a selection of
+ *              I/O concentrators. The read and write independent calls
+ *              compute the group of 1 or more IOCs and further create
+ *              derived MPI datatypes when required by the size of the
+ *              contiguous read or write requests.
+ *
+ *              IOC(0) contains the logical data storage for file offset
+ *              zero and all offsets that reside within modulo range of
+ *              the subfiling stripe_size.
+ *
+ *              We cycle through all 'n_io_concentrators' and send a
+ *              descriptor to each IOC that has a non-zero sized I/O
+ *              request to fulfill.
+ *
+ *              Sending descriptors to an IOC usually gets an ACK or
+ *              NACK in response. For the write operations, we post
+ *              async reads to receive ACKs from IOC ranks that have
+ *              allocated memory to receive the data to write to the
+ *              subfile. Upon receiving an ACK, we send the actual
+ *              user data to the IOC.
+ *
+ * Return:      Non-negative on success/Negative on failure
+ *
+ * Programmer:  Richard Warren
+ *              7/17/2020
+ *
+ * Changes:     Initial Version/None.
+ *
+ *-------------------------------------------------------------------------
+ */
+herr_t
+ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset, int64_t elements,
+                             const void *data, io_req_t **io_req)
+{
+    subfiling_context_t *sf_context       = NULL;
+    MPI_Request          ack_request      = MPI_REQUEST_NULL;
+    io_req_t            *sf_io_request    = NULL;
+    int64_t              ioc_start;
+    int64_t              ioc_offset;
+    int64_t              msg[3]           = {0};
+    int                 *io_concentrators = NULL;
+    int                  data_tag         = 0;
+    int                  mpi_code;
+    herr_t               ret_value        = SUCCEED;
+
+    HDassert(io_req);
+
+    if (NULL == (sf_context = H5_get_subfiling_object(context_id)))
+        H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "can't get subfiling context from ID");
+    HDassert(sf_context->topology);
+    HDassert(sf_context->topology->io_concentrators);
+
+    io_concentrators = sf_context->topology->io_concentrators;
+
+    /*
+     * Calculate the IOC that we'll send the I/O request to
+     * and the offset within that IOC's subfile
+     */
+    calculate_target_ioc(offset, sf_context->sf_stripe_size, n_io_concentrators, &ioc_start, &ioc_offset);
+
+    /*
+     * Wait for memory to be allocated on the target IOC before
+     * beginning the send of user data. Once that memory has been
+     * allocated, we will receive an ACK (or NACK) message from
+     * the IOC to allow us to proceed.
+     *
+     * On ACK, the IOC will send the tag to be used for sending
+     * data. This allows us to distinguish between multiple
+     * concurrent writes from a single rank.
+     *
+     * Post an early non-blocking receive for the MPI tag here.
+     */
+    if (MPI_SUCCESS != (mpi_code = MPI_Irecv(&data_tag, 1, MPI_INT, io_concentrators[ioc_start],
+                                             WRITE_INDEP_ACK, sf_context->sf_data_comm, &ack_request)))
+        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
+
+    /*
+     * Prepare and send an I/O request to the IOC identified
+     * by the file offset
+     */
+    msg[0] = elements;
+    msg[1] = ioc_offset;
+    msg[2] = context_id;
+    if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 3, MPI_INT64_T, io_concentrators[ioc_start], WRITE_INDEP,
+                                            sf_context->sf_msg_comm)))
+        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
+
+    /* Wait to receive the data tag */
+    if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
+        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+
+    if (data_tag == 0)
+        H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "received NACK from IOC");
+
+    /*
+     * At this point in the new implementation, we should queue
+     * the async write so that when the top level VFD tells us
+     * to complete all pending I/O requests, we have all the info
+     * we need to accomplish that.
+     */
+    if (NULL == (sf_io_request = HDmalloc(sizeof(io_req_t))))
+        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_WRITEERROR, FAIL, "couldn't allocate I/O request");
+
+    H5_CHECK_OVERFLOW(ioc_start, int64_t, int);
+    sf_io_request->completion_func.io_args.ioc        = (int)ioc_start;
+    sf_io_request->completion_func.io_args.context_id = context_id;
+    sf_io_request->completion_func.io_args.offset     = offset;
+    sf_io_request->completion_func.io_args.elements   = elements;
+    sf_io_request->completion_func.io_args.data       = cast_to_void(data);
+    sf_io_request->completion_func.io_args.io_req     = MPI_REQUEST_NULL;
+    sf_io_request->completion_func.io_function        = async_completion;
+    sf_io_request->completion_func.pending            = 0;
+
+    sf_io_request->prev = sf_io_request->next = NULL;
+
+    /*
+     * Start the actual data transfer using the tag received
+     * from the IOC as the tag for the send
+     */
+    H5_CHECK_OVERFLOW(elements, int64_t, int);
+    if (MPI_SUCCESS !=
+        (mpi_code = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], data_tag,
+                              sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req)))
+        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code);
+
+    /*
+     * NOTE: When we actually have the async I/O support,
+     * the request should be queued before we return to
+     * the caller. Having queued the I/O operation, we
+     * might want to get additional work started before
+     * allowing the queued I/O requests to make further
+     * progress and/or to complete, so we just return
+     * to the caller.
+     */
+
+    sf_io_request->completion_func.pending = 1;
+    *io_req = sf_io_request;
+
+done:
+    if (ret_value < 0) {
+        if (ack_request != MPI_REQUEST_NULL) {
+            if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&ack_request)))
+                H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel failed", mpi_code);
+        }
+
+        HDfree(sf_io_request);
+        *io_req = NULL;
+    }
+
+    H5_SUBFILING_FUNC_LEAVE;
+} /* end ioc__write_independent_async() */
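For context, here is a minimal, hypothetical caller-side sketch (not part of the patch; the variable names and the immediate wait are invented for illustration) showing how the io_req_t returned above might be completed. A real caller would queue the request and let the VFD complete all pending requests together; waiting immediately, as here, serializes the I/O:

    io_req_t *req = NULL;

    if (ioc__write_independent_async(context_id, n_io_concentrators,
                                     file_offset, num_bytes, write_buf, &req) < 0)
        /* handle error */;
    else if (req->completion_func.pending) {
        /* Complete the MPI_Isend() posted above */
        MPI_Wait(&req->completion_func.io_args.io_req, MPI_STATUS_IGNORE);
        req->completion_func.pending = 0;
    }

    HDfree(req);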
+
+/*-------------------------------------------------------------------------
+ * Function:    ioc__read_independent_async
+ *
+ * Purpose:     The I/O operations can be striped across a selection of
+ *              I/O concentrators. The read and write independent calls
+ *              compute the group of 1 or more IOCs and further create
+ *              derived MPI datatypes when required by the size of the
+ *              contiguous read or write requests.
+ *
+ *              IOC(0) contains the logical data storage for file offset
+ *              zero and all offsets that reside within modulo range of
+ *              the subfiling stripe_size.
+ *
+ *              We cycle through all 'n_io_concentrators' and send a
+ *              descriptor to each IOC that has a non-zero sized I/O
+ *              request to fulfill.
+ *
+ *              Sending descriptors to an IOC usually gets an ACK or
+ *              NACK in response. For the read operations, we post
+ *              async reads to receive the file data and wait until
+ *              all pending operations have completed.
+ *
+ * Return:      Non-negative on success/Negative on failure
+ *
+ * Programmer:  Richard Warren
+ *              7/17/2020
+ *
+ * Changes:     Initial Version/None.
+ *
+ *-------------------------------------------------------------------------
+ */
+herr_t
+ioc__read_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset, int64_t elements,
+                            void *data, io_req_t **io_req)
+{
+    subfiling_context_t *sf_context       = NULL;
+    io_req_t            *sf_io_request    = NULL;
+    int64_t              ioc_start;
+    int64_t              ioc_offset;
+    int64_t              msg[3]           = {0};
+    int                 *io_concentrators = NULL;
+    int                  mpi_code;
+    herr_t               ret_value        = SUCCEED;
+
+    HDassert(io_req);
+
+    if (NULL == (sf_context = H5_get_subfiling_object(context_id)))
+        H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "can't get subfiling context from ID");
+    HDassert(sf_context->topology);
+    HDassert(sf_context->topology->io_concentrators);
+
+    io_concentrators = sf_context->topology->io_concentrators;
+
+    /*
+     * Calculate the IOC that we'll send the I/O request to
+     * and the offset within that IOC's subfile
+     */
+    calculate_target_ioc(offset, sf_context->sf_stripe_size, n_io_concentrators, &ioc_start, &ioc_offset);
+
+    /*
+     * At this point in the new implementation, we should queue
+     * the non-blocking recv so that when the top level VFD tells
+     * us to complete all pending I/O requests, we have all the info
+     * we need to accomplish that.
+     *
+     * Post the early non-blocking receive here.
+     */
+    if (NULL == (sf_io_request = HDmalloc(sizeof(io_req_t))))
+        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_READERROR, FAIL, "couldn't allocate I/O request");
+
+    H5_CHECK_OVERFLOW(ioc_start, int64_t, int);
+    sf_io_request->completion_func.io_args.ioc        = (int)ioc_start;
+    sf_io_request->completion_func.io_args.context_id = context_id;
+    sf_io_request->completion_func.io_args.offset     = offset;
+    sf_io_request->completion_func.io_args.elements   = elements;
+    sf_io_request->completion_func.io_args.data       = data;
+    sf_io_request->completion_func.io_args.io_req     = MPI_REQUEST_NULL;
+    sf_io_request->completion_func.io_function        = async_completion;
+    sf_io_request->completion_func.pending            = 0;
+
+    sf_io_request->prev = sf_io_request->next = NULL;
+
+    H5_CHECK_OVERFLOW(elements, int64_t, int);
+    if (MPI_SUCCESS !=
+        (mpi_code = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], READ_INDEP_DATA,
+                              sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req)))
+        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
+
+    sf_io_request->completion_func.pending = 1;
+    *io_req = sf_io_request;
+
+    /*
+     * Prepare and send an I/O request to the IOC identified
+     * by the file offset
+     */
+    msg[0] = elements;
+    msg[1] = ioc_offset;
+    msg[2] = context_id;
+    if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 3, MPI_INT64_T, io_concentrators[ioc_start], READ_INDEP,
+                                            sf_context->sf_msg_comm)))
+        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
+
+done:
+    if (ret_value < 0) {
+        if (sf_io_request && sf_io_request->completion_func.io_args.io_req != MPI_REQUEST_NULL) {
+            if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&sf_io_request->completion_func.io_args.io_req)))
+                H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel failed", mpi_code);
+        }
+
+        HDfree(sf_io_request);
+        *io_req = NULL;
+    }
+
+    H5_SUBFILING_FUNC_LEAVE;
+} /* end ioc__read_independent_async() */
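The read path mirrors the write path but without the ACK handshake: the data MPI_Irecv is posted before the descriptor is sent, so the IOC's reply always finds a matching receive. A hypothetical caller-side sketch (not part of the patch; names invented for illustration); note that read_buf must stay untouched until the request completes:

    io_req_t *req = NULL;

    if (ioc__read_independent_async(context_id, n_io_concentrators,
                                    file_offset, num_bytes, read_buf, &req) < 0)
        /* handle error */;
    else if (req->completion_func.pending) {
        MPI_Wait(&req->completion_func.io_args.io_req, MPI_STATUS_IGNORE);
        req->completion_func.pending = 0;
        /* read_buf now holds the subfile data */
    }

    HDfree(req);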
+
+/*-------------------------------------------------------------------------
+ * Function:    async_completion
+ *
+ * Purpose:     Given a single io_func_t structure containing the function
+ *              pointer and its input arguments and a single MPI_Request
+ *              argument which needs to be completed, we make progress
+ *              by waiting on the request. In this initial version, we
+ *              block until all supplied requests have completed (via
+ *              MPI_Waitall below, rather than polling each request with
+ *              MPI_Test).
+ *
+ *              As we go further with the implementation, we anticipate
+ *              that rather than testing a single request variable, we
+ *              will deal with a collection of all pending I/O requests
+ *              (on this rank).
+ *
+ * Return:      An integer status. Zero (0) indicates success. A negative
+ *              value (-1) indicates an error.
+ *-------------------------------------------------------------------------
+ */
+static int
+async_completion(void *arg)
+{
+    int n_reqs;
+    int mpi_code;
+    int ret_value = 0;
+
+    struct async_arg {
+        int          n_reqs;
+        MPI_Request *sf_reqs;
+    } *in_progress = (struct async_arg *)arg;
+
+    HDassert(arg);
+
+    n_reqs = in_progress->n_reqs;
+
+    if (n_reqs < 0) {
+#ifdef H5FD_IOC_DEBUG
+        HDprintf("%s: invalid number of in progress I/O requests\n", __func__);
+#endif
+
+        ret_value = -1;
+        goto done;
+    }
+
+    if (MPI_SUCCESS != (mpi_code = MPI_Waitall(n_reqs, in_progress->sf_reqs, MPI_STATUSES_IGNORE))) {
+#ifdef H5FD_IOC_DEBUG
+        HDprintf("%s: MPI_Waitall failed with rc %d\n", __func__, mpi_code);
+#endif
+
+        ret_value = -1;
+        goto done;
+    }
+
+done:
+    H5_SUBFILING_FUNC_LEAVE;
+}
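Since async_completion() takes its arguments through a void pointer, a caller must pass a structure matching the anonymous struct async_arg layout above. A hypothetical sketch (not part of the patch; write_req and read_req are assumed to be two previously queued io_req_t pointers, and the call is made within this file since the function is static, e.g. through the io_function pointer stored in each request):

    MPI_Request reqs[2] = {write_req->completion_func.io_args.io_req,
                           read_req->completion_func.io_args.io_req};
    struct {
        int          n_reqs;
        MPI_Request *sf_reqs;
    } batch = {2, reqs};

    /* Blocks in MPI_Waitall() until both transfers finish */
    if (async_completion(&batch) < 0)
        /* handle error */;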