summaryrefslogtreecommitdiffstats
path: root/src/H5FDsubfiling/H5FDioc_threads.c
diff options
context:
space:
mode:
authorjhendersonHDF <jhenderson@hdfgroup.org>2022-07-22 20:03:12 (GMT)
committerGitHub <noreply@github.com>2022-07-22 20:03:12 (GMT)
commit27bb358f7ab1d23f3f8ce081c6b4f1602033e4d7 (patch)
treee8a69bdfbc16f9acf073ddb3ebff586eccfca009 /src/H5FDsubfiling/H5FDioc_threads.c
parent32caa567a26680c5f98d0ea0cc989c45c89dc654 (diff)
downloadhdf5-27bb358f7ab1d23f3f8ce081c6b4f1602033e4d7.zip
hdf5-27bb358f7ab1d23f3f8ce081c6b4f1602033e4d7.tar.gz
hdf5-27bb358f7ab1d23f3f8ce081c6b4f1602033e4d7.tar.bz2
Subfiling VFD (#1883)
* Added support for vector I/O calls to the VFD layer, and associated test code. Note that this includes the optimization to allow shortened sizes and types arrays to allow more space efficient representations of vectors in which all entries are of the same size and/or type. See the Selection I/O RFC for further details. Tested serial and parallel, debug and production on Charis. serial and parallel debug only on Jelly. * ran code formatter quick serial build and test on jelly * Add H5FD_read_selection() and H5FD_write_selection(). Currently only translate to scalar calls. Fix const buf in H5FD_write_vector(). * Format source * Fix comments * Add selection I/O to chunk code, used when: not using chunk cache, no datatype conversion, no I/O filters, no page buffer, not using collective I/O. Requires global variable H5_use_selection_io_g be set to TRUE. Implemented selection to vector I/O translation at the file driver layer. * Fix formatting unrelated to previous change to stop github from complaining. * Add full API support for selection I/O. Add tests for this. * Implement selection I/O for contiguous datasets. Fix bug in selection I/O translation. Add const qualifiers to some internal selection I/O routines to maintain const-correctness while avoiding memcpys. * Added vector read / write support to the MPIO VFD, with associated test code (see testpar/t_vfd.c). Note that this implementation does NOT support vector entries of size greater than 2 GB. This must be repaired before release, but it should be good enough for correctness testing. As MPIO requires vector I/O requests to be sorted in increasing address order, also added a vector sort utility in H5FDint.c This function is tested in passing by the MPIO vector I/O extension. In passing, repaired a bug in size / type vector extension management in H5FD_read/write_vector() Tested parallel debug and production on charis and Jelly. 
* Ran source code formatter * Add support for independent parallel I/O with selection I/O. Add HDF5_USE_SELECTION_IO env var to control selection I/O (default off). * Implement parallel collective support for selection I/O. * Fix comments and run formatter. * Update selection IO branch with develop (#1215) Merged branch 'develop' into selection_io * Sync with develop (#1262) Updated the branch with develop changes. * Implement big I/O support for vector I/O requests in the MPIO file driver. * Free arrays in H5FD__mpio_read/write_vector() as soon as they're not needed, to cut down on memory usage during I/O. * Address comments from code review. Fix const warnings with H5S_SEL_ITER_INIT(). * Committing clang-format changes * Feature/subfiling (#1464) * Initial checkin of merged sub-filing VFD. Passes regression tests (debug/shared/paralle) on Jelly. However, bugs and many compiler warnings remain -- not suitable for merge to develop. * Minor mods to src/H5FDsubfile_mpi.c to address errors reported by autogen.sh * Code formatting run -- no test * Merged my subfiling code fixes into the new selection_io_branch * Forgot to add the FindMERCURY.cmake file. This will probably disappear soon * attempting to make a more reliable subfile file open which doesn't return errors. For some unknown reason, the regular posix open will occasionally fail to create a subfile. Some better error handling for file close has been added. * added NULL option for H5FD_subfiling_config_t in H5Pset_fapl_subfiling (#1034) * NULL option automatically stacks IOC VFD for subfiling and returns a valid fapl. * added doxygen subfiling APIs * Various fixes which allow the IOR benchmark to run correctly * Lots of updates including the packaging up of the mercury_util source files to enable easier builds for our Benchmarking * Interim checkin of selection_io_with_subfiling_vfd branch Moddified testpar/t_vfd.c to test the subfiling vfd with default configuration. 
Must update this code to run with a variety of configurations -- most particularly multiple IO concentrators, and stripe depth small enough to test the other IO concentrators. testpar/t_vfd.c exposed a large number of race conditions -- symptoms included: 1) Crashes (usually seg faults) 2) Heap corruption 3) Stack corruption 4) Double frees of heap space 5) Hangs 6) Out of order execution of I/O requests / violations of POSIX semantics 7) Swapped write requests Items 1 - 4 turned out to be primarily caused by file close issues -- specifically, the main I/O concentrator thread and its pool of worker threads were not being shut down properly on file close. Addressing this issue in combination with some other minor fixes seems to have addressed these issues. Items 5 & 6 appear to have been caused by issue of I/O requests to the thread pool in an order that did not maintain POSIX semantics. A rewrite of the I/O request dispatch code appears to have solved these issues. Item 7 seems to have been caused by multiple write requests from a given rank being read by the wrong worker thread. Code to issue "unique" tags for each write request via the ACK message appears to have cleaned this up. Note that the code is still in poor condition. A partial list of known defects includes: a) Race condition on file close that allows superblock writes to arrive at the I/O concentrator after it has been shutdown. This defect is most evident when testpar/t_subfiling_vfd is run with 8 ranks. b) No error reporting from I/O concentrators -- must design and implement this. For now, mostly just asserts, which suggests that it should be run in debug mode. c) Much commented out and/or un-used code. d) Code organization e) Build system with bits of Mercury is awkward -- think of shifting to pthreads with our own thread pool code. f) Need to add native support for vector and selection I/O to the subfiling VFD. g) Need to review, and possibly rework configuration code. 
h) Need to store subfile configuration data in a superblock extension message, and add code to use this data on file open. i) Test code is inadequate -- expect more issues as it is extended. In particular, there is no unit test code for the I/O request dispatch code. While I think it is correct at present, we need test code to verify this. Similarly, we need to test with multiple I/O concentrators and much smaller stripe depth. My actual code changes were limited to: src/H5FDioc.c src/H5FDioc_threads.c src/H5FDsubfile_int.c src/H5FDsubfile_mpi.c src/H5FDsubfiling.c src/H5FDsubfiling.h src/H5FDsubfiling_priv.h testpar/t_subfiling_vfd.c testpar/t_vfd.c I'm not sure what is going on with the deletions in src/mercury/src/util. Tested parallel/debug on Charis and Jelly * subfiling with selection IO (#1219) Merged branch 'selection_io' into subfiling branch. * Subfile name fixes (#1250) * fixed subfiling naming convention, and added leading zero to rank names. * Merge branch 'selection_io' into selection_io_with_subfiling_vfd (#1265) * Added script to join subfiles into a single HDF5 file (#1350) * Modified H5FD__subfiling_query() to report that the sub-filing VFD supports MPI This exposed issues with truncate and get EOF in the sub-filing VFD. I believe I have addressed these issues (get EOF not as fully tested as it should be), however, it exposed race conditions resulting in hangs. As of this writing, I have not been able to chase these down. Note that the tests that expose these race conditions are in testpar/t_subfiling_vfd.c, and are currently skipped. Unskip these tests to reproduce the race conditions. tested (to the extent possible) debug/parallel on charis and jelly. 
* Committing clang-format changes * fixed H5MM_free Co-authored-by: mainzer <mainzer#hdfgroup.org> Co-authored-by: jrmainzer <72230804+jrmainzer@users.noreply.github.com> Co-authored-by: Richard Warren <Richard.Warren@hdfgroup.org> Co-authored-by: Richard.Warren <richard.warren@jelly.ad.hdfgroup.org> Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> * Move Subfiling VFD components into H5FDsubfiling source directory * Update Autotools build and add H5_HAVE_SUBFILING_VFD macro to H5pubconf.h * Tidy up CMake build of subfiling sources * Merge branch 'develop' into feature/subfiling (#1539) Merge branch 'develop' into feature/subfiling * Add VFD interface version field to Subfiling and IOC VFDs * Merge branch 'develop' into feature/subfiling (#1557) Merge branch 'develop' into feature/subfiling * Merge branch 'develop' into feature/subfiling (#1563) Merge branch 'develop' into feature/subfiling * Tidy up merge artifacts after rebase on develop * Fix incorrect variable in mirror VFD utils CMake * Ensure VFD values are always defined * Add subfiling to CMake VFD_LIST if built * Mark MPI I/O driver self-initialization global as static * Add Subfiling VFD to predefined VFDs for HDF5_DRIVER env. variable * Initial progress towards separating private vs. public subfiling code * include libgen.h in t_vfd tests for correct dirname/basename * Committing clang-format changes * removed mercury option, included subfiling header path (#1577) Added subfiling status to configure output, installed h5fuse.sh to build directory for use in future tests. * added check for stdatomic.h (#1578) * added check for stdatomic.h with subfiling * added H5_HAVE_SUBFILING_VFD for cmake * fix old-style-definition warning (#1582) * fix old-style-definition warning * added test for enable parallel with subfiling VFD (#1586) Fails if subfiling VFD is not used with parallel support. 
* Subfiling/IOC VFD fixes and tidying (#1619) * Rename CMake option for Subfiling VFD to be consistent with other VFDs * Miscellaneous Subfiling fixes Add error message for unset MPI communicator Support dynamic loading of subfiling VFD with default configuration * Temporary fix for subfile name issue * Added subfile checks (#1634) * added subfile checks * Feature/subfiling (#1655) * Subfiling/IOC VFD cleanup Fix misuse of MPI_COMM_WORLD in IOC VFD Propagate Subfiling FAPL MPI settings down to IOC FAPL in default configuration case Cleanup IOC VFD debugging code Change sprintf to snprintf in a few places * Major work on separating Subfiling and IOC VFDs from each other * Re-write async_completion func to not overuse stack * Replace usage of MPI_COMM_WORLD with file's actual MPI communicator * Refactor H5FDsubfile_mpi.c * Remove empty file H5FDsubfile_mpi.c * Separate IOC VFD errors to its own error stack * Committing clang-format changes * Remove H5TRACE macros from H5FDioc.c * Integrate H5FDioc_threads.c with IOC error stack * Fix for subfile name generation Use number of I/O concentrators from existing subfiling configuration file, if one exists * Add temporary barrier in "Get EOF" operation to prevent races on EOF Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> * Fix for retrieval of machine Host ID * Default to MPI_COMM_WORLD if no MPI params set * added libs rt and pthreads (#1673) * added libs rt and pthreads * Feature/subfiling (#1689) * More tidying of IOC VFD and subfiling debug code * Remove old unused log file code * Clear FID from active file map on failure * Fix bug in generation of subfile names when truncating file * Change subfile names to start from 1 instead of 0 * Use long long for user-specified stripe size from environment variable * Skip 0-sized I/Os in low-level IOC I/O routines * Don't update EOF on read * Convert printed warning about data size mismatch to assertion * Don't add base file address to I/O 
addresses twice Base address should already be applied as part of H5FDwrite/read_vector calls * Account for 0-sized I/O vector entries in subfile write/read functions * Rewrite init_indep_io for clarity * Correction for IOC wraparound calculations * Some corrections to iovec calculations * Remove temporary barrier on EOF retrieval * Complete work request queue entry on error instead of skipping over * Account for stripe size wraparound for sf_col_offset calculation * Committing clang-format changes Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> * Re-write and fix bugs in I/O vector filling routines (#1703) * Rewrite I/O vector filling routines for clarity * Fix bug with iovec_fill_last when last I/O size is 0 * added subfiling_dir line read (#1714) * added subfiling_dir line read and use it * shellcheck fixes * I/O request dispatch logic update (#1731) Short-circuit I/O request dispatch when head of I/O queue is an in-progress get EOF or truncate operation. 
This prevents an issue where a write operation can be dispatched alongside a get EOF/truncate operation, whereas all I/O requests are supposed to be ineligible for dispatch until the get EOF/truncate is completed * h5fuse.sh.in clean-up (#1757) * Added command-line options * Committing clang-format changes * Align with changes from develop * Mimic MPI I/O VFD for EOF handling * Initialize context_id field for work request objects * Use logfile for some debugging information * Use atomic store to set IOC ready flag * Use separate communicator for sending file EOF data Minor IOC cleanup * Use H5_subfile_fid_to_context to get context ID for file in Subfiling VFD * IOVEC calculation fixes * Updates for debugging code * Minor fixes for threaded code * Committing clang-format changes * Use separate MPI communicator for barrier operations * Committing clang-format changes * Rewrite EOF routine to use nonblocking MPI communication * Committing clang-format changes * Always dispatch I/O work requests in IOC main loop * Return distinct MPI communicator to library when requested * Minor warning cleanup * Committing clang-format changes * Generate h5fuse.sh from h5fuse.sh.in in CMake * Send truncate messages to correct IOC rank * Committing clang-format changes * Miscellaneous cleanup Post some MPI receives before sends Free some duplicated MPI communicator/Info objects Remove unnecessary extra MPI_Barrier * Warning cleanup * Fix for leaked MPI communicator * Retrieve file EOF on single rank and bcast it * Fixes for a few failure paths * Cleanup of IOC file opens * Committing clang-format changes * Use plan MPI_Send for send of EOF messages * Always check MPI thread support level during Subfiling init * Committing clang-format changes * Handle a hang on failure when IOCs can't open subfiles * Committing clang-format changes * Refactor file open status consensus check * Committing clang-format changes * Fix for MPI_Comm_free being called after MPI_Finalize * Fix VFD test by 
setting MPI params before setting subfiling on FAPL * Update Subfiling VFD error handling and error stack usage * Improvements for Subfiling logfiles * Remove prototypes for currently unused routines * Disable I/O queue stat collecting by default * Remove unused serialization mutex variable * Update VFD testing to take subfiling VFD into account * Fix usage of global subfiling application layout object * Minor fixes for failure pathways * Keep track of the number of failures in an IOC I/O queue * Make sure not to exceed MPI_TAG_UB value for data communication messages * Committing clang-format changes * Update for rename of some H5FD 'ctl' opcodes * Always include Subfiling's public header files in hdf5.h * Remove old unused code and comments * Implement support for per-file I/O queues Allows the subfiling VFD to have multiple HDF5 files open simultaneously * Use simple MPI_Iprobe over unnecessary MPI_Improbe * Committing clang-format changes * Update HDF5 testing to query driver for H5FD_FEAT_DEFAULT_VFD_COMPATIBLE flag * Fix a few bugs related to file multi-opens * Avoid calling MPI routines if subfiling gets reinitialized * Fix issue when files are closed in a random order * Update HDF5 testing to query VFD for "using MPI" feature flag * Register atexit handler in subfiling VFD to call MPI_Finalize after HDF5 closes * Fail for collective I/O requests until support is implemented * Correct VOL test function prototypes * Minor cleanup of old code and comments * Update mercury dependency * Cleanup of subfiling configuration structure * Committing clang-format changes * Build system updates for Subfiling VFD * Fix possible hang on failure in t_vfd tests caused by mismatched MPI_Barrier calls * Copy subfiling IOC fapl in "fapl get" method * Mirror subfiling superblock writes to stub file for legacy POSIX-y HDF5 applications * Allow collective I/O for MPI_BYTE types and rank 0 bcast strategy * Committing clang-format changes * Use different scheme for subfiling write 
message MPI tag calculations * Committing clang-format changes * Avoid performing fstat calls on all MPI ranks * Add MPI_Barrier before finalizing IOC threads * Use try_lock in I/O queue dispatch to minimize contention from worker threads * Use simple Waitall for nonblocking I/O waits * Add configurable IOC main thread delay and try_lock option to I/O queue dispatch * Fix bug that could cause serialization of non-overlapping I/O requests * Temporarily treat collective subfiling vector I/O calls as independent * Removed unused mercury bits * Add stubs for subfiling and IOC file delete callback * Update VFD testing for Subfiling VFD * Work around HDF5 metadata cache bug for Subfiling VFD when MPI Comm size = 1 * Committing clang-format changes Co-authored-by: mainzer <mainzer#hdfgroup.org> Co-authored-by: Neil Fortner <nfortne2@hdfgroup.org> Co-authored-by: Scot Breitenfeld <brtnfld@hdfgroup.org> Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: jrmainzer <72230804+jrmainzer@users.noreply.github.com> Co-authored-by: Richard Warren <Richard.Warren@hdfgroup.org> Co-authored-by: Richard.Warren <richard.warren@jelly.ad.hdfgroup.org>
Diffstat (limited to 'src/H5FDsubfiling/H5FDioc_threads.c')
-rw-r--r--src/H5FDsubfiling/H5FDioc_threads.c1658
1 files changed, 1658 insertions, 0 deletions
diff --git a/src/H5FDsubfiling/H5FDioc_threads.c b/src/H5FDsubfiling/H5FDioc_threads.c
new file mode 100644
index 0000000..0d620b5
--- /dev/null
+++ b/src/H5FDsubfiling/H5FDioc_threads.c
@@ -0,0 +1,1658 @@
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ * Copyright by The HDF Group. *
+ * All rights reserved. *
+ * *
+ * This file is part of HDF5. The full HDF5 copyright notice, including *
+ * terms governing use, modification, and redistribution, is contained in *
+ * the COPYING file, which can be found at the root of the source code *
+ * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. *
+ * If you do not have access to either file, you may request a copy from *
+ * help@hdfgroup.org. *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+#include "H5FDioc_priv.h"
+
+#include "H5FDsubfiling.h"
+
+#ifndef HG_TEST_NUM_THREADS_DEFAULT
+#define HG_TEST_NUM_THREADS_DEFAULT 4
+#endif
+
+#define MIN_READ_RETRIES 10
+
+/*
+ * The amount of time (in nanoseconds) for the IOC main
+ * thread to sleep when there are no incoming I/O requests
+ * to process
+ */
+#define IOC_MAIN_SLEEP_DELAY (20000)
+
+/*
+ * IOC data for a file that is stored in that
+ * file's subfiling context object
+ */
+typedef struct ioc_data_t {
+    ioc_io_queue_t io_queue;          /* queue of received, not-yet-completed I/O work requests */
+    hg_thread_t ioc_main_thread;      /* handle of the IOC main (dispatch) thread */
+    hg_thread_pool_t *io_thread_pool; /* pool of worker threads that service queued requests */
+    int64_t sf_context_id;            /* ID used to look up this file's subfiling context object */
+
+    /* sf_ioc_ready       - set to 1 by ioc_main() when it is ready to enter its main
+     *                      loop; initialize_ioc_threads() spins on this before returning.
+     * sf_shutdown_flag   - set to 1 by finalize_ioc_threads() to ask ioc_main() to exit;
+     *                      reset to 0 by ioc_main() just before it returns.
+     * sf_io_ops_pending  - sf_io_ops_pending is used to track the number of I/O operations
+     *                      pending so that we can wait until all I/O operations have been
+     *                      serviced before shutting down the worker thread pool.
+     *                      The value of this variable must always be non-negative.
+     *
+     *                      Note that this is a convenience variable -- we could use
+     *                      io_queue.q_len instead. However, accessing this field requires
+     *                      locking io_queue.q_mutex.
+     * sf_work_pending    - NOTE(review): also polled in ioc_main()'s loop condition;
+     *                      presumably counts in-progress worker-thread tasks -- confirm
+     *                      against the code that increments it (not visible here).
+     */
+    atomic_int sf_ioc_ready;
+    atomic_int sf_shutdown_flag;
+    atomic_int sf_io_ops_pending;
+    atomic_int sf_work_pending;
+} ioc_data_t;
+
+/*
+ * NOTES:
+ * Rather than re-create the code for creating and managing a thread pool,
+ * I'm utilizing a reasonably well tested implementation from the mercury
+ * project. At some point, we should revisit this decision or possibly
+ * directly link against the mercury library. This would make sense if
+ * we move away from using MPI as the messaging infrastructure and instead
+ * use mercury for that purpose...
+ */
+
+static hg_thread_mutex_t ioc_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+#ifdef H5FD_IOC_COLLECT_STATS
+static int sf_write_ops = 0;
+static int sf_read_ops = 0;
+static double sf_pwrite_time = 0.0;
+static double sf_pread_time = 0.0;
+static double sf_write_wait_time = 0.0;
+static double sf_read_wait_time = 0.0;
+static double sf_queue_delay_time = 0.0;
+#endif
+
+/* Prototypes */
+static HG_THREAD_RETURN_TYPE ioc_thread_main(void *arg);
+static int ioc_main(ioc_data_t *ioc_data);
+
+static int ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm,
+ uint32_t counter);
+static int ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
+
+static int ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size,
+ int subfile_rank);
+static int ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size,
+ int subfile_rank);
+static int ioc_file_truncate(int fd, int64_t length, int subfile_rank);
+static int ioc_file_report_eof(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
+
+static ioc_io_queue_entry_t *ioc_io_queue_alloc_entry(void);
+static void ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_ptr);
+static void ioc_io_queue_dispatch_eligible_entries(ioc_data_t *ioc_data, hbool_t try_lock);
+static void ioc_io_queue_free_entry(ioc_io_queue_entry_t *q_entry_ptr);
+static void ioc_io_queue_add_entry(ioc_data_t *ioc_data, sf_work_request_t *wk_req_ptr);
+
+/*-------------------------------------------------------------------------
+ * Function: initialize_ioc_threads
+ *
+ * Purpose: The principal entry point to initialize the execution
+ * context for an I/O Concentrator (IOC). The main thread
+ * is responsible for receiving I/O requests from each
+ * HDF5 "client" and distributing those to helper threads
+ * for actual processing. We initialize a fixed number
+ * of helper threads by creating a thread pool.
+ *
+ * Return: SUCCESS (0) or FAIL (-1) if any errors are detected
+ * for the multi-threaded initialization.
+ *
+ * Programmer: Richard Warren
+ * 7/17/2020
+ *
+ * Changes: Initial Version/None.
+ *
+ *-------------------------------------------------------------------------
+ */
+int
+initialize_ioc_threads(void *_sf_context)
+{
+    subfiling_context_t *sf_context = _sf_context;
+    ioc_data_t * ioc_data = NULL;
+    unsigned thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT;
+    char * env_value;
+    int ret_value = 0;
+#ifdef H5FD_IOC_COLLECT_STATS
+    double t_start = 0.0, t_end = 0.0;
+#endif
+
+    HDassert(sf_context);
+
+    /*
+     * Allocate and initialize IOC data that will be passed
+     * to the IOC main thread
+     */
+    if (NULL == (ioc_data = HDmalloc(sizeof(*ioc_data))))
+        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, (-1),
+                                "can't allocate IOC data for IOC main thread");
+    ioc_data->sf_context_id = sf_context->sf_context_id;
+    ioc_data->io_thread_pool = NULL;
+    ioc_data->io_queue = (ioc_io_queue_t){/* magic = */ H5FD_IOC__IO_Q_MAGIC,
+                                          /* q_head = */ NULL,
+                                          /* q_tail = */ NULL,
+                                          /* num_pending = */ 0,
+                                          /* num_in_progress = */ 0,
+                                          /* num_failed = */ 0,
+                                          /* q_len = */ 0,
+                                          /* req_counter = */ 0,
+                                          /* q_mutex = */
+                                          PTHREAD_MUTEX_INITIALIZER,
+#ifdef H5FD_IOC_COLLECT_STATS
+                                          /* max_q_len = */ 0,
+                                          /* max_num_pending = */ 0,
+                                          /* max_num_in_progress = */ 0,
+                                          /* ind_read_requests = */ 0,
+                                          /* ind_write_requests = */ 0,
+                                          /* truncate_requests = */ 0,
+                                          /* get_eof_requests = */ 0,
+                                          /* requests_queued = */ 0,
+                                          /* requests_dispatched = */ 0,
+                                          /* requests_completed = */ 0
+#endif
+    };
+
+    /* Initialize atomic vars */
+    atomic_init(&ioc_data->sf_ioc_ready, 0);
+    atomic_init(&ioc_data->sf_shutdown_flag, 0);
+    atomic_init(&ioc_data->sf_io_ops_pending, 0);
+    atomic_init(&ioc_data->sf_work_pending, 0);
+
+#ifdef H5FD_IOC_COLLECT_STATS
+    t_start = MPI_Wtime();
+#endif
+
+    if (hg_thread_mutex_init(&ioc_data->io_queue.q_mutex) < 0)
+        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTINIT, (-1), "can't initialize IOC thread queue mutex");
+
+    /* Allow experimentation with the number of helper threads */
+    if ((env_value = HDgetenv(H5_IOC_THREAD_POOL_COUNT)) != NULL) {
+        int value_check = HDatoi(env_value);
+        if (value_check > 0) {
+            thread_pool_count = (unsigned int)value_check;
+        }
+    }
+
+    /* Initialize a thread pool for the I/O concentrator's worker threads */
+    if (hg_thread_pool_init(thread_pool_count, &ioc_data->io_thread_pool) < 0)
+        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTINIT, (-1), "can't initialize IOC worker thread pool");
+
+    /* Create the main IOC thread that will receive and dispatch I/O requests */
+    if (hg_thread_create(&ioc_data->ioc_main_thread, ioc_thread_main, ioc_data) < 0)
+        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTINIT, (-1), "can't create IOC main thread");
+
+    /* Wait until ioc_main() reports that it is ready */
+    while (atomic_load(&ioc_data->sf_ioc_ready) != 1) {
+        usleep(20);
+    }
+
+#ifdef H5FD_IOC_COLLECT_STATS
+    t_end = MPI_Wtime();
+
+#ifdef H5FD_IOC_DEBUG
+    if (sf_verbose_flag) {
+        if (sf_context->topology->subfile_rank == 0) {
+            HDprintf("%s: time = %lf seconds\n", __func__, (t_end - t_start));
+            HDfflush(stdout);
+        }
+    }
+#endif
+
+#endif
+
+    /* Hand ownership of the IOC data to the subfiling context; it is
+     * released by finalize_ioc_threads() at file close.
+     */
+    sf_context->ioc_data = ioc_data;
+
+done:
+    /* On failure, release any partially-initialized state. Previously the
+     * error paths leaked ioc_data (and the worker thread pool if the main
+     * thread could not be created). q_mutex is safe to destroy here: it is
+     * statically initialized by the compound literal above before any of
+     * the fallible steps run.
+     */
+    if (ret_value < 0 && ioc_data) {
+        if (ioc_data->io_thread_pool)
+            hg_thread_pool_destroy(ioc_data->io_thread_pool);
+        hg_thread_mutex_destroy(&ioc_data->io_queue.q_mutex);
+        HDfree(ioc_data);
+    }
+
+    H5_SUBFILING_FUNC_LEAVE;
+}
+
+int
+finalize_ioc_threads(void *_sf_context)
+{
+    subfiling_context_t *sf_context = _sf_context;
+    ioc_data_t * ioc_data = NULL;
+    int ret_value = 0;
+
+    HDassert(sf_context);
+    HDassert(sf_context->topology->rank_is_ioc);
+
+    ioc_data = sf_context->ioc_data;
+    if (ioc_data) {
+        HDassert(0 == atomic_load(&ioc_data->sf_shutdown_flag));
+
+        /* Shutdown the main IOC thread */
+        atomic_store(&ioc_data->sf_shutdown_flag, 1);
+
+        /* Allow ioc_main to exit.*/
+        do {
+            usleep(20);
+        } while (0 != atomic_load(&ioc_data->sf_shutdown_flag));
+
+        /* Join the main IOC thread BEFORE tearing down the resources it
+         * references. ioc_main() resetting sf_shutdown_flag only means it
+         * has left its loop, not that the thread has finished unwinding,
+         * so destroying the pool/mutex first (as the code previously did)
+         * raced with the still-running thread.
+         */
+        hg_thread_join(ioc_data->ioc_main_thread);
+
+        /* Tear down IOC worker thread pool */
+        HDassert(0 == atomic_load(&ioc_data->sf_io_ops_pending));
+        hg_thread_pool_destroy(ioc_data->io_thread_pool);
+
+        hg_thread_mutex_destroy(&ioc_data->io_queue.q_mutex);
+    }
+
+    HDfree(ioc_data);
+
+    /* Clear the context's pointer so it cannot dangle after the free */
+    sf_context->ioc_data = NULL;
+
+    H5_SUBFILING_FUNC_LEAVE;
+}
+
+/*-------------------------------------------------------------------------
+ * Function: ioc_thread_main
+ *
+ * Purpose: An IO Concentrator instance is initialized with the
+ * specified subfiling context.
+ *
+ * Return: The IO concentrator thread executes as long as the HDF5
+ * file associated with this context is open. At file close,
+ * the thread will return from 'ioc_main' and the thread
+ * exit status will be checked by the main program.
+ *
+ * Programmer: Richard Warren
+ * 7/17/2020
+ *
+ * Changes: Initial Version/None.
+ *
+ *-------------------------------------------------------------------------
+ */
+static HG_THREAD_RETURN_TYPE
+ioc_thread_main(void *arg)
+{
+    /* Thread-pool entry point: unwrap the ioc_data_t handed to
+     * hg_thread_create() and run the IOC main dispatch loop until
+     * shutdown is requested at file close.
+     */
+    ioc_main((ioc_data_t *)arg);
+
+    return (hg_thread_ret_t)0;
+}
+
+/*-------------------------------------------------------------------------
+ * Function: ioc_main
+ *
+ * Purpose: This is the principal function run by the I/O Concentrator
+ * main thread. It remains within a loop until allowed to
+ * exit by means of setting the 'sf_shutdown_flag'. This is
+ * usually accomplished as part of the file close operation.
+ *
+ * The function implements an asynchronous polling approach
+ * for incoming messages. These messages can be thought of
+ * as a primitive RPC which utilizes MPI tags to code and
+ * implement the desired subfiling functionality.
+ *
+ * As each incoming message is received, it gets added to
+ * a queue for processing by a thread_pool thread. The
+ * message handlers are dispatched via the
+ * "handle_work_request" routine.
+
+ * Subfiling is effectively a software RAID-0 implementation
+ * where having multiple I/O Concentrators and independent
+ * subfiles is equated to the multiple disks and a true
+ * hardware base RAID implementation.
+ *
+ * I/O Concentrators are ordered according to their MPI rank.
+ * In the simplest interpretation, IOC(0) will always contain
+ * the initial bytes of the logical disk image. Byte 0 of
+ * IOC(1) will contain the byte written to the logical disk
+ * offset "stripe_size" X IOC(number).
+ *
+ * Example: If the stripe size is defined to be 256K, then
+ * byte 0 of subfile(1) is at logical offset 262144 of the
+ * file. Similarly, byte 0 of subfile(2) represents the
+ * logical file offset = 524288. For logical files larger
+ * than 'N' X stripe_size, we simply "wrap around" back to
+ * subfile(0). The following shows the mapping of 30
+ * logical blocks of data over 3 subfiles:
+ * +--------+--------+--------+--------+--------+--------+
+ * | blk(0 )| blk(1) | blk(2 )| blk(3 )| blk(4 )| blk(5 )|
+ * | IOC(0) | IOC(1) | IOC(2) | IOC(0) | IOC(1) | IOC(2) |
+ * +--------+--------+--------+--------+--------+--------+
+ * | blk(6 )| blk(7) | blk(8 )| blk(9 )| blk(10)| blk(11)|
+ * | IOC(0) | IOC(1) | IOC(2) | IOC(0) | IOC(1) | IOC(2) |
+ * +--------+--------+--------+--------+--------+--------+
+ * | blk(12)| blk(13)| blk(14)| blk(15)| blk(16)| blk(17)|
+ * | IOC(0) | IOC(1) | IOC(2) | IOC(0) | IOC(1) | IOC(2) |
+ * +--------+--------+--------+--------+--------+--------+
+ * | blk(18)| blk(19)| blk(20)| blk(21)| blk(22)| blk(23)|
+ * | IOC(0) | IOC(1) | IOC(2) | IOC(0) | IOC(1) | IOC(2) |
+ * +--------+--------+--------+--------+--------+--------+
+ * | blk(24)| blk(25)| blk(26)| blk(27)| blk(28)| blk(29)|
+ * | IOC(0) | IOC(1) | IOC(2) | IOC(0) | IOC(1) | IOC(2) |
+ * +--------+--------+--------+--------+--------+--------+
+ *
+ * Return: None
+ * Errors: None
+ *
+ * Programmer: Richard Warren
+ * 7/17/2020
+ *
+ * Changes: Initial Version/None.
+ *-------------------------------------------------------------------------
+ */
+static int
+ioc_main(ioc_data_t *ioc_data)
+{
+    subfiling_context_t *context = NULL;
+    sf_work_request_t wk_req; /* reused receive buffer for each incoming request */
+    int subfile_rank;
+    int shutdown_requested;
+    int ret_value = 0;
+
+    HDassert(ioc_data);
+
+    /* Look up this file's subfiling context from the ID stashed in ioc_data */
+    context = H5_get_subfiling_object(ioc_data->sf_context_id);
+    HDassert(context);
+
+    /* We can't have opened any files at this point..
+     * The file open approach has changed so that the normal
+     * application rank (hosting this thread) does the file open.
+     * We can simply utilize the file descriptor (which should now
+     * represent an open file).
+     */
+
+    subfile_rank = context->sf_group_rank;
+
+    /* tell initialize_ioc_threads() that ioc_main() is ready to enter its main loop */
+    atomic_store(&ioc_data->sf_ioc_ready, 1);
+
+    shutdown_requested = 0;
+
+    /* Keep polling until a shutdown has been requested AND no I/O
+     * operations or worker-thread tasks remain pending -- i.e. drain
+     * all outstanding work before exiting.
+     */
+    while ((!shutdown_requested) || (0 < atomic_load(&ioc_data->sf_io_ops_pending)) ||
+           (0 < atomic_load(&ioc_data->sf_work_pending))) {
+        MPI_Status status;
+        int flag = 0;
+        int mpi_code;
+
+        /* Probe for incoming work requests */
+        if (MPI_SUCCESS !=
+            (mpi_code = (MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, context->sf_msg_comm, &flag, &status))))
+            H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Iprobe failed", mpi_code);
+
+        if (flag) {
+            double queue_start_time;
+            int count;
+            int source = status.MPI_SOURCE;
+            int tag = status.MPI_TAG;
+
+            /* Validate the message before receiving it: the MPI tag encodes
+             * the requested operation and must be one of the known opcodes.
+             */
+            if ((tag != READ_INDEP) && (tag != WRITE_INDEP) && (tag != TRUNC_OP) && (tag != GET_EOF_OP))
+                H5_SUBFILING_GOTO_ERROR(H5E_ARGS, H5E_BADVALUE, -1, "invalid work request operation (%d)",
+                                        tag);
+
+            if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
+                H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Get_count failed", mpi_code);
+
+            if (count < 0)
+                H5_SUBFILING_GOTO_ERROR(H5E_ARGS, H5E_BADVALUE, -1, "invalid work request message size (%d)",
+                                        count);
+
+            /* Reject oversized messages rather than overflowing wk_req */
+            if ((size_t)count > sizeof(sf_work_request_t))
+                H5_SUBFILING_GOTO_ERROR(H5E_ARGS, H5E_BADVALUE, -1, "work request message is too large (%d)",
+                                        count);
+
+            /*
+             * Zero out work request, since the received message should
+             * be smaller than sizeof(sf_work_request_t)
+             */
+            HDmemset(&wk_req, 0, sizeof(sf_work_request_t));
+
+            /* Receive exactly the message that was probed (same source/tag) */
+            if (MPI_SUCCESS != (mpi_code = MPI_Recv(&wk_req, count, MPI_BYTE, source, tag,
+                                                    context->sf_msg_comm, MPI_STATUS_IGNORE)))
+                H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Recv failed", mpi_code);
+
+            /* Dispatch work request to worker threads in thread pool */
+
+            queue_start_time = MPI_Wtime();
+
+            /* Fill in the bookkeeping fields the wire format doesn't carry */
+            wk_req.tag = tag;
+            wk_req.source = source;
+            wk_req.subfile_rank = subfile_rank;
+            wk_req.context_id = ioc_data->sf_context_id;
+            wk_req.start_time = queue_start_time;
+            wk_req.buffer = NULL;
+
+            /* Queue the request; the queue copies what it needs, so wk_req
+             * can be reused on the next loop iteration.
+             */
+            ioc_io_queue_add_entry(ioc_data, &wk_req);
+
+            HDassert(atomic_load(&ioc_data->sf_io_ops_pending) >= 0);
+        }
+        else {
+            /* No incoming request -- sleep briefly instead of busy-spinning */
+            struct timespec sleep_spec = {0, IOC_MAIN_SLEEP_DELAY};
+
+            HDnanosleep(&sleep_spec, NULL);
+        }
+
+        /* Dispatch any eligible queued entries. When a message was just
+         * queued (flag != 0) pass try_lock = 0 (blocking lock); on idle
+         * passes pass try_lock = 1 -- presumably to avoid contending with
+         * worker threads when there is nothing new to dispatch (TODO:
+         * confirm against ioc_io_queue_dispatch_eligible_entries()).
+         */
+        ioc_io_queue_dispatch_eligible_entries(ioc_data, flag ? 0 : 1);
+
+        shutdown_requested = atomic_load(&ioc_data->sf_shutdown_flag);
+    }
+
+    /* Reset the shutdown flag -- this signals finalize_ioc_threads()
+     * that the main loop has exited.
+     */
+    atomic_store(&ioc_data->sf_shutdown_flag, 0);
+
+done:
+    H5_SUBFILING_FUNC_LEAVE;
+} /* ioc_main() */
+
#ifdef H5_SUBFILING_DEBUG
/*
 * Translate an I/O operation code into a human-readable string for
 * debug logging. Returns "unknown" for any unrecognized opcode.
 *
 * Note: the previous version had a `break` after every `return`, which
 * is unreachable dead code; those have been removed.
 */
