Diffstat (limited to 'src/H5FDsubfiling/H5FDioc_threads.c')
-rw-r--r-- | src/H5FDsubfiling/H5FDioc_threads.c | 1658
1 file changed, 1658 insertions, 0 deletions
diff --git a/src/H5FDsubfiling/H5FDioc_threads.c b/src/H5FDsubfiling/H5FDioc_threads.c new file mode 100644 index 0000000..0d620b5 --- /dev/null +++ b/src/H5FDsubfiling/H5FDioc_threads.c @@ -0,0 +1,1658 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright by The HDF Group. * + * All rights reserved. * + * * + * This file is part of HDF5. The full HDF5 copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the root of the source code * + * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. * + * If you do not have access to either file, you may request a copy from * + * help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#include "H5FDioc_priv.h" + +#include "H5FDsubfiling.h" + +#ifndef HG_TEST_NUM_THREADS_DEFAULT +#define HG_TEST_NUM_THREADS_DEFAULT 4 +#endif + +#define MIN_READ_RETRIES 10 + +/* + * The amount of time (in nanoseconds) for the IOC main + * thread to sleep when there are no incoming I/O requests + * to process + */ +#define IOC_MAIN_SLEEP_DELAY (20000) + +/* + * IOC data for a file that is stored in that + * file's subfiling context object + */ +typedef struct ioc_data_t { + ioc_io_queue_t io_queue; + hg_thread_t ioc_main_thread; + hg_thread_pool_t *io_thread_pool; + int64_t sf_context_id; + + /* sf_io_ops_pending is use to track the number of I/O operations pending so that we can wait + * until all I/O operations have been serviced before shutting down the worker thread pool. + * The value of this variable must always be non-negative. + * + * Note that this is a convenience variable -- we could use io_queue.q_len instead. + * However, accessing this field requires locking io_queue.q_mutex. + */ + atomic_int sf_ioc_ready; + atomic_int sf_shutdown_flag; + atomic_int sf_io_ops_pending; + atomic_int sf_work_pending; +} ioc_data_t; + +/* + * NOTES: + * Rather than re-create the code for creating and managing a thread pool, + * I'm utilizing a reasonably well tested implementation from the mercury + * project. At some point, we should revisit this decision or possibly + * directly link against the mercury library. This would make sense if + * we move away from using MPI as the messaging infrastructure and instead + * use mercury for that purpose... 
+ */ + +static hg_thread_mutex_t ioc_thread_mutex = PTHREAD_MUTEX_INITIALIZER; + +#ifdef H5FD_IOC_COLLECT_STATS +static int sf_write_ops = 0; +static int sf_read_ops = 0; +static double sf_pwrite_time = 0.0; +static double sf_pread_time = 0.0; +static double sf_write_wait_time = 0.0; +static double sf_read_wait_time = 0.0; +static double sf_queue_delay_time = 0.0; +#endif + +/* Prototypes */ +static HG_THREAD_RETURN_TYPE ioc_thread_main(void *arg); +static int ioc_main(ioc_data_t *ioc_data); + +static int ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm, + uint32_t counter); +static int ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); + +static int ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, + int subfile_rank); +static int ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, + int subfile_rank); +static int ioc_file_truncate(int fd, int64_t length, int subfile_rank); +static int ioc_file_report_eof(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); + +static ioc_io_queue_entry_t *ioc_io_queue_alloc_entry(void); +static void ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_ptr); +static void ioc_io_queue_dispatch_eligible_entries(ioc_data_t *ioc_data, hbool_t try_lock); +static void ioc_io_queue_free_entry(ioc_io_queue_entry_t *q_entry_ptr); +static void ioc_io_queue_add_entry(ioc_data_t *ioc_data, sf_work_request_t *wk_req_ptr); + +/*------------------------------------------------------------------------- + * Function: initialize_ioc_threads + * + * Purpose: The principal entry point to initialize the execution + * context for an I/O Concentrator (IOC). The main thread + * is responsible for receiving I/O requests from each + * HDF5 "client" and distributing those to helper threads + * for actual processing. We initialize a fixed number + * of helper threads by creating a thread pool. + * + * Return: SUCCESS (0) or FAIL (-1) if any errors are detected + * for the multi-threaded initialization. + * + * Programmer: Richard Warren + * 7/17/2020 + * + * Changes: Initial Version/None. 
+ * + *------------------------------------------------------------------------- + */ +int +initialize_ioc_threads(void *_sf_context) +{ + subfiling_context_t *sf_context = _sf_context; + ioc_data_t * ioc_data = NULL; + unsigned thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT; + char * env_value; + int ret_value = 0; +#ifdef H5FD_IOC_COLLECT_STATS + double t_start = 0.0, t_end = 0.0; +#endif + + HDassert(sf_context); + + /* + * Allocate and initialize IOC data that will be passed + * to the IOC main thread + */ + if (NULL == (ioc_data = HDmalloc(sizeof(*ioc_data)))) + H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, (-1), + "can't allocate IOC data for IOC main thread"); + ioc_data->sf_context_id = sf_context->sf_context_id; + ioc_data->io_thread_pool = NULL; + ioc_data->io_queue = (ioc_io_queue_t){/* magic = */ H5FD_IOC__IO_Q_MAGIC, + /* q_head = */ NULL, + /* q_tail = */ NULL, + /* num_pending = */ 0, + /* num_in_progress = */ 0, + /* num_failed = */ 0, + /* q_len = */ 0, + /* req_counter = */ 0, + /* q_mutex = */ + PTHREAD_MUTEX_INITIALIZER, +#ifdef H5FD_IOC_COLLECT_STATS + /* max_q_len = */ 0, + /* max_num_pending = */ 0, + /* max_num_in_progress = */ 0, + /* ind_read_requests = */ 0, + /* ind_write_requests = */ 0, + /* truncate_requests = */ 0, + /* get_eof_requests = */ 0, + /* requests_queued = */ 0, + /* requests_dispatched = */ 0, + /* requests_completed = */ 0 +#endif + }; + + /* Initialize atomic vars */ + atomic_init(&ioc_data->sf_ioc_ready, 0); + atomic_init(&ioc_data->sf_shutdown_flag, 0); + atomic_init(&ioc_data->sf_io_ops_pending, 0); + atomic_init(&ioc_data->sf_work_pending, 0); + +#ifdef H5FD_IOC_COLLECT_STATS + t_start = MPI_Wtime(); +#endif + + if (hg_thread_mutex_init(&ioc_data->io_queue.q_mutex) < 0) + H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTINIT, (-1), "can't initialize IOC thread queue mutex"); + + /* Allow experimentation with the number of helper threads */ + if ((env_value = HDgetenv(H5_IOC_THREAD_POOL_COUNT)) != NULL) { + int value_check = HDatoi(env_value); + if (value_check > 0) { + thread_pool_count = (unsigned int)value_check; + } + } + + /* Initialize a thread pool for the I/O concentrator's worker threads */ + if (hg_thread_pool_init(thread_pool_count, &ioc_data->io_thread_pool) < 0) + H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTINIT, (-1), "can't initialize IOC worker thread pool"); + + /* Create the main IOC thread that will receive and dispatch I/O requests */ + if (hg_thread_create(&ioc_data->ioc_main_thread, ioc_thread_main, ioc_data) < 0) + H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTINIT, (-1), "can't create IOC main thread"); + + /* Wait until ioc_main() reports that it is ready */ + while (atomic_load(&ioc_data->sf_ioc_ready) != 1) { + usleep(20); + } + +#ifdef H5FD_IOC_COLLECT_STATS + t_end = MPI_Wtime(); + +#ifdef H5FD_IOC_DEBUG + if (sf_verbose_flag) { + if (sf_context->topology->subfile_rank == 0) { + HDprintf("%s: time = %lf seconds\n", __func__, (t_end - t_start)); + HDfflush(stdout); + } + } +#endif + +#endif + + sf_context->ioc_data = ioc_data; + +done: + H5_SUBFILING_FUNC_LEAVE; +} + +int +finalize_ioc_threads(void *_sf_context) +{ + subfiling_context_t *sf_context = _sf_context; + ioc_data_t * ioc_data = NULL; + int ret_value = 0; + + HDassert(sf_context); + HDassert(sf_context->topology->rank_is_ioc); + + ioc_data = sf_context->ioc_data; + if (ioc_data) { + HDassert(0 == atomic_load(&ioc_data->sf_shutdown_flag)); + + /* Shutdown the main IOC thread */ + atomic_store(&ioc_data->sf_shutdown_flag, 1); + + /* Allow ioc_main 
to exit.*/ + do { + usleep(20); + } while (0 != atomic_load(&ioc_data->sf_shutdown_flag)); + + /* Tear down IOC worker thread pool */ + HDassert(0 == atomic_load(&ioc_data->sf_io_ops_pending)); + hg_thread_pool_destroy(ioc_data->io_thread_pool); + + hg_thread_mutex_destroy(&ioc_data->io_queue.q_mutex); + + /* Wait for IOC main thread to exit */ + hg_thread_join(ioc_data->ioc_main_thread); + } + + HDfree(ioc_data); + + H5_SUBFILING_FUNC_LEAVE; +} + +/*------------------------------------------------------------------------- + * Function: ioc_thread_main + * + * Purpose: An IO Concentrator instance is initialized with the + * specified subfiling context. + * + * Return: The IO concentrator thread executes as long as the HDF5 + * file associated with this context is open. At file close, + * the thread will return from 'ioc_main' and the thread + * exit status will be checked by the main program. + * + * Programmer: Richard Warren + * 7/17/2020 + * + * Changes: Initial Version/None. + * + *------------------------------------------------------------------------- + */ +static HG_THREAD_RETURN_TYPE +ioc_thread_main(void *arg) +{ + hg_thread_ret_t thread_ret = (hg_thread_ret_t)0; + ioc_data_t * ioc_data = (ioc_data_t *)arg; + + /* Pass along the ioc_data_t */ + ioc_main(ioc_data); + + return thread_ret; +} + +/*------------------------------------------------------------------------- + * Function: ioc_main + * + * Purpose: This is the principal function run by the I/O Concentrator + * main thread. It remains within a loop until allowed to + * exit by means of setting the 'sf_shutdown_flag'. This is + * usually accomplished as part of the file close operation. + * + * The function implements an asynchronous polling approach + * for incoming messages. These messages can be thought of + * as a primitive RPC which utilizes MPI tags to code and + * implement the desired subfiling functionality. + * + * As each incoming message is received, it gets added to + * a queue for processing by a thread_pool thread. The + * message handlers are dispatched via the + * "handle_work_request" routine. + + * Subfiling is effectively a software RAID-0 implementation + * where having multiple I/O Concentrators and independent + * subfiles is equated to the multiple disks and a true + * hardware base RAID implementation. + * + * I/O Concentrators are ordered according to their MPI rank. + * In the simplest interpretation, IOC(0) will always contain + * the initial bytes of the logical disk image. Byte 0 of + * IOC(1) will contain the byte written to the logical disk + * offset "stripe_size" X IOC(number). + * + * Example: If the stripe size is defined to be 256K, then + * byte 0 of subfile(1) is at logical offset 262144 of the + * file. Similarly, byte 0 of subfile(2) represents the + * logical file offset = 524288. For logical files larger + * than 'N' X stripe_size, we simply "wrap around" back to + * subfile(0). 
The following shows the mapping of 30 + * logical blocks of data over 3 subfiles: + * +--------+--------+--------+--------+--------+--------+ + * | blk(0 )| blk(1) | blk(2 )| blk(3 )| blk(4 )| blk(5 )| + * | IOC(0) | IOC(1) | IOC(2) | IOC(0) | IOC(1) | IOC(2) | + * +--------+--------+--------+--------+--------+--------+ + * | blk(6 )| blk(7) | blk(8 )| blk(9 )| blk(10)| blk(11)| + * | IOC(0) | IOC(1) | IOC(2) | IOC(0) | IOC(1) | IOC(2) | + * +--------+--------+--------+--------+--------+--------+ + * | blk(12)| blk(13)| blk(14)| blk(15)| blk(16)| blk(17)| + * | IOC(0) | IOC(1) | IOC(2) | IOC(0) | IOC(1) | IOC(2) | + * +--------+--------+--------+--------+--------+--------+ + * | blk(18)| blk(19)| blk(20)| blk(21)| blk(22)| blk(23)| + * | IOC(0) | IOC(1) | IOC(2) | IOC(0) | IOC(1) | IOC(2) | + * +--------+--------+--------+--------+--------+--------+ + * | blk(24)| blk(25)| blk(26)| blk(27)| blk(28)| blk(29)| + * | IOC(0) | IOC(1) | IOC(2) | IOC(0) | IOC(1) | IOC(2) | + * +--------+--------+--------+--------+--------+--------+ + * + * Return: None + * Errors: None + * + * Programmer: Richard Warren + * 7/17/2020 + * + * Changes: Initial Version/None. + *------------------------------------------------------------------------- + */ +static int +ioc_main(ioc_data_t *ioc_data) +{ + subfiling_context_t *context = NULL; + sf_work_request_t wk_req; + int subfile_rank; + int shutdown_requested; + int ret_value = 0; + + HDassert(ioc_data); + + context = H5_get_subfiling_object(ioc_data->sf_context_id); + HDassert(context); + + /* We can't have opened any files at this point.. + * The file open approach has changed so that the normal + * application rank (hosting this thread) does the file open. + * We can simply utilize the file descriptor (which should now + * represent an open file). 
+ */ + + subfile_rank = context->sf_group_rank; + + /* tell initialize_ioc_threads() that ioc_main() is ready to enter its main loop */ + atomic_store(&ioc_data->sf_ioc_ready, 1); + + shutdown_requested = 0; + + while ((!shutdown_requested) || (0 < atomic_load(&ioc_data->sf_io_ops_pending)) || + (0 < atomic_load(&ioc_data->sf_work_pending))) { + MPI_Status status; + int flag = 0; + int mpi_code; + + /* Probe for incoming work requests */ + if (MPI_SUCCESS != + (mpi_code = (MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, context->sf_msg_comm, &flag, &status)))) + H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Iprobe failed", mpi_code); + + if (flag) { + double queue_start_time; + int count; + int source = status.MPI_SOURCE; + int tag = status.MPI_TAG; + + if ((tag != READ_INDEP) && (tag != WRITE_INDEP) && (tag != TRUNC_OP) && (tag != GET_EOF_OP)) + H5_SUBFILING_GOTO_ERROR(H5E_ARGS, H5E_BADVALUE, -1, "invalid work request operation (%d)", + tag); + + if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count))) + H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Get_count failed", mpi_code); + + if (count < 0) + H5_SUBFILING_GOTO_ERROR(H5E_ARGS, H5E_BADVALUE, -1, "invalid work request message size (%d)", + count); + + if ((size_t)count > sizeof(sf_work_request_t)) + H5_SUBFILING_GOTO_ERROR(H5E_ARGS, H5E_BADVALUE, -1, "work request message is too large (%d)", + count); + + /* + * Zero out work request, since the received message should + * be smaller than sizeof(sf_work_request_t) + */ + HDmemset(&wk_req, 0, sizeof(sf_work_request_t)); + + if (MPI_SUCCESS != (mpi_code = MPI_Recv(&wk_req, count, MPI_BYTE, source, tag, + context->sf_msg_comm, MPI_STATUS_IGNORE))) + H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Recv failed", mpi_code); + + /* Dispatch work request to worker threads in thread pool */ + + queue_start_time = MPI_Wtime(); + + wk_req.tag = tag; + wk_req.source = source; + wk_req.subfile_rank = subfile_rank; + wk_req.context_id = ioc_data->sf_context_id; + wk_req.start_time = queue_start_time; + wk_req.buffer = NULL; + + ioc_io_queue_add_entry(ioc_data, &wk_req); + + HDassert(atomic_load(&ioc_data->sf_io_ops_pending) >= 0); + } + else { + struct timespec sleep_spec = {0, IOC_MAIN_SLEEP_DELAY}; + + HDnanosleep(&sleep_spec, NULL); + } + + ioc_io_queue_dispatch_eligible_entries(ioc_data, flag ? 0 : 1); + + shutdown_requested = atomic_load(&ioc_data->sf_shutdown_flag); + } + + /* Reset the shutdown flag */ + atomic_store(&ioc_data->sf_shutdown_flag, 0); + +done: + H5_SUBFILING_FUNC_LEAVE; +} /* ioc_main() */ + +#ifdef H5_SUBFILING_DEBUG +static const char * +translate_opcode(io_op_t op) +{ + switch (op) { + case READ_OP: + return "READ_OP"; + break; + case WRITE_OP: + return "WRITE_OP"; + break; + case OPEN_OP: + return "OPEN_OP"; + break; + case CLOSE_OP: + return "CLOSE_OP"; + break; + case TRUNC_OP: + return "TRUNC_OP"; + break; + case GET_EOF_OP: + return "GET_EOF_OP"; + break; + case FINI_OP: + return "FINI_OP"; + break; + case LOGGING_OP: + return "LOGGING_OP"; + break; + } + return "unknown"; +} +#endif + +/*------------------------------------------------------------------------- + * Function: handle_work_request + * + * Purpose: Handle a work request from the thread pool work queue. + * We dispatch the specific function as indicated by the + * TAG that has been added to the work request by the + * IOC main thread (which is just a copy of the MPI tag + * associated with the RPC message) and provide the subfiling + * context associated with the HDF5 file. 
+ * + * Any status associated with the function processing is + * returned directly to the client via ACK or NACK messages. + * + * Return: (none) Doesn't fail. + * + * Programmer: Richard Warren + * 7/17/2020 + * + * Changes: Initial Version/None. + * + *------------------------------------------------------------------------- + */ +static HG_THREAD_RETURN_TYPE +handle_work_request(void *arg) +{ + ioc_io_queue_entry_t *q_entry_ptr = (ioc_io_queue_entry_t *)arg; + subfiling_context_t * sf_context = NULL; + sf_work_request_t * msg = &(q_entry_ptr->wk_req); + ioc_data_t * ioc_data = NULL; + int64_t file_context_id = msg->header[2]; + int op_ret; + hg_thread_ret_t ret_value = 0; + + HDassert(q_entry_ptr); + HDassert(q_entry_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC); + HDassert(q_entry_ptr->in_progress); + + sf_context = H5_get_subfiling_object(file_context_id); + HDassert(sf_context); + + ioc_data = sf_context->ioc_data; + HDassert(ioc_data); + + atomic_fetch_add(&ioc_data->sf_work_pending, 1); + + msg->in_progress = 1; + + switch (msg->tag) { + case WRITE_INDEP: + op_ret = ioc_file_queue_write_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm, + q_entry_ptr->counter); + break; + + case READ_INDEP: + op_ret = ioc_file_queue_read_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm); + break; + + case TRUNC_OP: + op_ret = ioc_file_truncate(sf_context->sf_fid, q_entry_ptr->wk_req.header[0], + sf_context->topology->subfile_rank); + break; + + case GET_EOF_OP: + op_ret = ioc_file_report_eof(msg, msg->subfile_rank, msg->source, sf_context->sf_eof_comm); + break; + + default: +#ifdef H5_SUBFILING_DEBUG + H5_subfiling_log(file_context_id, "%s: IOC %d received unknown message with tag %x from rank %d", + __func__, msg->subfile_rank, msg->tag, msg->source); +#endif + + op_ret = -1; + break; + } + + atomic_fetch_sub(&ioc_data->sf_work_pending, 1); + + if (op_ret < 0) { +#ifdef H5_SUBFILING_DEBUG + H5_subfiling_log( + file_context_id, + "%s: IOC %d request(%s) filename=%s from rank(%d), size=%ld, offset=%ld FAILED with ret %d", + __func__, msg->subfile_rank, translate_opcode((io_op_t)msg->tag), sf_context->sf_filename, + msg->source, msg->header[0], msg->header[1], op_ret); +#endif + + q_entry_ptr->wk_ret = op_ret; + } + +#ifdef H5FD_IOC_DEBUG + { + int curr_io_ops_pending = atomic_load(&ioc_data->sf_io_ops_pending); + HDassert(curr_io_ops_pending > 0); + } +#endif + + /* complete the I/O request */ + ioc_io_queue_complete_entry(ioc_data, q_entry_ptr); + + HDassert(atomic_load(&ioc_data->sf_io_ops_pending) >= 0); + + /* Check the I/O Queue to see if there are any dispatchable entries */ + ioc_io_queue_dispatch_eligible_entries(ioc_data, 1); + + H5_SUBFILING_FUNC_LEAVE; +} + +/*------------------------------------------------------------------------- + * Function: begin_thread_exclusive + * + * Purpose: Mutex lock to restrict access to code or variables. + * + * Return: integer result of mutex_lock request. + * + * Programmer: Richard Warren + * 7/17/2020 + * + * Changes: Initial Version/None. + * + *------------------------------------------------------------------------- + */ +void +begin_thread_exclusive(void) +{ + hg_thread_mutex_lock(&ioc_thread_mutex); +} + +/*------------------------------------------------------------------------- + * Function: end_thread_exclusive + * + * Purpose: Mutex unlock. Should only be called by the current holder + * of the locked mutex. + * + * Return: result of mutex_unlock operation. 
+ * + * Programmer: Richard Warren + * 7/17/2020 + * + * Changes: Initial Version/None. + * + *------------------------------------------------------------------------- + */ +void +end_thread_exclusive(void) +{ + hg_thread_mutex_unlock(&ioc_thread_mutex); +} + +static herr_t +send_ack_to_client(int ack_val, int dest_rank, int source_rank, int msg_tag, MPI_Comm comm) +{ + int mpi_code; + herr_t ret_value = SUCCEED; + + HDassert(ack_val > 0); + + (void)source_rank; + + if (MPI_SUCCESS != (mpi_code = MPI_Send(&ack_val, 1, MPI_INT, dest_rank, msg_tag, comm))) + H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send", mpi_code); + +done: + H5_SUBFILING_FUNC_LEAVE; +} + +static herr_t +send_nack_to_client(int dest_rank, int source_rank, int msg_tag, MPI_Comm comm) +{ + int nack = 0; + int mpi_code; + herr_t ret_value = SUCCEED; + + (void)source_rank; + + if (MPI_SUCCESS != (mpi_code = MPI_Send(&nack, 1, MPI_INT, dest_rank, msg_tag, comm))) + H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send", mpi_code); + +done: + H5_SUBFILING_FUNC_LEAVE; +} + +/* +========================================= +queue_xxx functions that should be run +from the thread pool threads... +========================================= +*/ + +/*------------------------------------------------------------------------- + * Function: ioc_file_queue_write_indep + * + * Purpose: Implement the IOC independent write function. The + * function is invoked as a result of the IOC receiving the + * "header"/RPC. What remains is to allocate memory for the + * data sent by the client and then write the data to our + * subfile. We utilize pwrite for the actual file writing. + * File flushing is done at file close. + * + * Return: The integer status returned by the Internal read_independent + * function. Successful operations will return 0. + * Errors: An MPI related error value. + * + * Programmer: Richard Warren + * 7/17/2020 + * + * Changes: Initial Version/None. 
+ * + *------------------------------------------------------------------------- + */ +static int +ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm, + uint32_t counter) +{ + subfiling_context_t *sf_context = NULL; + MPI_Status msg_status; + hbool_t send_nack = FALSE; + int64_t data_size; + int64_t file_offset; + int64_t file_context_id; + int64_t stripe_id; + haddr_t sf_eof; +#ifdef H5FD_IOC_COLLECT_STATS + double t_start; + double t_end; + double t_write; + double t_wait; + double t_queue_delay; +#endif + char *recv_buf = NULL; + int rcv_tag; + int sf_fid; + int data_bytes_received; + int write_ret; + int mpi_code; + int ret_value = 0; + + HDassert(msg); + + /* Retrieve the fields of the RPC message for the write operation */ + data_size = msg->header[0]; + file_offset = msg->header[1]; + file_context_id = msg->header[2]; + + if (data_size < 0) { + send_nack = TRUE; + H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "invalid data size for write"); + } + + sf_context = H5_get_subfiling_object(file_context_id); + HDassert(sf_context); + + stripe_id = file_offset + data_size; + sf_eof = (haddr_t)(stripe_id % sf_context->sf_stripe_size); + + stripe_id /= sf_context->sf_stripe_size; + sf_eof += (haddr_t)((stripe_id * sf_context->sf_blocksize_per_stripe) + sf_context->sf_base_addr); + + /* Flag that we've attempted to write data to the file */ + sf_context->sf_write_count++; + +#ifdef H5FD_IOC_COLLECT_STATS + /* For debugging performance */ + sf_write_ops++; + + t_start = MPI_Wtime(); + t_queue_delay = t_start - msg->start_time; + +#ifdef H5FD_IOC_DEBUG + if (sf_verbose_flag) { + if (sf_logfile) { + HDfprintf(sf_logfile, + "[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld, " + "queue_delay = %lf seconds\n", + subfile_rank, __func__, source, data_size, file_offset, t_queue_delay); + } + } +#endif + +#endif + + /* Allocate space to receive data sent from the client */ + if (NULL == (recv_buf = HDmalloc((size_t)data_size))) { + send_nack = TRUE; + H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, -1, "couldn't allocate receive buffer for data"); + } + + /* + * Calculate message tag for the client to use for sending + * data, then send an ACK message to the client with the + * calculated message tag. This calculated message tag + * allows us to distinguish between multiple concurrent + * writes from a single rank. 
+ */ + HDassert(H5FD_IOC_tag_ub_val_ptr && (*H5FD_IOC_tag_ub_val_ptr >= WRITE_TAG_BASE)); + rcv_tag = (int)(counter % (INT_MAX - WRITE_TAG_BASE)); + rcv_tag %= (*H5FD_IOC_tag_ub_val_ptr - WRITE_TAG_BASE); + rcv_tag += WRITE_TAG_BASE; + + if (send_ack_to_client(rcv_tag, source, subfile_rank, WRITE_INDEP_ACK, comm) < 0) + H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1, "couldn't send ACK to client"); + + /* Receive data from client */ + H5_CHECK_OVERFLOW(data_size, int64_t, int); + if (MPI_SUCCESS != + (mpi_code = MPI_Recv(recv_buf, (int)data_size, MPI_BYTE, source, rcv_tag, comm, &msg_status))) + H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Recv failed", mpi_code); + + if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&msg_status, MPI_BYTE, &data_bytes_received))) + H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Get_count failed", mpi_code); + + if (data_bytes_received != data_size) + H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1, + "message size mismatch -- expected = %" PRId64 ", actual = %d", data_size, + data_bytes_received); + +#ifdef H5FD_IOC_COLLECT_STATS + t_end = MPI_Wtime(); + t_wait = t_end - t_start; + sf_write_wait_time += t_wait; + + t_start = t_end; + +#ifdef H5FD_IOC_DEBUG + if (sf_verbose_flag) { + if (sf_logfile) { + HDfprintf(sf_logfile, "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n", subfile_rank, + __func__, data_size, source, mpi_code); + } + } +#endif + +#endif + + sf_fid = sf_context->sf_fid; + +#ifdef H5FD_IOC_DEBUG + if (sf_fid < 0) + H5_subfiling_log(file_context_id, "%s: WARNING: attempt to write data to closed subfile FID %d", + __func__, sf_fid); +#endif + + if (sf_fid >= 0) { + /* Actually write data received from client into subfile */ + if ((write_ret = ioc_file_write_data(sf_fid, file_offset, recv_buf, data_size, subfile_rank)) < 0) + H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1, + "write function(FID=%d, Source=%d) returned an error (%d)", sf_fid, + source, write_ret); + +#ifdef H5FD_IOC_COLLECT_STATS + t_end = MPI_Wtime(); + t_write = t_end - t_start; + sf_pwrite_time += t_write; +#endif + } + +#ifdef H5FD_IOC_COLLECT_STATS + sf_queue_delay_time += t_queue_delay; +#endif + + begin_thread_exclusive(); + + /* Adjust EOF if necessary */ + if (sf_eof > sf_context->sf_eof) + sf_context->sf_eof = sf_eof; + + end_thread_exclusive(); + +done: + if (send_nack) { + /* Send NACK back to client so client can handle failure gracefully */ + if (send_nack_to_client(source, subfile_rank, WRITE_INDEP_ACK, comm) < 0) + H5_SUBFILING_DONE_ERROR(H5E_IO, H5E_WRITEERROR, -1, "couldn't send NACK to client"); + } + + HDfree(recv_buf); + + H5_SUBFILING_FUNC_LEAVE; +} /* ioc_file_queue_write_indep() */ + +/*------------------------------------------------------------------------- + * Function: ioc_file_queue_read_indep + * + * Purpose: Implement the IOC independent read function. The + * function is invoked as a result of the IOC receiving the + * "header"/RPC. What remains is to allocate memory for + * reading the data and then to send this to the client. + * We utilize pread for the actual file reading. + * + * Return: The integer status returned by the Internal read_independent + * function. Successful operations will return 0. + * Errors: An MPI related error value. + * + * Programmer: Richard Warren + * 7/17/2020 + * + * Changes: Initial Version/None. 
+ * + *------------------------------------------------------------------------- + */ +static int +ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +{ + subfiling_context_t *sf_context = NULL; + hbool_t send_empty_buf = TRUE; + int64_t data_size; + int64_t file_offset; + int64_t file_context_id; +#ifdef H5FD_IOC_COLLECT_STATS + double t_start; + double t_end; + double t_read; + double t_queue_delay; +#endif + char *send_buf = NULL; + int sf_fid; + int read_ret; + int mpi_code; + int ret_value = 0; + + HDassert(msg); + + /* Retrieve the fields of the RPC message for the read operation */ + data_size = msg->header[0]; + file_offset = msg->header[1]; + file_context_id = msg->header[2]; + + if (data_size < 0) + H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "invalid data size for read"); + + sf_context = H5_get_subfiling_object(file_context_id); + HDassert(sf_context); + + /* Flag that we've attempted to read data from the file */ + sf_context->sf_read_count++; + +#ifdef H5FD_IOC_COLLECT_STATS + /* For debugging performance */ + sf_read_ops++; + + t_start = MPI_Wtime(); + t_queue_delay = t_start - msg->start_time; + +#ifdef H5FD_IOC_DEBUG + if (sf_verbose_flag && (sf_logfile != NULL)) { + HDfprintf(sf_logfile, + "[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld " + "queue_delay=%lf seconds\n", + subfile_rank, __func__, source, data_size, file_offset, t_queue_delay); + } +#endif + +#endif + + /* Allocate space to send data read from file to client */ + if (NULL == (send_buf = HDmalloc((size_t)data_size))) + H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, -1, "couldn't allocate send buffer for data"); + + sf_fid = sf_context->sf_fid; + if (sf_fid < 0) + H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "subfile file descriptor %d is invalid", sf_fid); + + /* Read data from the subfile */ + if ((read_ret = ioc_file_read_data(sf_fid, file_offset, send_buf, data_size, subfile_rank)) < 0) { + H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, read_ret, + "read function(FID=%d, Source=%d) returned an error (%d)", sf_fid, source, + read_ret); + } + + send_empty_buf = FALSE; + + /* Send read data to the client */ + H5_CHECK_OVERFLOW(data_size, int64_t, int); + if (MPI_SUCCESS != + (mpi_code = MPI_Send(send_buf, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, comm))) + H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send failed", mpi_code); + +#ifdef H5FD_IOC_COLLECT_STATS + t_end = MPI_Wtime(); + t_read = t_end - t_start; + sf_pread_time += t_read; + sf_queue_delay_time += t_queue_delay; + +#ifdef H5FD_IOC_DEBUG + if (sf_verbose_flag && (sf_logfile != NULL)) { + HDfprintf(sf_logfile, "[ioc(%d)] MPI_Send to source(%d) completed\n", subfile_rank, source); + } +#endif + +#endif + +done: + if (send_empty_buf) { + /* + * Send an empty message back to client on failure. The client will + * likely get a message truncation error, but at least shouldn't hang. + */ + if (MPI_SUCCESS != (mpi_code = MPI_Send(NULL, 0, MPI_BYTE, source, READ_INDEP_DATA, comm))) + H5_SUBFILING_MPI_DONE_ERROR(-1, "MPI_Send failed", mpi_code); + } + + HDfree(send_buf); + + return ret_value; +} /* end ioc_file_queue_read_indep() */ + +/* +====================================================== +File functions + +The pread and pwrite posix functions are described as +being thread safe. 
+====================================================== +*/ + +static int +ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank) +{ + ssize_t bytes_remaining = (ssize_t)data_size; + ssize_t bytes_written = 0; + char * this_data = (char *)data_buffer; + int ret_value = 0; + +#ifndef H5FD_IOC_DEBUG + (void)subfile_rank; +#endif + + HDcompile_assert(H5_SIZEOF_OFF_T == sizeof(file_offset)); + + while (bytes_remaining) { + errno = 0; + + bytes_written = HDpwrite(fd, this_data, (size_t)bytes_remaining, file_offset); + + if (bytes_written >= 0) { + bytes_remaining -= bytes_written; + +#ifdef H5FD_IOC_DEBUG + HDprintf("[ioc(%d) %s]: wrote %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", subfile_rank, + __func__, bytes_written, bytes_remaining, file_offset); +#endif + + this_data += bytes_written; + file_offset += bytes_written; + } + else { + H5_SUBFILING_SYS_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1, "HDpwrite failed"); + } + } + + /* We don't usually use this for each file write. We usually do the file + * flush as part of file close operation. + */ +#ifdef H5FD_IOC_REQUIRE_FLUSH + fdatasync(fd); +#endif + +done: + H5_SUBFILING_FUNC_LEAVE; +} /* end ioc_file_write_data() */ + +static int +ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank) +{ + useconds_t delay = 100; + ssize_t bytes_remaining = (ssize_t)data_size; + ssize_t bytes_read = 0; + char * this_buffer = (char *)data_buffer; + int retries = MIN_READ_RETRIES; + int ret_value = 0; + +#ifndef H5FD_IOC_DEBUG + (void)subfile_rank; +#endif + + HDcompile_assert(H5_SIZEOF_OFF_T == sizeof(file_offset)); + + while (bytes_remaining) { + errno = 0; + + bytes_read = HDpread(fd, this_buffer, (size_t)bytes_remaining, file_offset); + + if (bytes_read > 0) { + /* Reset retry params */ + retries = MIN_READ_RETRIES; + delay = 100; + + bytes_remaining -= bytes_read; + +#ifdef H5FD_IOC_DEBUG + HDprintf("[ioc(%d) %s]: read %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", subfile_rank, + __func__, bytes_read, bytes_remaining, file_offset); +#endif + + this_buffer += bytes_read; + file_offset += bytes_read; + } + else if (bytes_read == 0) { + HDassert(bytes_remaining > 0); + + /* end of file but not end of format address space */ + HDmemset(this_buffer, 0, (size_t)bytes_remaining); + break; + } + else { + if (retries == 0) { +#ifdef H5FD_IOC_DEBUG + HDprintf("[ioc(%d) %s]: TIMEOUT: file_offset=%" PRId64 ", data_size=%ld\n", subfile_rank, + __func__, file_offset, data_size); +#endif + + H5_SUBFILING_SYS_GOTO_ERROR(H5E_IO, H5E_READERROR, -1, "HDpread failed"); + } + + retries--; + usleep(delay); + delay *= 2; + } + } + +done: + H5_SUBFILING_FUNC_LEAVE; +} /* end ioc_file_read_data() */ + +static int +ioc_file_truncate(int fd, int64_t length, int subfile_rank) +{ + int ret_value = 0; + +#ifndef H5FD_IOC_DEBUG + (void)subfile_rank; +#endif + + if (HDftruncate(fd, (off_t)length) != 0) + H5_SUBFILING_SYS_GOTO_ERROR(H5E_FILE, H5E_SEEKERROR, -1, "HDftruncate failed"); + +#ifdef H5FD_IOC_DEBUG + HDprintf("[ioc(%d) %s]: truncated subfile to %lld bytes. ret = %d\n", subfile_rank, __func__, + (long long)length, errno); + HDfflush(stdout); +#endif + +done: + H5_SUBFILING_FUNC_LEAVE; +} /* end ioc_file_truncate() */ + +/*------------------------------------------------------------------------- + * Function: ioc_file_report_eof + * + * Purpose: Determine the target sub-file's eof and report this value + * to the requesting rank. 
+ * + * Notes: This function will have to be reworked once we solve + * the IOC error reporting problem. + * + * This function mixes functionality that should be + * in two different VFDs. + * + * Return: 0 if successful, 1 or an MPI error code on failure. + * + * Programmer: John Mainzer + * 7/17/2020 + * + * Changes: Initial Version/None. + * + *------------------------------------------------------------------------- + */ + +static int +ioc_file_report_eof(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +{ + subfiling_context_t *sf_context = NULL; + h5_stat_t sb; + int64_t eof_req_reply[3]; + int64_t file_context_id; + int fd; + int mpi_code; + int ret_value = 0; + + HDassert(msg); + + /* first get the EOF of the target file. */ + + file_context_id = msg->header[2]; + + if (NULL == (sf_context = H5_get_subfiling_object(file_context_id))) + H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_CANTGET, -1, "couldn't retrieve subfiling context"); + + fd = sf_context->sf_fid; + + if (HDfstat(fd, &sb) < 0) + H5_SUBFILING_SYS_GOTO_ERROR(H5E_FILE, H5E_SYSERRSTR, -1, "HDfstat failed"); + + eof_req_reply[0] = (int64_t)subfile_rank; + eof_req_reply[1] = (int64_t)(sb.st_size); + eof_req_reply[2] = 0; /* not used */ + +#ifdef H5_SUBFILING_DEBUG + H5_subfiling_log(file_context_id, "%s: reporting file EOF as %" PRId64 ".", __func__, eof_req_reply[1]); +#endif + + /* return the subfile EOF to the querying rank */ + if (MPI_SUCCESS != (mpi_code = MPI_Send(eof_req_reply, 3, MPI_INT64_T, source, GET_EOF_COMPLETED, comm))) + H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send", mpi_code); + +done: + H5_SUBFILING_FUNC_LEAVE; +} /* ioc_file_report_eof() */ + +/*------------------------------------------------------------------------- + * Function: ioc_io_queue_alloc_entry + * + * Purpose: Allocate and initialize an instance of + * ioc_io_queue_entry_t. Return pointer to the new + * instance on success, and NULL on failure. + * + * Return: Pointer to new instance of ioc_io_queue_entry_t + * on success, and NULL on failure. + * + * Programmer: JRM -- 11/6/21 + * + * Changes: None. + * + *------------------------------------------------------------------------- + */ +static ioc_io_queue_entry_t * +ioc_io_queue_alloc_entry(void) +{ + ioc_io_queue_entry_t *q_entry_ptr = NULL; + + q_entry_ptr = (ioc_io_queue_entry_t *)HDmalloc(sizeof(ioc_io_queue_entry_t)); + + if (q_entry_ptr) { + + q_entry_ptr->magic = H5FD_IOC__IO_Q_ENTRY_MAGIC; + q_entry_ptr->next = NULL; + q_entry_ptr->prev = NULL; + q_entry_ptr->in_progress = FALSE; + q_entry_ptr->counter = 0; + + /* will memcpy the wk_req field, so don't bother to initialize */ + /* will initialize thread_wk field before use */ + + q_entry_ptr->wk_ret = 0; + +#ifdef H5FD_IOC_COLLECT_STATS + q_entry_ptr->q_time = 0; + q_entry_ptr->dispatch_time = 0; +#endif + } + + return q_entry_ptr; +} /* ioc_io_queue_alloc_entry() */ + +/*------------------------------------------------------------------------- + * Function: ioc_io_queue_add_entry + * + * Purpose: Add an I/O request to the tail of the IOC I/O Queue. + * + * To do this, we must: + * + * 1) allocate a new instance of ioc_io_queue_entry_t + * + * 2) Initialize the new instance and copy the supplied + * instance of sf_work_request_t into it. + * + * 3) Append it to the IOC I/O queue. + * + * Note that this does not dispatch the request even if it + * is eligible for immediate dispatch. This is done with + * a call to ioc_io_queue_dispatch_eligible_entries(). + * + * Return: void. 
+ * + * Programmer: JRM -- 11/7/21 + * + * Changes: None. + * + *------------------------------------------------------------------------- + */ +static void +ioc_io_queue_add_entry(ioc_data_t *ioc_data, sf_work_request_t *wk_req_ptr) +{ + ioc_io_queue_entry_t *entry_ptr = NULL; + + HDassert(ioc_data); + HDassert(ioc_data->io_queue.magic == H5FD_IOC__IO_Q_MAGIC); + HDassert(wk_req_ptr); + + entry_ptr = ioc_io_queue_alloc_entry(); + + HDassert(entry_ptr); + HDassert(entry_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC); + + HDmemcpy((void *)(&(entry_ptr->wk_req)), (const void *)wk_req_ptr, sizeof(sf_work_request_t)); + + /* must obtain io_queue mutex before appending */ + hg_thread_mutex_lock(&ioc_data->io_queue.q_mutex); + + HDassert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending)); + + entry_ptr->counter = ioc_data->io_queue.req_counter++; + + ioc_data->io_queue.num_pending++; + + H5FD_IOC__Q_APPEND(&ioc_data->io_queue, entry_ptr); + + atomic_fetch_add(&ioc_data->sf_io_ops_pending, 1); + +#ifdef H5_SUBFILING_DEBUG + H5_subfiling_log(wk_req_ptr->context_id, + "%s: request %d queued. op = %d, offset/len = %lld/%lld, q-ed/disp/ops_pend = %d/%d/%d.", + __func__, entry_ptr->counter, (entry_ptr->wk_req.tag), + (long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[0]), + ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress, + atomic_load(&ioc_data->sf_io_ops_pending)); +#endif + + HDassert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == ioc_data->io_queue.q_len); + +#ifdef H5FD_IOC_COLLECT_STATS + entry_ptr->q_time = H5_now_usec(); + + if (ioc_data->io_queue.q_len > ioc_data->io_queue.max_q_len) { + ioc_data->io_queue.max_q_len = ioc_data->io_queue.q_len; + } + + if (ioc_data->io_queue.num_pending > ioc_data->io_queue.max_num_pending) { + ioc_data->io_queue.max_num_pending = ioc_data->io_queue.num_pending; + } + + if (entry_ptr->wk_req.tag == READ_INDEP) { + ioc_data->io_queue.ind_read_requests++; + } + else if (entry_ptr->wk_req.tag == WRITE_INDEP) { + ioc_data->io_queue.ind_write_requests++; + } + else if (entry_ptr->wk_req.tag == TRUNC_OP) { + ioc_data->io_queue.truncate_requests++; + } + else if (entry_ptr->wk_req.tag == GET_EOF_OP) { + ioc_data->io_queue.get_eof_requests++; + } + + ioc_data->io_queue.requests_queued++; +#endif + +#ifdef H5_SUBFILING_DEBUG + if (ioc_data->io_queue.q_len != atomic_load(&ioc_data->sf_io_ops_pending)) { + H5_subfiling_log( + wk_req_ptr->context_id, + "%s: ioc_data->io_queue->q_len = %d != %d = atomic_load(&ioc_data->sf_io_ops_pending).", __func__, + ioc_data->io_queue.q_len, atomic_load(&ioc_data->sf_io_ops_pending)); + } +#endif + + HDassert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending)); + + hg_thread_mutex_unlock(&ioc_data->io_queue.q_mutex); + + return; +} /* ioc_io_queue_add_entry() */ + +/*------------------------------------------------------------------------- + * Function: ioc_io_queue_dispatch_eligible_entries + * + * Purpose: Scan the IOC I/O Queue for dispatchable entries, and + * dispatch any such entries found. + * + * Do this by scanning the I/O queue from head to tail for + * entries that: + * + * 1) Have not already been dispatched + * + * 2) Either: + * + * a) do not intersect with any prior entries on the + * I/O queue, or + * + * b) Are read requests, and all intersections are with + * prior read requests. + * + * Dispatch any such entries found. + * + * Do this to maintain the POSIX semantics required by + * HDF5. 
+ * + * Note that TRUNC_OPs and GET_EOF_OPs are a special case. + * Specifically, no I/O queue entry can be dispatched if + * there is a truncate or get EOF operation between it and + * the head of the queue. Further, a truncate or get EOF + * request cannot be executed unless it is at the head of + * the queue. + * + * Return: void. + * + * Programmer: JRM -- 11/7/21 + * + * Changes: None. + * + *------------------------------------------------------------------------- + */ +/* TODO: Keep an eye on statistics and optimize this algorithm if necessary. While it is O(N) + * where N is the number of elements in the I/O Queue if there are are no-overlaps, it + * can become O(N**2) in the worst case. + */ +static void +ioc_io_queue_dispatch_eligible_entries(ioc_data_t *ioc_data, hbool_t try_lock) +{ + hbool_t conflict_detected; + int64_t entry_offset; + int64_t entry_len; + int64_t scan_offset; + int64_t scan_len; + ioc_io_queue_entry_t *entry_ptr = NULL; + ioc_io_queue_entry_t *scan_ptr = NULL; + + HDassert(ioc_data); + HDassert(ioc_data->io_queue.magic == H5FD_IOC__IO_Q_MAGIC); + + if (try_lock) { + if (hg_thread_mutex_try_lock(&ioc_data->io_queue.q_mutex) < 0) + return; + } + else + hg_thread_mutex_lock(&ioc_data->io_queue.q_mutex); + + entry_ptr = ioc_data->io_queue.q_head; + + /* sanity check on first element in the I/O queue */ + HDassert((entry_ptr == NULL) || (entry_ptr->prev == NULL)); + + while ((entry_ptr) && (ioc_data->io_queue.num_pending > 0)) { + + HDassert(entry_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC); + + /* Check for a get EOF or truncate operation at head of queue */ + if (ioc_data->io_queue.q_head->in_progress) { + if ((ioc_data->io_queue.q_head->wk_req.tag == TRUNC_OP) || + (ioc_data->io_queue.q_head->wk_req.tag == GET_EOF_OP)) { + + /* we have a truncate or get eof operation in progress -- thus no other operations + * can be dispatched until the truncate or get eof operation completes. Just break + * out of the loop. + */ + + break; + } + } + + if (!entry_ptr->in_progress) { + + entry_offset = entry_ptr->wk_req.header[1]; + entry_len = entry_ptr->wk_req.header[0]; + + conflict_detected = FALSE; + + scan_ptr = entry_ptr->prev; + + HDassert((scan_ptr == NULL) || (scan_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC)); + + if ((entry_ptr->wk_req.tag == TRUNC_OP) || (entry_ptr->wk_req.tag == GET_EOF_OP)) { + + if (scan_ptr != NULL) { + + /* the TRUNC_OP or GET_EOF_OP is not at the head of the queue, and thus cannot + * be dispatched. Further, no operation can be dispatched if a truncate request + * appears before it in the queue. Thus we have done all we can and will break + * out of the loop. + */ + break; + } + } + + while ((scan_ptr) && (!conflict_detected)) { + + /* check for overlaps */ + scan_offset = scan_ptr->wk_req.header[1]; + scan_len = scan_ptr->wk_req.header[0]; + + /* at present, I/O requests are scalar -- i.e. single blocks specified by offset and length. + * when this changes, this if statement will have to be updated accordingly. 
+ */ + if (((scan_offset + scan_len) > entry_offset) && ((entry_offset + entry_len) > scan_offset)) { + + /* the two request overlap -- unless they are both reads, we have detected a conflict */ + + /* TODO: update this if statement when we add collective I/O */ + if ((entry_ptr->wk_req.tag != READ_INDEP) || (scan_ptr->wk_req.tag != READ_INDEP)) { + + conflict_detected = TRUE; + } + } + + scan_ptr = scan_ptr->prev; + } + + if (!conflict_detected) { /* dispatch I/O request */ + + HDassert(scan_ptr == NULL); + HDassert(!entry_ptr->in_progress); + + entry_ptr->in_progress = TRUE; + + HDassert(ioc_data->io_queue.num_pending > 0); + + ioc_data->io_queue.num_pending--; + ioc_data->io_queue.num_in_progress++; + + HDassert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == + ioc_data->io_queue.q_len); + + entry_ptr->thread_wk.func = handle_work_request; + entry_ptr->thread_wk.args = entry_ptr; + +#ifdef H5_SUBFILING_DEBUG + H5_subfiling_log(entry_ptr->wk_req.context_id, + "%s: request %d dispatched. op = %d, offset/len = %lld/%lld, " + "q-ed/disp/ops_pend = %d/%d/%d.", + __func__, entry_ptr->counter, (entry_ptr->wk_req.tag), + (long long)(entry_ptr->wk_req.header[1]), + (long long)(entry_ptr->wk_req.header[0]), ioc_data->io_queue.num_pending, + ioc_data->io_queue.num_in_progress, + atomic_load(&ioc_data->sf_io_ops_pending)); +#endif + +#ifdef H5FD_IOC_COLLECT_STATS + if (ioc_data->io_queue.num_in_progress > ioc_data->io_queue.max_num_in_progress) { + ioc_data->io_queue.max_num_in_progress = ioc_data->io_queue.num_in_progress; + } + + ioc_data->io_queue.requests_dispatched++; + + entry_ptr->dispatch_time = H5_now_usec(); +#endif + + hg_thread_pool_post(ioc_data->io_thread_pool, &(entry_ptr->thread_wk)); + } + } + + entry_ptr = entry_ptr->next; + } + + HDassert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending)); + + hg_thread_mutex_unlock(&ioc_data->io_queue.q_mutex); +} /* ioc_io_queue_dispatch_eligible_entries() */ + +/*------------------------------------------------------------------------- + * Function: ioc_io_queue_complete_entry + * + * Purpose: Update the IOC I/O Queue for the completion of an I/O + * request. + * + * To do this: + * + * 1) Remove the entry from the I/O Queue + * + * 2) If so configured, update statistics + * + * 3) Discard the instance of ioc_io_queue_entry_t. + * + * Return: void. + * + * Programmer: JRM -- 11/7/21 + * + * Changes: None. 
+ * + *------------------------------------------------------------------------- + */ +static void +ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_ptr) +{ +#ifdef H5FD_IOC_COLLECT_STATS + uint64_t queued_time; + uint64_t execution_time; +#endif + + HDassert(ioc_data); + HDassert(ioc_data->io_queue.magic == H5FD_IOC__IO_Q_MAGIC); + HDassert(entry_ptr); + HDassert(entry_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC); + + /* must obtain io_queue mutex before deleting and updating stats */ + hg_thread_mutex_lock(&ioc_data->io_queue.q_mutex); + + HDassert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == ioc_data->io_queue.q_len); + HDassert(ioc_data->io_queue.num_in_progress > 0); + + if (entry_ptr->wk_ret < 0) + ioc_data->io_queue.num_failed++; + + H5FD_IOC__Q_REMOVE(&ioc_data->io_queue, entry_ptr); + + ioc_data->io_queue.num_in_progress--; + + HDassert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == ioc_data->io_queue.q_len); + + atomic_fetch_sub(&ioc_data->sf_io_ops_pending, 1); + +#ifdef H5_SUBFILING_DEBUG + H5_subfiling_log(entry_ptr->wk_req.context_id, + "%s: request %d completed with ret %d. op = %d, offset/len = %lld/%lld, " + "q-ed/disp/ops_pend = %d/%d/%d.", + __func__, entry_ptr->counter, entry_ptr->wk_ret, (entry_ptr->wk_req.tag), + (long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[0]), + ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress, + atomic_load(&ioc_data->sf_io_ops_pending)); + + /* + * If this I/O request is a truncate or "get eof" op, make sure + * there aren't other operations in progress + */ + if ((entry_ptr->wk_req.tag == GET_EOF_OP) || (entry_ptr->wk_req.tag == TRUNC_OP)) + HDassert(ioc_data->io_queue.num_in_progress == 0); +#endif + + HDassert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending)); + +#ifdef H5FD_IOC_COLLECT_STATS + /* Compute the queued and execution time */ + queued_time = entry_ptr->dispatch_time - entry_ptr->q_time; + execution_time = H5_now_usec() - entry_ptr->dispatch_time; + + ioc_data->io_queue.requests_completed++; + + entry_ptr->q_time = H5_now_usec(); + +#endif + + hg_thread_mutex_unlock(&ioc_data->io_queue.q_mutex); + + HDassert(entry_ptr->wk_req.buffer == NULL); + + ioc_io_queue_free_entry(entry_ptr); + + entry_ptr = NULL; + + return; +} /* ioc_io_queue_complete_entry() */ + +/*------------------------------------------------------------------------- + * Function: ioc_io_queue_free_entry + * + * Purpose: Free the supplied instance of ioc_io_queue_entry_t. + * + * Verify that magic field is set to + * H5FD_IOC__IO_Q_ENTRY_MAGIC, and that the next and prev + * fields are NULL. + * + * Return: void. + * + * Programmer: JRM -- 11/6/21 + * + * Changes: None. + * + *------------------------------------------------------------------------- + */ +static void +ioc_io_queue_free_entry(ioc_io_queue_entry_t *q_entry_ptr) +{ + /* use assertions for error checking, since the following should never fail. */ + HDassert(q_entry_ptr); + HDassert(q_entry_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC); + HDassert(q_entry_ptr->next == NULL); + HDassert(q_entry_ptr->prev == NULL); + HDassert(q_entry_ptr->wk_req.buffer == NULL); + + q_entry_ptr->magic = 0; + + HDfree(q_entry_ptr); + + q_entry_ptr = NULL; + + return; +} /* ioc_io_queue_free_entry() */
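For reference, the striping scheme described in the ioc_main() comment above maps a logical file offset to a (subfile, subfile-offset) pair with simple integer arithmetic. The sketch below is illustrative only and is not part of the patch: the parameter names (stripe_size, n_subfiles) and the helper map_logical_offset() are hypothetical, standing in for values that the real driver obtains from its subfiling context (e.g. sf_stripe_size).

/*
 * Minimal sketch (not from the patch above) of the logical-offset to
 * subfile mapping that the ioc_main() comment describes. Assumes a
 * round-robin stripe layout over n_subfiles subfiles.
 */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

static void
map_logical_offset(int64_t logical_offset, int64_t stripe_size, int n_subfiles,
                   int *subfile_idx_out, int64_t *subfile_offset_out)
{
    int64_t stripe_idx = logical_offset / stripe_size; /* which stripe of the logical file */
    int64_t row        = stripe_idx / n_subfiles;      /* how many times we've wrapped around */

    *subfile_idx_out    = (int)(stripe_idx % n_subfiles);                          /* which IOC/subfile */
    *subfile_offset_out = (row * stripe_size) + (logical_offset % stripe_size);    /* offset inside it */
}

int
main(void)
{
    int     subfile;
    int64_t local;

    /* With a 256 KiB stripe over 3 subfiles, logical offset 262144 should land
     * at byte 0 of subfile(1), matching the example in the ioc_main() comment.
     */
    map_logical_offset(262144, 262144, 3, &subfile, &local);
    printf("subfile %d, local offset %" PRId64 "\n", subfile, local);
    return 0;
}

Run as-is, the example prints "subfile 1, local offset 0", which agrees with the block diagram in the ioc_main() comment (blk(1) belongs to IOC(1), blk(3) wraps back to IOC(0) at its second stripe).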