summaryrefslogtreecommitdiffstats
path: root/src/H5FDsubfiling/H5FDioc_int.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/H5FDsubfiling/H5FDioc_int.c')
-rw-r--r--src/H5FDsubfiling/H5FDioc_int.c382
1 files changed, 382 insertions, 0 deletions
diff --git a/src/H5FDsubfiling/H5FDioc_int.c b/src/H5FDsubfiling/H5FDioc_int.c
new file mode 100644
index 0000000..c1ce669
--- /dev/null
+++ b/src/H5FDsubfiling/H5FDioc_int.c
@@ -0,0 +1,382 @@
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ * Copyright by The HDF Group. *
+ * All rights reserved. *
+ * *
+ * This file is part of HDF5. The full HDF5 copyright notice, including *
+ * terms governing use, modification, and redistribution, is contained in *
+ * the COPYING file, which can be found at the root of the source code *
+ * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. *
+ * If you do not have access to either file, you may request a copy from *
+ * help@hdfgroup.org. *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+/*
+ * Purpose: This is part of an I/O concentrator driver.
+ */
+
+#include "H5FDioc_priv.h"
+
+static int async_completion(void *arg);
+
+/*
+ * Given a file offset, the stripe size and
+ * the number of IOCs, calculate the target
+ * IOC for I/O and the file offset for the
+ * subfile that IOC controls
+ */
static inline void
calculate_target_ioc(int64_t file_offset, int64_t stripe_size, int n_io_concentrators, int64_t *target_ioc,
                     int64_t *ioc_file_offset)
{
    int64_t global_stripe_idx;
    int64_t offset_in_stripe;
    int64_t subfile_row_idx;

    HDassert(target_ioc);
    HDassert(ioc_file_offset);
    HDassert(stripe_size > 0);
    HDassert(n_io_concentrators > 0);

    /* Which stripe (counted across the whole logical file) holds this
     * offset, and how far into that stripe the offset lands
     */
    global_stripe_idx = file_offset / stripe_size;
    offset_in_stripe  = file_offset % stripe_size;

    /* Stripes are dealt out round-robin across the IOCs, so the owning
     * IOC is the stripe index modulo the IOC count, and the "row" is the
     * number of complete rounds of IOCs that precede this stripe.
     */
    subfile_row_idx = global_stripe_idx / n_io_concentrators;

    *target_ioc      = global_stripe_idx % n_io_concentrators;
    *ioc_file_offset = (subfile_row_idx * stripe_size) + offset_in_stripe;
}
+
+/*
+ * Utility routine to hack around casting away const
+ */
/*
 * Strip the const qualifier from a pointer without triggering a
 * cast-away-const compiler warning. Done through a union so the
 * conversion is explicit and contained in one place.
 */