static const char *
translate_opcode(io_op_t op)
{
    switch (op) {
        case READ_OP:
            return "READ_OP";
        case WRITE_OP:
            return "WRITE_OP";
        case OPEN_OP:
            return "OPEN_OP";
        case CLOSE_OP:
            return "CLOSE_OP";
        case TRUNC_OP:
            return "TRUNC_OP";
        case GET_EOF_OP:
            return "GET_EOF_OP";
        case FINI_OP:
            return "FINI_OP";
        case LOGGING_OP:
            return "LOGGING_OP";
    }
    return "unknown";
}
#endif
+
/*-------------------------------------------------------------------------
 * Function:    handle_work_request
 *
 * Purpose:     Handle a work request from the thread pool work queue.
 *              We dispatch the specific function as indicated by the
 *              TAG that has been added to the work request by the
 *              IOC main thread (which is just a copy of the MPI tag
 *              associated with the RPC message) and provide the subfiling
 *              context associated with the HDF5 file.
 *
 *              Any status associated with the function processing is
 *              returned directly to the client via ACK or NACK messages.
 *
 * Return:      Always returns 0 as the thread return value. Failures of
 *              the dispatched operation are recorded in the queue entry's
 *              wk_ret field rather than being returned to the caller.
 *
 * Programmer:  Richard Warren
 *              7/17/2020
 *
 *-------------------------------------------------------------------------
 */
static HG_THREAD_RETURN_TYPE
handle_work_request(void *arg)
{
    ioc_io_queue_entry_t *q_entry_ptr     = (ioc_io_queue_entry_t *)arg;
    subfiling_context_t * sf_context      = NULL;
    sf_work_request_t *   msg             = &(q_entry_ptr->wk_req);
    ioc_data_t *          ioc_data        = NULL;
    int64_t               file_context_id = msg->header[2];
    int                   op_ret;
    hg_thread_ret_t       ret_value = 0;

    HDassert(q_entry_ptr);
    HDassert(q_entry_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC);
    HDassert(q_entry_ptr->in_progress);

    sf_context = H5_get_subfiling_object(file_context_id);
    HDassert(sf_context);

    ioc_data = sf_context->ioc_data;
    HDassert(ioc_data);

    /* Account for work in progress so ioc_main() won't exit while this
     * worker thread is still active
     */
    atomic_fetch_add(&ioc_data->sf_work_pending, 1);

    msg->in_progress = 1;

    /* Dispatch on the MPI tag copied into the work request by ioc_main() */
    switch (msg->tag) {
        case WRITE_INDEP:
            op_ret = ioc_file_queue_write_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm,
                                                q_entry_ptr->counter);
            break;

        case READ_INDEP:
            op_ret = ioc_file_queue_read_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm);
            break;

        case TRUNC_OP:
            /* header[0] carries the truncation length */
            op_ret = ioc_file_truncate(sf_context->sf_fid, q_entry_ptr->wk_req.header[0],
                                       sf_context->topology->subfile_rank);
            break;

        case GET_EOF_OP:
            op_ret = ioc_file_report_eof(msg, msg->subfile_rank, msg->source, sf_context->sf_eof_comm);
            break;

        default:
#ifdef H5_SUBFILING_DEBUG
            H5_subfiling_log(file_context_id, "%s: IOC %d received unknown message with tag %x from rank %d",
                             __func__, msg->subfile_rank, msg->tag, msg->source);
#endif

            op_ret = -1;
            break;
    }

    atomic_fetch_sub(&ioc_data->sf_work_pending, 1);

    if (op_ret < 0) {
#ifdef H5_SUBFILING_DEBUG
        H5_subfiling_log(
            file_context_id,
            "%s: IOC %d request(%s) filename=%s from rank(%d), size=%ld, offset=%ld FAILED with ret %d",
            __func__, msg->subfile_rank, translate_opcode((io_op_t)msg->tag), sf_context->sf_filename,
            msg->source, msg->header[0], msg->header[1], op_ret);
#endif

        /* Record the failure in the queue entry for later inspection */
        q_entry_ptr->wk_ret = op_ret;
    }

#ifdef H5FD_IOC_DEBUG
    {
        int curr_io_ops_pending = atomic_load(&ioc_data->sf_io_ops_pending);
        HDassert(curr_io_ops_pending > 0);
    }
#endif

    /* complete the I/O request */
    ioc_io_queue_complete_entry(ioc_data, q_entry_ptr);

    HDassert(atomic_load(&ioc_data->sf_io_ops_pending) >= 0);

    /* Check the I/O Queue to see if there are any dispatchable entries */
    ioc_io_queue_dispatch_eligible_entries(ioc_data, 1);

    H5_SUBFILING_FUNC_LEAVE;
}
+
/*-------------------------------------------------------------------------
 * Function:    begin_thread_exclusive
 *
 * Purpose:     Mutex lock to restrict access to code or variables.
 *
 * Return:      void (blocks until the ioc_thread_mutex is acquired;
 *              previous comment incorrectly documented an integer return)
 *
 * Programmer:  Richard Warren
 *              7/17/2020
 *
 *-------------------------------------------------------------------------
 */