static inline void *
cast_to_void(const void *data)
{
    union {
        const void *as_const;
        void *      as_mutable;
    } pun;

    pun.as_const = data;
    return pun.as_mutable;
}
+
+/*-------------------------------------------------------------------------
+ * Function: ioc__write_independent_async
+ *
+ * Purpose: The IO operations can be striped across a selection of
+ * IO concentrators. The read and write independent calls
+ * compute the group of 1 or more IOCs and further create
+ * derived MPI datatypes when required by the size of the
+ * contiguous read or write requests.
+ *
+ * IOC(0) contains the logical data storage for file offset
+ * zero and all offsets that reside within modulo range of
+ * the subfiling stripe_size.
+ *
+ * We cycle through all 'n_io_concentrators' and send a
+ * descriptor to each IOC that has a non-zero sized IO
+ * request to fulfill.
+ *
+ * Sending descriptors to an IOC usually gets an ACK or
+ * NACK in response. For the write operations, we post
+ * asynch READs to receive ACKs from IOC ranks that have
+ * allocated memory to receive the data to write to the
+ * subfile. Upon receiving an ACK, we send the actual
+ * user data to the IOC.
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Richard Warren
+ * 7/17/2020
+ *
+ * Changes: Initial Version/None.
+ *-------------------------------------------------------------------------
+ */
+herr_t
+ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset, int64_t elements,
+ const void *data, io_req_t **io_req)
+{
+ subfiling_context_t *sf_context = NULL;
+ MPI_Request ack_request = MPI_REQUEST_NULL;
+ io_req_t * sf_io_request = NULL;
+ int64_t ioc_start;
+ int64_t ioc_offset;
+ int64_t msg[3] = {0};
+ int * io_concentrators = NULL;
+ int data_tag = 0;
+ int mpi_code;
+ herr_t ret_value = SUCCEED;
+
+ HDassert(io_req);
+
+ if (NULL == (sf_context = H5_get_subfiling_object(context_id)))
+ H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "can't get subfiling context from ID");
+ HDassert(sf_context->topology);
+ HDassert(sf_context->topology->io_concentrators);
+
+ io_concentrators = sf_context->topology->io_concentrators;
+
+ /*
+ * Calculate the IOC that we'll send the I/O request to
+ * and the offset within that IOC's subfile
+ */
+ calculate_target_ioc(offset, sf_context->sf_stripe_size, n_io_concentrators, &ioc_start, &ioc_offset);
+
+ /*
+ * Wait for memory to be allocated on the target IOC before
+ * beginning send of user data. Once that memory has been
+ * allocated, we will receive an ACK (or NACK) message from
+ * the IOC to allow us to proceed.
+ *
+ * On ACK, the IOC will send the tag to be used for sending
+ * data. This allows us to distinguish between multiple
+ * concurrent writes from a single rank.
+ *
+ * Post an early non-blocking receive for the MPI tag here.
+ */
+ if (MPI_SUCCESS != (mpi_code = MPI_Irecv(&data_tag, 1, MPI_INT, io_concentrators[ioc_start],
+ WRITE_INDEP_ACK, sf_context->sf_data_comm, &ack_request)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
+
+ /*
+ * Prepare and send an I/O request to the IOC identified
+ * by the file offset
+ */
+ msg[0] = elements;
+ msg[1] = ioc_offset;
+ msg[2] = context_id;
+ if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 3, MPI_INT64_T, io_concentrators[ioc_start], WRITE_INDEP,
+ sf_context->sf_msg_comm)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
+
+ /* Wait to receive data tag */
+ if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Wait failed", mpi_code);
+
+ if (data_tag == 0)
+ H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "received NACK from IOC");
+
+ /* At this point in the new implementation, we should queue
+ * the async write so that when the top level VFD tells us
+ * to complete all pending IO requests, we have all the info
+ * we need to accomplish that.
+ */
+ if (NULL == (sf_io_request = HDmalloc(sizeof(io_req_t))))
+ H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_WRITEERROR, FAIL, "couldn't allocate I/O request");
+
+ H5_CHECK_OVERFLOW(ioc_start, int64_t, int);
+ sf_io_request->completion_func.io_args.ioc = (int)ioc_start;
+ sf_io_request->completion_func.io_args.context_id = context_id;
+ sf_io_request->completion_func.io_args.offset = offset;
+ sf_io_request->completion_func.io_args.elements = elements;
+ sf_io_request->completion_func.io_args.data = cast_to_void(data);
+ sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL;
+ sf_io_request->completion_func.io_function = async_completion;
+ sf_io_request->completion_func.pending = 0;
+
+ sf_io_request->prev = sf_io_request->next = NULL;
+
+ /*
+ * Start the actual data transfer using the ack received
+ * from the IOC as the tag for the send
+ */
+ H5_CHECK_OVERFLOW(elements, int64_t, int);
+ if (MPI_SUCCESS !=
+ (mpi_code = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], data_tag,
+ sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code);
+
+ /*
+ * NOTE: When we actually have the async I/O support,
+ * the request should be queued before we return to
+ * the caller. Having queued the I/O operation, we
+ * might want to get additional work started before
+ * allowing the queued I/O requests to make further
+ * progress and/or to complete, so we just return
+ * to the caller.
+ */
+
+ sf_io_request->completion_func.pending = 1;
+ *io_req = sf_io_request;
+
+done:
+ if (ret_value < 0) {
+ if (ack_request != MPI_REQUEST_NULL) {
+ if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&ack_request)))
+ H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel failed", mpi_code);
+ }
+
+ HDfree(sf_io_request);
+ *io_req = NULL;
+ }
+
+ H5_SUBFILING_FUNC_LEAVE;
+} /* end ioc__write_independent_async() */
+
+/*-------------------------------------------------------------------------
+ * Function: ioc__read_independent_async
+ *
+ * Purpose: The IO operations can be striped across a selection of
+ * IO concentrators. The read and write independent calls
+ * compute the group of 1 or more IOCs and further create
+ * derived MPI datatypes when required by the size of the
+ * contiguous read or write requests.
+ *
+ * IOC(0) contains the logical data storage for file offset
+ * zero and all offsets that reside within modulo range of
+ * the subfiling stripe_size.
+ *
+ * We cycle through all 'n_io_concentrators' and send a
+ * descriptor to each IOC that has a non-zero sized IO
+ * request to fulfill.
+ *
+ * Sending descriptors to an IOC usually gets an ACK or
+ * NACK in response. For the read operations, we post
+ * asynch READs to receive the file data and wait until
+ * all pending operations have completed.
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Richard Warren
+ * 7/17/2020
+ *
+ * Changes: Initial Version/None.
+ *-------------------------------------------------------------------------
+ */
+herr_t
+ioc__read_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset, int64_t elements,
+ void *data, io_req_t **io_req)
+{
+ subfiling_context_t *sf_context = NULL;
+ io_req_t * sf_io_request = NULL;
+ int64_t ioc_start;
+ int64_t ioc_offset;
+ int64_t msg[3] = {0};
+ int * io_concentrators = NULL;
+ int mpi_code;
+ herr_t ret_value = SUCCEED;
+
+ HDassert(io_req);
+
+ if (NULL == (sf_context = H5_get_subfiling_object(context_id)))
+ H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "can't get subfiling context from ID");
+ HDassert(sf_context->topology);
+ HDassert(sf_context->topology->io_concentrators);
+
+ io_concentrators = sf_context->topology->io_concentrators;
+
+ /*
+ * Calculate the IOC that we'll send the I/O request to
+ * and the offset within that IOC's subfile
+ */
+ calculate_target_ioc(offset, sf_context->sf_stripe_size, n_io_concentrators, &ioc_start, &ioc_offset);
+
+ /*
+ * At this point in the new implementation, we should queue
+ * the non-blocking recv so that when the top level VFD tells
+ * us to complete all pending IO requests, we have all the info
+ * we need to accomplish that.
+ *
+ * Post the early non-blocking receive here.
+ */
+ if (NULL == (sf_io_request = HDmalloc(sizeof(io_req_t))))
+ H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_READERROR, FAIL, "couldn't allocate I/O request");
+
+ H5_CHECK_OVERFLOW(ioc_start, int64_t, int);
+ sf_io_request->completion_func.io_args.ioc = (int)ioc_start;
+ sf_io_request->completion_func.io_args.context_id = context_id;
+ sf_io_request->completion_func.io_args.offset = offset;
+ sf_io_request->completion_func.io_args.elements = elements;
+ sf_io_request->completion_func.io_args.data = data;
+ sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL;
+ sf_io_request->completion_func.io_function = async_completion;
+ sf_io_request->completion_func.pending = 0;
+
+ sf_io_request->prev = sf_io_request->next = NULL;
+
+ H5_CHECK_OVERFLOW(elements, int64_t, int);
+ if (MPI_SUCCESS !=
+ (mpi_code = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], READ_INDEP_DATA,
+ sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
+
+ sf_io_request->completion_func.pending = 1;
+ *io_req = sf_io_request;
+
+ /*
+ * Prepare and send an I/O request to the IOC identified
+ * by the file offset
+ */
+ msg[0] = elements;
+ msg[1] = ioc_offset;
+ msg[2] = context_id;
+ if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 3, MPI_INT64_T, io_concentrators[ioc_start], READ_INDEP,
+ sf_context->sf_msg_comm)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
+
+done:
+ if (ret_value < 0) {
+ if (sf_io_request && sf_io_request->completion_func.io_args.io_req != MPI_REQUEST_NULL) {
+ if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&sf_io_request->completion_func.io_args.io_req)))
+ H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel failed", mpi_code);
+ }
+
+ HDfree(sf_io_request);
+ *io_req = NULL;
+ }
+
+ H5_SUBFILING_FUNC_LEAVE;
+} /* end ioc__read_independent_async() */
+
+/*-------------------------------------------------------------------------
+ * Function: async_completion
+ *
+ * Purpose: Given a single io_func_t structure containing the function
+ * pointer and it's input arguments and a single MPI_Request
+ * argument which needs to be completed, we make progress
+ * by calling MPI_Test. In this initial example, we loop
+ * until the request is completed as indicated by a non-zero
+ * flag variable.
+ *
+ * As we go further with the implementation, we anticipate that
+ * rather than testing a single request variable, we will
+ * deal with a collection of all pending IO requests (on
+ * this rank).
+ *
+ * Return: an integer status. Zero(0) indicates success. Negative
+ * values (-1) indicates an error.
+ *-------------------------------------------------------------------------
+ */
+static int
+async_completion(void *arg)
+{
+ int n_reqs;
+ int mpi_code;
+ int ret_value = 0;
+ struct async_arg {
+ int n_reqs;
+ MPI_Request *sf_reqs;
+ } *in_progress = (struct async_arg *)arg;
+
+ HDassert(arg);
+
+ n_reqs = in_progress->n_reqs;
+
+ if (n_reqs < 0) {
+#ifdef H5FD_IOC_DEBUG
+ HDprintf("%s: invalid number of in progress I/O requests\n", __func__);
+#endif
+
+ ret_value = -1;
+ goto done;
+ }
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Waitall(n_reqs, in_progress->sf_reqs, MPI_STATUSES_IGNORE))) {
+#ifdef H5FD_IOC_DEBUG
+ HDprintf("%s: MPI_Waitall failed with rc %d\n", __func__, mpi_code);
+#endif
+
+ ret_value = -1;
+ goto done;
+ }
+
+done:
+ H5_SUBFILING_FUNC_LEAVE;
+}