void
begin_thread_exclusive(void)
{
    hg_thread_mutex_lock(&ioc_thread_mutex);
}
+
/*-------------------------------------------------------------------------
 * Function:    end_thread_exclusive
 *
 * Purpose:     Mutex unlock. Should only be called by the current holder
 *              of the locked mutex.
 *
 * Return:      void (previous comment incorrectly documented an integer
 *              return)
 *
 * Programmer:  Richard Warren
 *              7/17/2020
 *
 *-------------------------------------------------------------------------
 */
void
end_thread_exclusive(void)
{
    hg_thread_mutex_unlock(&ioc_thread_mutex);
}
+
+static herr_t
+send_ack_to_client(int ack_val, int dest_rank, int source_rank, int msg_tag, MPI_Comm comm)
+{
+ int mpi_code;
+ herr_t ret_value = SUCCEED;
+
+ HDassert(ack_val > 0);
+
+ (void)source_rank;
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Send(&ack_val, 1, MPI_INT, dest_rank, msg_tag, comm)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send", mpi_code);
+
+done:
+ H5_SUBFILING_FUNC_LEAVE;
+}
+
+static herr_t
+send_nack_to_client(int dest_rank, int source_rank, int msg_tag, MPI_Comm comm)
+{
+ int nack = 0;
+ int mpi_code;
+ herr_t ret_value = SUCCEED;
+
+ (void)source_rank;
+
+ if (MPI_SUCCESS != (mpi_code = MPI_Send(&nack, 1, MPI_INT, dest_rank, msg_tag, comm)))
+ H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send", mpi_code);
+
+done:
+ H5_SUBFILING_FUNC_LEAVE;
+}
+
+/*
+=========================================
+queue_xxx functions that should be run
+from the thread pool threads...
+=========================================
+*/
+
+/*-------------------------------------------------------------------------
+ * Function: ioc_file_queue_write_indep
+ *
+ * Purpose: Implement the IOC independent write function. The
+ * function is invoked as a result of the IOC receiving the
+ * "header"/RPC. What remains is to allocate memory for the
+ * data sent by the client and then write the data to our
+ * subfile. We utilize pwrite for the actual file writing.
+ * File flushing is done at file close.
+ *
+ * Return: The integer status returned by the Internal read_independent
+ * function. Successful operations will return 0.
+ * Errors: An MPI related error value.
+ *
+ * Programmer: Richard Warren
+ * 7/17/2020
+ *
+ * Changes: Initial Version/None.
+ *
+ *-------------------------------------------------------------------------
+ */
static int
ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm,
                           uint32_t counter)
{
    subfiling_context_t *sf_context = NULL;
    MPI_Status           msg_status;
    hbool_t              send_nack = FALSE;
    int64_t              data_size;
    int64_t              file_offset;
    int64_t              file_context_id;
    int64_t              stripe_id;
    haddr_t              sf_eof;
#ifdef H5FD_IOC_COLLECT_STATS
    double t_start;
    double t_end;
    double t_write;
    double t_wait;
    double t_queue_delay;
#endif
    char *recv_buf = NULL;
    int   rcv_tag;
    int   sf_fid;
    int   data_bytes_received;
    int   write_ret;
    int   mpi_code;
    int   ret_value = 0;

    HDassert(msg);

    /* Retrieve the fields of the RPC message for the write operation */
    data_size       = msg->header[0];
    file_offset     = msg->header[1];
    file_context_id = msg->header[2];

    if (data_size < 0) {
        send_nack = TRUE;
        H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "invalid data size for write");
    }

    sf_context = H5_get_subfiling_object(file_context_id);
    HDassert(sf_context);

    /* Compute the logical EOF contribution of this write in the full
     * (striped) file address space from the stripe size, the per-stripe
     * block size, and this subfile's base address
     */
    stripe_id = file_offset + data_size;
    sf_eof    = (haddr_t)(stripe_id % sf_context->sf_stripe_size);

    stripe_id /= sf_context->sf_stripe_size;
    sf_eof += (haddr_t)((stripe_id * sf_context->sf_blocksize_per_stripe) + sf_context->sf_base_addr);

    /* Flag that we've attempted to write data to the file */
    sf_context->sf_write_count++;

#ifdef H5FD_IOC_COLLECT_STATS
    /* For debugging performance */
    sf_write_ops++;

    t_start       = MPI_Wtime();
    t_queue_delay = t_start - msg->start_time;

#ifdef H5FD_IOC_DEBUG
    if (sf_verbose_flag) {
        if (sf_logfile) {
            HDfprintf(sf_logfile,
                      "[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld, "
                      "queue_delay = %lf seconds\n",
                      subfile_rank, __func__, source, data_size, file_offset, t_queue_delay);
        }
    }
#endif

#endif

    /* Allocate space to receive data sent from the client */
    if (NULL == (recv_buf = HDmalloc((size_t)data_size))) {
        send_nack = TRUE;
        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, -1, "couldn't allocate receive buffer for data");
    }

    /*
     * Calculate message tag for the client to use for sending
     * data, then send an ACK message to the client with the
     * calculated message tag. This calculated message tag
     * allows us to distinguish between multiple concurrent
     * writes from a single rank.
     */
    HDassert(H5FD_IOC_tag_ub_val_ptr && (*H5FD_IOC_tag_ub_val_ptr >= WRITE_TAG_BASE));
    rcv_tag = (int)(counter % (INT_MAX - WRITE_TAG_BASE));
    rcv_tag %= (*H5FD_IOC_tag_ub_val_ptr - WRITE_TAG_BASE);
    rcv_tag += WRITE_TAG_BASE;

    if (send_ack_to_client(rcv_tag, source, subfile_rank, WRITE_INDEP_ACK, comm) < 0)
        H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1, "couldn't send ACK to client");

    /* Receive data from client */
    H5_CHECK_OVERFLOW(data_size, int64_t, int);
    if (MPI_SUCCESS !=
        (mpi_code = MPI_Recv(recv_buf, (int)data_size, MPI_BYTE, source, rcv_tag, comm, &msg_status)))
        H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Recv failed", mpi_code);

    /* Verify that the complete payload arrived */
    if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&msg_status, MPI_BYTE, &data_bytes_received)))
        H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Get_count failed", mpi_code);

    if (data_bytes_received != data_size)
        H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1,
                                "message size mismatch -- expected = %" PRId64 ", actual = %d", data_size,
                                data_bytes_received);

#ifdef H5FD_IOC_COLLECT_STATS
    t_end  = MPI_Wtime();
    t_wait = t_end - t_start;
    sf_write_wait_time += t_wait;

    t_start = t_end;

#ifdef H5FD_IOC_DEBUG
    if (sf_verbose_flag) {
        if (sf_logfile) {
            HDfprintf(sf_logfile, "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n", subfile_rank,
                      __func__, data_size, source, mpi_code);
        }
    }
#endif

#endif

    sf_fid = sf_context->sf_fid;

#ifdef H5FD_IOC_DEBUG
    if (sf_fid < 0)
        H5_subfiling_log(file_context_id, "%s: WARNING: attempt to write data to closed subfile FID %d",
                         __func__, sf_fid);
#endif

    if (sf_fid >= 0) {
        /* Actually write data received from client into subfile */
        if ((write_ret = ioc_file_write_data(sf_fid, file_offset, recv_buf, data_size, subfile_rank)) < 0)
            H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1,
                                    "write function(FID=%d, Source=%d) returned an error (%d)", sf_fid,
                                    source, write_ret);

#ifdef H5FD_IOC_COLLECT_STATS
        t_end   = MPI_Wtime();
        t_write = t_end - t_start;
        sf_pwrite_time += t_write;
#endif
    }

#ifdef H5FD_IOC_COLLECT_STATS
    sf_queue_delay_time += t_queue_delay;
#endif

    /* Serialize the EOF update against other worker threads */
    begin_thread_exclusive();

    /* Adjust EOF if necessary */
    if (sf_eof > sf_context->sf_eof)
        sf_context->sf_eof = sf_eof;

    end_thread_exclusive();

done:
    if (send_nack) {
        /* Send NACK back to client so client can handle failure gracefully */
        if (send_nack_to_client(source, subfile_rank, WRITE_INDEP_ACK, comm) < 0)
            H5_SUBFILING_DONE_ERROR(H5E_IO, H5E_WRITEERROR, -1, "couldn't send NACK to client");
    }

    HDfree(recv_buf);

    H5_SUBFILING_FUNC_LEAVE;
} /* ioc_file_queue_write_indep() */
+
+/*-------------------------------------------------------------------------
+ * Function: ioc_file_queue_read_indep
+ *
+ * Purpose: Implement the IOC independent read function. The
+ * function is invoked as a result of the IOC receiving the
+ * "header"/RPC. What remains is to allocate memory for
+ * reading the data and then to send this to the client.
+ * We utilize pread for the actual file reading.
+ *
+ * Return: The integer status returned by the Internal read_independent
+ * function. Successful operations will return 0.
+ * Errors: An MPI related error value.
+ *
+ * Programmer: Richard Warren
+ * 7/17/2020
+ *
+ * Changes: Initial Version/None.
+ *
+ *-------------------------------------------------------------------------
+ */
static int
ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
{
    subfiling_context_t *sf_context     = NULL;
    hbool_t              send_empty_buf = TRUE;
    int64_t              data_size;
    int64_t              file_offset;
    int64_t              file_context_id;
#ifdef H5FD_IOC_COLLECT_STATS
    double t_start;
    double t_end;
    double t_read;
    double t_queue_delay;
#endif
    char *send_buf = NULL;
    int   sf_fid;
    int   read_ret;
    int   mpi_code;
    int   ret_value = 0;

    HDassert(msg);

    /* Retrieve the fields of the RPC message for the read operation */
    data_size       = msg->header[0];
    file_offset     = msg->header[1];
    file_context_id = msg->header[2];

    if (data_size < 0)
        H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "invalid data size for read");

    sf_context = H5_get_subfiling_object(file_context_id);
    HDassert(sf_context);

    /* Flag that we've attempted to read data from the file */
    sf_context->sf_read_count++;

#ifdef H5FD_IOC_COLLECT_STATS
    /* For debugging performance */
    sf_read_ops++;

    t_start       = MPI_Wtime();
    t_queue_delay = t_start - msg->start_time;

#ifdef H5FD_IOC_DEBUG
    if (sf_verbose_flag && (sf_logfile != NULL)) {
        HDfprintf(sf_logfile,
                  "[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld "
                  "queue_delay=%lf seconds\n",
                  subfile_rank, __func__, source, data_size, file_offset, t_queue_delay);
    }
#endif

#endif

    /* Allocate space to send data read from file to client */
    if (NULL == (send_buf = HDmalloc((size_t)data_size)))
        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, -1, "couldn't allocate send buffer for data");

    sf_fid = sf_context->sf_fid;
    if (sf_fid < 0)
        H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "subfile file descriptor %d is invalid", sf_fid);

    /* Read data from the subfile */
    if ((read_ret = ioc_file_read_data(sf_fid, file_offset, send_buf, data_size, subfile_rank)) < 0) {
        H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, read_ret,
                                "read function(FID=%d, Source=%d) returned an error (%d)", sf_fid, source,
                                read_ret);
    }

    /* Read succeeded -- suppress the empty-message fallback in the done block */
    send_empty_buf = FALSE;

    /* Send read data to the client */
    H5_CHECK_OVERFLOW(data_size, int64_t, int);
    if (MPI_SUCCESS !=
        (mpi_code = MPI_Send(send_buf, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, comm)))
        H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send failed", mpi_code);

#ifdef H5FD_IOC_COLLECT_STATS
    t_end  = MPI_Wtime();
    t_read = t_end - t_start;
    sf_pread_time += t_read;
    sf_queue_delay_time += t_queue_delay;

#ifdef H5FD_IOC_DEBUG
    if (sf_verbose_flag && (sf_logfile != NULL)) {
        HDfprintf(sf_logfile, "[ioc(%d)] MPI_Send to source(%d) completed\n", subfile_rank, source);
    }
#endif

#endif

done:
    if (send_empty_buf) {
        /*
         * Send an empty message back to client on failure. The client will
         * likely get a message truncation error, but at least shouldn't hang.
         */
        if (MPI_SUCCESS != (mpi_code = MPI_Send(NULL, 0, MPI_BYTE, source, READ_INDEP_DATA, comm)))
            H5_SUBFILING_MPI_DONE_ERROR(-1, "MPI_Send failed", mpi_code);
    }

    HDfree(send_buf);

    /* NOTE(review): sibling functions in this file return via
     * H5_SUBFILING_FUNC_LEAVE -- confirm whether this bare return is
     * intentional
     */
    return ret_value;
} /* end ioc_file_queue_read_indep() */
+
+/*
+======================================================
+File functions
+
+The pread and pwrite posix functions are described as
+being thread safe.
+======================================================
+*/
+
+static int
+ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
+{
+ ssize_t bytes_remaining = (ssize_t)data_size;
+ ssize_t bytes_written = 0;
+ char * this_data = (char *)data_buffer;
+ int ret_value = 0;
+
+#ifndef H5FD_IOC_DEBUG
+ (void)subfile_rank;
+#endif
+
+ HDcompile_assert(H5_SIZEOF_OFF_T == sizeof(file_offset));
+
+ while (bytes_remaining) {
+ errno = 0;
+
+ bytes_written = HDpwrite(fd, this_data, (size_t)bytes_remaining, file_offset);
+
+ if (bytes_written >= 0) {
+ bytes_remaining -= bytes_written;
+
+#ifdef H5FD_IOC_DEBUG
+ HDprintf("[ioc(%d) %s]: wrote %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", subfile_rank,
+ __func__, bytes_written, bytes_remaining, file_offset);
+#endif
+
+ this_data += bytes_written;
+ file_offset += bytes_written;
+ }
+ else {
+ H5_SUBFILING_SYS_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1, "HDpwrite failed");
+ }
+ }
+
+ /* We don't usually use this for each file write. We usually do the file
+ * flush as part of file close operation.
+ */
+#ifdef H5FD_IOC_REQUIRE_FLUSH
+ fdatasync(fd);
+#endif
+
+done:
+ H5_SUBFILING_FUNC_LEAVE;
+} /* end ioc_file_write_data() */
+
/*
 * Read `data_size` bytes at `file_offset` from the given subfile file
 * descriptor into `data_buffer`.
 *
 * Short reads advance the buffer pointer and offset and continue. A read
 * of zero bytes (EOF before the end of the requested region) zero-fills
 * the remainder of the buffer. Failing reads are retried with exponential
 * backoff until the retry budget is exhausted.
 *
 * Returns 0 on success, -1 on failure.
 */
static int
ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
{
    useconds_t delay           = 100; /* initial retry delay in microseconds; doubles on each retry */
    ssize_t    bytes_remaining = (ssize_t)data_size;
    ssize_t    bytes_read      = 0;
    char *     this_buffer     = (char *)data_buffer;
    int        retries         = MIN_READ_RETRIES;
    int        ret_value       = 0;

#ifndef H5FD_IOC_DEBUG
    (void)subfile_rank;
#endif

    HDcompile_assert(H5_SIZEOF_OFF_T == sizeof(file_offset));

    while (bytes_remaining) {
        errno = 0;

        bytes_read = HDpread(fd, this_buffer, (size_t)bytes_remaining, file_offset);

        if (bytes_read > 0) {
            /* Reset retry params */
            retries = MIN_READ_RETRIES;
            delay   = 100;

            bytes_remaining -= bytes_read;

#ifdef H5FD_IOC_DEBUG
            HDprintf("[ioc(%d) %s]: read %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", subfile_rank,
                     __func__, bytes_read, bytes_remaining, file_offset);
#endif

            this_buffer += bytes_read;
            file_offset += bytes_read;
        }
        else if (bytes_read == 0) {
            HDassert(bytes_remaining > 0);

            /* end of file but not end of format address space */
            HDmemset(this_buffer, 0, (size_t)bytes_remaining);
            break;
        }
        else {
            /* pread failed -- back off and retry until the budget runs out */
            if (retries == 0) {
#ifdef H5FD_IOC_DEBUG
                HDprintf("[ioc(%d) %s]: TIMEOUT: file_offset=%" PRId64 ", data_size=%ld\n", subfile_rank,
                         __func__, file_offset, data_size);
#endif

                H5_SUBFILING_SYS_GOTO_ERROR(H5E_IO, H5E_READERROR, -1, "HDpread failed");
            }

            retries--;
            usleep(delay);
            delay *= 2;
        }
    }

done:
    H5_SUBFILING_FUNC_LEAVE;
} /* end ioc_file_read_data() */
+
+static int
+ioc_file_truncate(int fd, int64_t length, int subfile_rank)
+{
+ int ret_value = 0;
+
+#ifndef H5FD_IOC_DEBUG
+ (void)subfile_rank;
+#endif
+
+ if (HDftruncate(fd, (off_t)length) != 0)
+ H5_SUBFILING_SYS_GOTO_ERROR(H5E_FILE, H5E_SEEKERROR, -1, "HDftruncate failed");
+
+#ifdef H5FD_IOC_DEBUG
+ HDprintf("[ioc(%d) %s]: truncated subfile to %lld bytes. ret = %d\n", subfile_rank, __func__,
+ (long long)length, errno);
+ HDfflush(stdout);
+#endif
+
+done:
+ H5_SUBFILING_FUNC_LEAVE;
+} /* end ioc_file_truncate() */
+
+/*-------------------------------------------------------------------------
+ * Function: ioc_file_report_eof
+ *
+ * Purpose: Determine the target sub-file's eof and report this value
+ * to the requesting rank.
+ *
+ * Notes: This function will have to be reworked once we solve
+ * the IOC error reporting problem.
+ *
+ * This function mixes functionality that should be
+ * in two different VFDs.
+ *
+ * Return: 0 if successful, 1 or an MPI error code on failure.
+ *
+ * Programmer: John Mainzer
+ * 7/17/2020
+ *
+ * Changes: Initial Version/None.
+ *
+ *-------------------------------------------------------------------------
+ */
+
static int
ioc_file_report_eof(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
{
    subfiling_context_t *sf_context = NULL;
    h5_stat_t            sb;
    int64_t              eof_req_reply[3];
    int64_t              file_context_id;
    int                  fd;
    int                  mpi_code;
    int                  ret_value = 0;

    HDassert(msg);

    /* first get the EOF of the target file. */

    file_context_id = msg->header[2];

    if (NULL == (sf_context = H5_get_subfiling_object(file_context_id)))
        H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_CANTGET, -1, "couldn't retrieve subfiling context");

    fd = sf_context->sf_fid;

    /* stat the open subfile to determine its current size */
    if (HDfstat(fd, &sb) < 0)
        H5_SUBFILING_SYS_GOTO_ERROR(H5E_FILE, H5E_SYSERRSTR, -1, "HDfstat failed");

    /* Reply layout: [0] = this IOC's subfile rank, [1] = subfile EOF, [2] = unused */
    eof_req_reply[0] = (int64_t)subfile_rank;
    eof_req_reply[1] = (int64_t)(sb.st_size);
    eof_req_reply[2] = 0; /* not used */

#ifdef H5_SUBFILING_DEBUG
    H5_subfiling_log(file_context_id, "%s: reporting file EOF as %" PRId64 ".", __func__, eof_req_reply[1]);
#endif

    /* return the subfile EOF to the querying rank */
    if (MPI_SUCCESS != (mpi_code = MPI_Send(eof_req_reply, 3, MPI_INT64_T, source, GET_EOF_COMPLETED, comm)))
        H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send", mpi_code);

done:
    H5_SUBFILING_FUNC_LEAVE;
} /* ioc_file_report_eof() */
+
+/*-------------------------------------------------------------------------
+ * Function: ioc_io_queue_alloc_entry
+ *
+ * Purpose: Allocate and initialize an instance of
+ * ioc_io_queue_entry_t. Return pointer to the new
+ * instance on success, and NULL on failure.
+ *
+ * Return: Pointer to new instance of ioc_io_queue_entry_t
+ * on success, and NULL on failure.
+ *
+ * Programmer: JRM -- 11/6/21
+ *
+ * Changes: None.
+ *
+ *-------------------------------------------------------------------------
+ */
+static ioc_io_queue_entry_t *
+ioc_io_queue_alloc_entry(void)
+{
+ ioc_io_queue_entry_t *q_entry_ptr = NULL;
+
+ q_entry_ptr = (ioc_io_queue_entry_t *)HDmalloc(sizeof(ioc_io_queue_entry_t));
+
+ if (q_entry_ptr) {
+
+ q_entry_ptr->magic = H5FD_IOC__IO_Q_ENTRY_MAGIC;
+ q_entry_ptr->next = NULL;
+ q_entry_ptr->prev = NULL;
+ q_entry_ptr->in_progress = FALSE;
+ q_entry_ptr->counter = 0;
+
+ /* will memcpy the wk_req field, so don't bother to initialize */
+ /* will initialize thread_wk field before use */
+
+ q_entry_ptr->wk_ret = 0;
+
+#ifdef H5FD_IOC_COLLECT_STATS
+ q_entry_ptr->q_time = 0;
+ q_entry_ptr->dispatch_time = 0;
+#endif
+ }
+
+ return q_entry_ptr;
+} /* ioc_io_queue_alloc_entry() */
+
+/*-------------------------------------------------------------------------
+ * Function: ioc_io_queue_add_entry
+ *
+ * Purpose: Add an I/O request to the tail of the IOC I/O Queue.
+ *
+ * To do this, we must:
+ *
+ * 1) allocate a new instance of ioc_io_queue_entry_t
+ *
+ * 2) Initialize the new instance and copy the supplied
+ * instance of sf_work_request_t into it.
+ *
+ * 3) Append it to the IOC I/O queue.
+ *
+ * Note that this does not dispatch the request even if it
+ * is eligible for immediate dispatch. This is done with
+ * a call to ioc_io_queue_dispatch_eligible_entries().
+ *
+ * Return: void.
+ *
+ * Programmer: JRM -- 11/7/21
+ *
+ * Changes: None.
+ *
+ *-------------------------------------------------------------------------
+ */
static void
ioc_io_queue_add_entry(ioc_data_t *ioc_data, sf_work_request_t *wk_req_ptr)
{
    ioc_io_queue_entry_t *entry_ptr = NULL;

    HDassert(ioc_data);
    HDassert(ioc_data->io_queue.magic == H5FD_IOC__IO_Q_MAGIC);
    HDassert(wk_req_ptr);

    entry_ptr = ioc_io_queue_alloc_entry();

    /* NOTE(review): allocation failure is only caught by this assert -- in
     * a production (NDEBUG) build a NULL here would be dereferenced below.
     * Confirm whether an explicit error path is needed.
     */
    HDassert(entry_ptr);
    HDassert(entry_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC);

    HDmemcpy((void *)(&(entry_ptr->wk_req)), (const void *)wk_req_ptr, sizeof(sf_work_request_t));

    /* must obtain io_queue mutex before appending */
    hg_thread_mutex_lock(&ioc_data->io_queue.q_mutex);

    HDassert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending));

    /* Assign a unique, monotonically increasing counter to the entry */
    entry_ptr->counter = ioc_data->io_queue.req_counter++;

    ioc_data->io_queue.num_pending++;

    H5FD_IOC__Q_APPEND(&ioc_data->io_queue, entry_ptr);

    atomic_fetch_add(&ioc_data->sf_io_ops_pending, 1);

#ifdef H5_SUBFILING_DEBUG
    H5_subfiling_log(wk_req_ptr->context_id,
                     "%s: request %d queued. op = %d, offset/len = %lld/%lld, q-ed/disp/ops_pend = %d/%d/%d.",
                     __func__, entry_ptr->counter, (entry_ptr->wk_req.tag),
                     (long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[0]),
                     ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress,
                     atomic_load(&ioc_data->sf_io_ops_pending));
#endif

    HDassert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == ioc_data->io_queue.q_len);

#ifdef H5FD_IOC_COLLECT_STATS
    /* Update queue statistics */
    entry_ptr->q_time = H5_now_usec();

    if (ioc_data->io_queue.q_len > ioc_data->io_queue.max_q_len) {
        ioc_data->io_queue.max_q_len = ioc_data->io_queue.q_len;
    }

    if (ioc_data->io_queue.num_pending > ioc_data->io_queue.max_num_pending) {
        ioc_data->io_queue.max_num_pending = ioc_data->io_queue.num_pending;
    }

    if (entry_ptr->wk_req.tag == READ_INDEP) {
        ioc_data->io_queue.ind_read_requests++;
    }
    else if (entry_ptr->wk_req.tag == WRITE_INDEP) {
        ioc_data->io_queue.ind_write_requests++;
    }
    else if (entry_ptr->wk_req.tag == TRUNC_OP) {
        ioc_data->io_queue.truncate_requests++;
    }
    else if (entry_ptr->wk_req.tag == GET_EOF_OP) {
        ioc_data->io_queue.get_eof_requests++;
    }

    ioc_data->io_queue.requests_queued++;
#endif

#ifdef H5_SUBFILING_DEBUG
    if (ioc_data->io_queue.q_len != atomic_load(&ioc_data->sf_io_ops_pending)) {
        H5_subfiling_log(
            wk_req_ptr->context_id,
            "%s: ioc_data->io_queue->q_len = %d != %d = atomic_load(&ioc_data->sf_io_ops_pending).", __func__,
            ioc_data->io_queue.q_len, atomic_load(&ioc_data->sf_io_ops_pending));
    }
#endif

    HDassert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending));

    hg_thread_mutex_unlock(&ioc_data->io_queue.q_mutex);

    return;
} /* ioc_io_queue_add_entry() */
+
+/*-------------------------------------------------------------------------
+ * Function: ioc_io_queue_dispatch_eligible_entries
+ *
+ * Purpose: Scan the IOC I/O Queue for dispatchable entries, and
+ * dispatch any such entries found.
+ *
+ * Do this by scanning the I/O queue from head to tail for
+ * entries that:
+ *
+ * 1) Have not already been dispatched
+ *
+ * 2) Either:
+ *
+ * a) do not intersect with any prior entries on the
+ * I/O queue, or
+ *
+ * b) Are read requests, and all intersections are with
+ * prior read requests.
+ *
+ * Dispatch any such entries found.
+ *
+ * Do this to maintain the POSIX semantics required by
+ * HDF5.
+ *
+ * Note that TRUNC_OPs and GET_EOF_OPs are a special case.
+ * Specifically, no I/O queue entry can be dispatched if
+ * there is a truncate or get EOF operation between it and
+ * the head of the queue. Further, a truncate or get EOF
+ * request cannot be executed unless it is at the head of
+ * the queue.
+ *
+ * Return: void.
+ *
+ * Programmer: JRM -- 11/7/21
+ *
+ * Changes: None.
+ *
+ *-------------------------------------------------------------------------
+ */
+/* TODO: Keep an eye on statistics and optimize this algorithm if necessary. While it is O(N)
+ * where N is the number of elements in the I/O Queue if there are are no-overlaps, it
+ * can become O(N**2) in the worst case.
+ */
static void
ioc_io_queue_dispatch_eligible_entries(ioc_data_t *ioc_data, hbool_t try_lock)
{
    hbool_t               conflict_detected;
    int64_t               entry_offset;
    int64_t               entry_len;
    int64_t               scan_offset;
    int64_t               scan_len;
    ioc_io_queue_entry_t *entry_ptr = NULL;
    ioc_io_queue_entry_t *scan_ptr  = NULL;

    HDassert(ioc_data);
    HDassert(ioc_data->io_queue.magic == H5FD_IOC__IO_Q_MAGIC);

    /* With try_lock, give up immediately if another thread holds the queue
     * mutex -- that holder will perform its own dispatch scan
     */
    if (try_lock) {
        if (hg_thread_mutex_try_lock(&ioc_data->io_queue.q_mutex) < 0)
            return;
    }
    else
        hg_thread_mutex_lock(&ioc_data->io_queue.q_mutex);

    entry_ptr = ioc_data->io_queue.q_head;

    /* sanity check on first element in the I/O queue */
    HDassert((entry_ptr == NULL) || (entry_ptr->prev == NULL));

    while ((entry_ptr) && (ioc_data->io_queue.num_pending > 0)) {

        HDassert(entry_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC);

        /* Check for a get EOF or truncate operation at head of queue */
        if (ioc_data->io_queue.q_head->in_progress) {
            if ((ioc_data->io_queue.q_head->wk_req.tag == TRUNC_OP) ||
                (ioc_data->io_queue.q_head->wk_req.tag == GET_EOF_OP)) {

                /* we have a truncate or get eof operation in progress -- thus no other operations
                 * can be dispatched until the truncate or get eof operation completes.  Just break
                 * out of the loop.
                 */

                break;
            }
        }

        if (!entry_ptr->in_progress) {

            /* header[1] is the file offset, header[0] the length */
            entry_offset = entry_ptr->wk_req.header[1];
            entry_len    = entry_ptr->wk_req.header[0];

            conflict_detected = FALSE;

            scan_ptr = entry_ptr->prev;

            HDassert((scan_ptr == NULL) || (scan_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC));

            if ((entry_ptr->wk_req.tag == TRUNC_OP) || (entry_ptr->wk_req.tag == GET_EOF_OP)) {

                if (scan_ptr != NULL) {

                    /* the TRUNC_OP or GET_EOF_OP is not at the head of the queue, and thus cannot
                     * be dispatched.  Further, no operation can be dispatched if a truncate request
                     * appears before it in the queue.  Thus we have done all we can and will break
                     * out of the loop.
                     */
                    break;
                }
            }

            /* Scan backwards toward the queue head for conflicting prior requests */
            while ((scan_ptr) && (!conflict_detected)) {

                /* check for overlaps */
                scan_offset = scan_ptr->wk_req.header[1];
                scan_len    = scan_ptr->wk_req.header[0];

                /* at present, I/O requests are scalar -- i.e. single blocks specified by offset and length.
                 * when this changes, this if statement will have to be updated accordingly.
                 */
                if (((scan_offset + scan_len) > entry_offset) && ((entry_offset + entry_len) > scan_offset)) {

                    /* the two request overlap -- unless they are both reads, we have detected a conflict */

                    /* TODO: update this if statement when we add collective I/O */
                    if ((entry_ptr->wk_req.tag != READ_INDEP) || (scan_ptr->wk_req.tag != READ_INDEP)) {

                        conflict_detected = TRUE;
                    }
                }

                scan_ptr = scan_ptr->prev;
            }

            if (!conflict_detected) { /* dispatch I/O request */

                HDassert(scan_ptr == NULL);
                HDassert(!entry_ptr->in_progress);

                entry_ptr->in_progress = TRUE;

                HDassert(ioc_data->io_queue.num_pending > 0);

                ioc_data->io_queue.num_pending--;
                ioc_data->io_queue.num_in_progress++;

                HDassert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress ==
                         ioc_data->io_queue.q_len);

                /* Hand the entry to the thread pool; handle_work_request()
                 * will execute it and complete the queue entry
                 */
                entry_ptr->thread_wk.func = handle_work_request;
                entry_ptr->thread_wk.args = entry_ptr;

#ifdef H5_SUBFILING_DEBUG
                H5_subfiling_log(entry_ptr->wk_req.context_id,
                                 "%s: request %d dispatched. op = %d, offset/len = %lld/%lld, "
                                 "q-ed/disp/ops_pend = %d/%d/%d.",
                                 __func__, entry_ptr->counter, (entry_ptr->wk_req.tag),
                                 (long long)(entry_ptr->wk_req.header[1]),
                                 (long long)(entry_ptr->wk_req.header[0]), ioc_data->io_queue.num_pending,
                                 ioc_data->io_queue.num_in_progress,
                                 atomic_load(&ioc_data->sf_io_ops_pending));
#endif

#ifdef H5FD_IOC_COLLECT_STATS
                if (ioc_data->io_queue.num_in_progress > ioc_data->io_queue.max_num_in_progress) {
                    ioc_data->io_queue.max_num_in_progress = ioc_data->io_queue.num_in_progress;
                }

                ioc_data->io_queue.requests_dispatched++;

                entry_ptr->dispatch_time = H5_now_usec();
#endif

                hg_thread_pool_post(ioc_data->io_thread_pool, &(entry_ptr->thread_wk));
            }
        }

        entry_ptr = entry_ptr->next;
    }

    HDassert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending));

    hg_thread_mutex_unlock(&ioc_data->io_queue.q_mutex);
} /* ioc_io_queue_dispatch_eligible_entries() */
+
+/*-------------------------------------------------------------------------
+ * Function: ioc_io_queue_complete_entry
+ *
+ * Purpose: Update the IOC I/O Queue for the completion of an I/O
+ * request.
+ *
+ * To do this:
+ *
+ * 1) Remove the entry from the I/O Queue
+ *
+ * 2) If so configured, update statistics
+ *
+ * 3) Discard the instance of ioc_io_queue_entry_t.
+ *
+ * Return: void.
+ *
+ * Programmer: JRM -- 11/7/21
+ *
+ * Changes: None.
+ *
+ *-------------------------------------------------------------------------
+ */
+static void
+ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_ptr)
+{
+#ifdef H5FD_IOC_COLLECT_STATS
+    uint64_t queued_time;
+    uint64_t execution_time;
+#endif
+
+    /* Sanity checks -- these should never fail */
+    HDassert(ioc_data);
+    HDassert(ioc_data->io_queue.magic == H5FD_IOC__IO_Q_MAGIC);
+    HDassert(entry_ptr);
+    HDassert(entry_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC);
+
+    /* must obtain io_queue mutex before deleting and updating stats */
+    hg_thread_mutex_lock(&ioc_data->io_queue.q_mutex);
+
+    HDassert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == ioc_data->io_queue.q_len);
+    HDassert(ioc_data->io_queue.num_in_progress > 0);
+
+    /* a negative work-request return code indicates the I/O failed */
+    if (entry_ptr->wk_ret < 0)
+        ioc_data->io_queue.num_failed++;
+
+    H5FD_IOC__Q_REMOVE(&ioc_data->io_queue, entry_ptr);
+
+    ioc_data->io_queue.num_in_progress--;
+
+    HDassert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == ioc_data->io_queue.q_len);
+
+    atomic_fetch_sub(&ioc_data->sf_io_ops_pending, 1);
+
+#ifdef H5_SUBFILING_DEBUG
+    H5_subfiling_log(entry_ptr->wk_req.context_id,
+                     "%s: request %d completed with ret %d. op = %d, offset/len = %lld/%lld, "
+                     "q-ed/disp/ops_pend = %d/%d/%d.",
+                     __func__, entry_ptr->counter, entry_ptr->wk_ret, (entry_ptr->wk_req.tag),
+                     (long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[0]),
+                     ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress,
+                     atomic_load(&ioc_data->sf_io_ops_pending));
+
+    /*
+     * If this I/O request is a truncate or "get eof" op, make sure
+     * there aren't other operations in progress
+     */
+    if ((entry_ptr->wk_req.tag == GET_EOF_OP) || (entry_ptr->wk_req.tag == TRUNC_OP))
+        HDassert(ioc_data->io_queue.num_in_progress == 0);
+#endif
+
+    HDassert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending));
+
+#ifdef H5FD_IOC_COLLECT_STATS
+    /* Compute the queued and execution time.
+     *
+     * Bug fix: the original read "execution_time = H5_now_usec() =
+     * entry_ptr->dispatch_time;" -- an assignment to a non-lvalue that
+     * cannot compile.  Subtraction was clearly intended.
+     */
+    queued_time    = entry_ptr->dispatch_time - entry_ptr->q_time;
+    execution_time = H5_now_usec() - entry_ptr->dispatch_time;
+
+    /* NOTE(review): queued_time / execution_time are not yet folded into
+     * any statistics accumulator in this version of the code -- presumably
+     * stats aggregation is TBD.  Cast to void to silence set-but-unused
+     * warnings without changing behavior.
+     */
+    (void)queued_time;
+    (void)execution_time;
+
+    ioc_data->io_queue.requests_completed++;
+
+    entry_ptr->q_time = H5_now_usec();
+
+#endif
+
+    hg_thread_mutex_unlock(&ioc_data->io_queue.q_mutex);
+
+    /* the request buffer must already have been released by the worker */
+    HDassert(entry_ptr->wk_req.buffer == NULL);
+
+    ioc_io_queue_free_entry(entry_ptr);
+
+    entry_ptr = NULL;
+
+    return;
+} /* ioc_io_queue_complete_entry() */
+
+/*-------------------------------------------------------------------------
+ * Function: ioc_io_queue_free_entry
+ *
+ * Purpose: Free the supplied instance of ioc_io_queue_entry_t.
+ *
+ * Verify that magic field is set to
+ * H5FD_IOC__IO_Q_ENTRY_MAGIC, and that the next and prev
+ * fields are NULL.
+ *
+ * Return: void.
+ *
+ * Programmer: JRM -- 11/6/21
+ *
+ * Changes: None.
+ *
+ *-------------------------------------------------------------------------
+ */
+static void
+ioc_io_queue_free_entry(ioc_io_queue_entry_t *q_entry_ptr)
+{
+    /* use assertions for error checking, since the following should never fail. */
+    HDassert(q_entry_ptr);
+    HDassert(q_entry_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC);
+    HDassert(q_entry_ptr->next == NULL);
+    HDassert(q_entry_ptr->prev == NULL);
+    HDassert(q_entry_ptr->wk_req.buffer == NULL);
+
+    /* invalidate the magic number before freeing so that a dangling
+     * reference to this entry trips the magic assertions above
+     */
+    q_entry_ptr->magic = 0;
+
+    HDfree(q_entry_ptr);
+
+    q_entry_ptr = NULL;
+
+    return;
+} /* ioc_io_queue_free_entry() */