summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRichard Warren <Richard.Warren@hdfgroup.org>2020-06-22 18:46:24 (GMT)
committerRichard Warren <Richard.Warren@hdfgroup.org>2020-06-22 18:46:24 (GMT)
commit5e02da94f11742aa246eb284c964709d97404d3d (patch)
treef7c15b98588051287ceaad887ec122917af2d92e
parentd20000ec51d50b66fc1226eeb656b8dc1358f826 (diff)
downloadhdf5-5e02da94f11742aa246eb284c964709d97404d3d.zip
hdf5-5e02da94f11742aa246eb284c964709d97404d3d.tar.gz
hdf5-5e02da94f11742aa246eb284c964709d97404d3d.tar.bz2
Initial subfiling branch
-rw-r--r--src/CMakeLists.txt7
-rw-r--r--src/H5FDsubfile.c250
-rw-r--r--src/H5FDsubfile.h0
-rw-r--r--src/H5FDsubfile_mpi.c1542
-rw-r--r--src/H5FDsubfile_private.h183
-rw-r--r--src/H5FDsubfile_public.h11
-rw-r--r--src/H5FDsubfile_threads.c132
-rw-r--r--src/mercury/mercury_atomic.h637
-rw-r--r--src/mercury/mercury_atomic_queue.c83
-rw-r--r--src/mercury/mercury_atomic_queue.h271
-rw-r--r--src/mercury/mercury_event.c72
-rw-r--r--src/mercury/mercury_event.h184
-rw-r--r--src/mercury/mercury_hash_string.h48
-rw-r--r--src/mercury/mercury_hash_table.c526
-rw-r--r--src/mercury/mercury_hash_table.h252
-rw-r--r--src/mercury/mercury_list.h126
-rw-r--r--src/mercury/mercury_log.c128
-rw-r--r--src/mercury/mercury_log.h104
-rw-r--r--src/mercury/mercury_mem.c177
-rw-r--r--src/mercury/mercury_mem.h92
-rw-r--r--src/mercury/mercury_poll.c531
-rw-r--r--src/mercury/mercury_poll.h164
-rw-r--r--src/mercury/mercury_queue.h123
-rw-r--r--src/mercury/mercury_request.c224
-rw-r--r--src/mercury/mercury_request.h242
-rw-r--r--src/mercury/mercury_thread.c162
-rw-r--r--src/mercury/mercury_thread.h242
-rw-r--r--src/mercury/mercury_thread_condition.c46
-rw-r--r--src/mercury/mercury_thread_condition.h182
-rw-r--r--src/mercury/mercury_thread_mutex.c50
-rw-r--r--src/mercury/mercury_thread_mutex.h127
-rw-r--r--src/mercury/mercury_thread_pool.c191
-rw-r--r--src/mercury/mercury_thread_pool.h124
-rw-r--r--src/mercury/mercury_thread_rwlock.c77
-rw-r--r--src/mercury/mercury_thread_rwlock.h236
-rw-r--r--src/mercury/mercury_thread_spin.c47
-rw-r--r--src/mercury/mercury_thread_spin.h146
-rw-r--r--src/mercury/mercury_time.h402
-rw-r--r--src/mercury/mercury_util_config.h141
-rw-r--r--src/mercury/mercury_util_error.c20
-rw-r--r--src/mercury/mercury_util_error.h104
-rw-r--r--testpar/CMakeLists.txt1
-rw-r--r--testpar/t_subfile_openclose.c42
-rw-r--r--tools/lib/h5diff.c7
-rw-r--r--tools/lib/h5tools_utils.c1
-rw-r--r--tools/lib/h5tools_utils.h1
-rw-r--r--tools/lib/h5trav.c17
47 files changed, 8474 insertions, 1 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c0915c8..c5f6316 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -243,6 +243,9 @@ set (H5FD_SOURCES
${HDF5_SRC_DIR}/H5FDstdio.c
${HDF5_SRC_DIR}/H5FDtest.c
${HDF5_SRC_DIR}/H5FDwindows.c
+ ${HDF5_SRC_DIR}/H5FDsubfile.c
+ ${HDF5_SRC_DIR}/H5FDsubfile_threads.c
+ ${HDF5_SRC_DIR}/H5FDsubfile_mpi.c
)
set (H5FD_HDRS
@@ -262,6 +265,10 @@ set (H5FD_HDRS
${HDF5_SRC_DIR}/H5FDsplitter.h
${HDF5_SRC_DIR}/H5FDstdio.h
${HDF5_SRC_DIR}/H5FDwindows.h
+ ${HDF5_SRC_DIR}/H5FDsubfile_public.h
+ ${HDF5_SRC_DIR}/mercury/mercury_thread.h
+ ${HDF5_SRC_DIR}/mercury/mercury_thread_mutex.h
+ ${HDF5_SRC_DIR}/mercury/mercury_log.h
)
IDE_GENERATED_PROPERTIES ("H5FD" "${H5FD_HDRS}" "${H5FD_SOURCES}" )
diff --git a/src/H5FDsubfile.c b/src/H5FDsubfile.c
new file mode 100644
index 0000000..2b94f0d
--- /dev/null
+++ b/src/H5FDsubfile.c
@@ -0,0 +1,250 @@
+
+#include "H5FDsubfile_public.h"
+
+#ifdef H5_HAVE_PARALLEL
+
+/***********/
+/* Headers */
+/***********/
+#include "H5private.h" /* Generic Functions */
+#include "H5CXprivate.h" /* API Contexts */
+#include "H5Dprivate.h" /* Datasets */
+#include "H5Eprivate.h" /* Error handling */
+#include "H5Ipublic.h" /* IDs */
+#include "H5Iprivate.h" /* IDs */
+#include "H5MMprivate.h" /* Memory management */
+#include "H5Pprivate.h" /* Property lists */
+
+/*
+=========================================
+Private functions
+========================================
+*/
+
+static size_t sf_topology_limit = 4; /* element count used when sf_topology_cache is first allocated */
+static size_t sf_topology_entries = 0; /* number of topology objects recorded so far */
+static sf_topology_t **sf_topology_cache = NULL; /* table of recorded topologies, allocated on first use */
+
+static size_t sf_context_limit = 4; /* element count used when sf_context_cache is first allocated */
+static size_t sf_context_entries = 0; /* number of context objects recorded so far */
+static subfiling_context_t **sf_context_cache = NULL; /* table of recorded contexts, allocated on first use */
+static hid_t context_id = H5I_INVALID_HID; /* context reference assigned by H5FDsubfiling_init() */
+static hid_t topology_id = H5I_INVALID_HID; /* topology reference assigned by H5FDsubfiling_init() */
+
+
+/*
+ * Append `obj` to the cache that matches `type` and return a reference to it.
+ *
+ * The reference carries the object type in the bits above bit 31 and the
+ * cache slot in the low bits, which is the layout get_subfiling_object()
+ * decodes.  A full cache is doubled in size instead of being written past
+ * its end.
+ *
+ * Returns -1 when the type is unknown, when memory cannot be obtained, or
+ * when the slot number would not fit in the 16 bits that
+ * get_subfiling_object() reads back.
+ */
+static int64_t record_subfiling_object(SF_OBJ_TYPE type, void *obj)
+{
+    const size_t max_index = 0x0FFFF; /* largest slot get_subfiling_object() can decode */
+    size_t index;
+    uint64_t tag;
+
+    switch (type) {
+        case SF_TOPOLOGY:
+            if (sf_topology_cache == NULL) {
+                sf_topology_cache = (sf_topology_t **)
+                    calloc(sf_topology_limit, sizeof(sf_topology_t *));
+                if (sf_topology_cache == NULL)
+                    return -1;
+            }
+            if (sf_topology_entries > max_index)
+                return -1;
+            if (sf_topology_entries >= sf_topology_limit) {
+                size_t k;
+                size_t new_limit = (sf_topology_limit > 0) ? (sf_topology_limit * 2) : 4;
+                sf_topology_t **grown = (sf_topology_t **)
+                    realloc(sf_topology_cache, new_limit * sizeof(sf_topology_t *));
+                if (grown == NULL)
+                    return -1;
+                /* realloc does not clear the new tail */
+                for (k = sf_topology_limit; k < new_limit; k++)
+                    grown[k] = NULL;
+                sf_topology_cache = grown;
+                sf_topology_limit = new_limit;
+            }
+            index = sf_topology_entries++;
+            tag = SF_TOPOLOGY;
+            sf_topology_cache[index] = (sf_topology_t *)obj;
+            return (int64_t)((tag << 32) | index);
+
+        case SF_CONTEXT:
+            if (sf_context_cache == NULL) {
+                sf_context_cache = (subfiling_context_t **)
+                    calloc(sf_context_limit, sizeof(subfiling_context_t *));
+                if (sf_context_cache == NULL)
+                    return -1;
+            }
+            if (sf_context_entries > max_index)
+                return -1;
+            if (sf_context_entries >= sf_context_limit) {
+                size_t k;
+                size_t new_limit = (sf_context_limit > 0) ? (sf_context_limit * 2) : 4;
+                subfiling_context_t **grown = (subfiling_context_t **)
+                    realloc(sf_context_cache, new_limit * sizeof(subfiling_context_t *));
+                if (grown == NULL)
+                    return -1;
+                /* realloc does not clear the new tail */
+                for (k = sf_context_limit; k < new_limit; k++)
+                    grown[k] = NULL;
+                sf_context_cache = grown;
+                sf_context_limit = new_limit;
+            }
+            index = sf_context_entries++;
+            tag = SF_CONTEXT;
+            sf_context_cache[index] = (subfiling_context_t *)obj;
+            return (int64_t)((tag << 32) | index);
+
+        default:
+            puts("UNKNOWN Subfiling object type");
+            break;
+    }
+
+    return -1;
+}
+
+/*
+=========================================
+Public vars (for subfiling) and functions
+========================================
+*/
+
+int sf_verbose_flag = 0; /* when non-zero, the IO helpers in this file print progress messages */
+
+/*
+=========================================
+File functions
+=========================================
+
+The POSIX pread and pwrite functions are described as
+being thread safe. We include mutex locks and unlocks
+to work around any potential threading conflicts.
+Those, however, are compiled in conditionally (under #ifdef).
+*/
+
+/*
+ * Read `data_size` bytes from `fd`, starting at `file_offset`, into
+ * `data_buffer`, repeating pread until everything has arrived.
+ *
+ * subfile_rank is used only to label the diagnostic output.
+ *
+ * Returns 0 when the loop finishes; this includes the case where pread
+ * reports end-of-file early, which is reported on stdout.  Returns -1 when
+ * pread fails, since repeating the identical call would never make progress.
+ */
+int sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
+{
+    char *this_buffer = (char *)data_buffer;
+    ssize_t bytes_remaining = (ssize_t)data_size;
+
+    /* "> 0" so that a negative size is never converted into a huge size_t */
+    while (bytes_remaining > 0) {
+        ssize_t bytes_read = (ssize_t)pread(fd, this_buffer, (size_t)bytes_remaining, file_offset);
+
+        if (bytes_read < 0) {
+            perror("pread failed!");
+            fflush(stdout);
+            return -1;
+        }
+        if (bytes_read == 0) {
+            printf("[ioc(%d) %s] ERROR! read of 0 bytes == eof!\n", subfile_rank, __func__ );
+            fflush(stdout);
+            break;
+        }
+        if (sf_verbose_flag) {
+            printf("[ioc(%d) %s] read %lld bytes of %lld requested\n",
+                   subfile_rank, __func__,
+                   (long long)bytes_read, (long long)bytes_remaining);
+        }
+        bytes_remaining -= bytes_read;
+        this_buffer += bytes_read;
+        file_offset += bytes_read;
+    }
+    return 0;
+}
+
+/*
+ * Write `data_size` bytes from `data_buffer` to `fd`, starting at
+ * `file_offset`, repeating pwrite until everything has been written.
+ *
+ * subfile_rank is used only to label the diagnostic output.
+ *
+ * Returns 0 on success and -1 when pwrite fails or makes no progress
+ * (repeating the identical call would loop forever).  When
+ * SUBFILE_REQUIRE_FLUSH is defined the data is also flushed with
+ * fdatasync(), and a failure there is reported as -1 too.
+ */
+int sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
+{
+    char *this_data = (char *)data_buffer;
+    ssize_t bytes_remaining = (ssize_t)data_size;
+
+    /* "> 0" so that a negative size is never converted into a huge size_t */
+    while (bytes_remaining > 0) {
+        ssize_t written = pwrite(fd, this_data, (size_t)bytes_remaining, file_offset);
+
+        if (written < 0) {
+            perror("pwrite failed!");
+            fflush(stdout);
+            return -1;
+        }
+        if (written == 0) {
+            printf("[ioc(%d) %s] ERROR! write of 0 bytes, %lld still pending\n",
+                   subfile_rank, __func__, (long long)bytes_remaining);
+            fflush(stdout);
+            return -1;
+        }
+        if (sf_verbose_flag) {
+            printf("[ioc(%d) %s] wrote %lld bytes of %lld requested\n",
+                   subfile_rank, __func__,
+                   (long long)written, (long long)bytes_remaining);
+        }
+        bytes_remaining -= written;
+        this_data += written;
+        file_offset += written;
+    }
+#ifdef SUBFILE_REQUIRE_FLUSH
+    if (fdatasync(fd) < 0) {
+        perror("fdatasync failed!");
+        return -1;
+    }
+#endif
+
+    return 0;
+}
+
+
+
+
+/*
+ * Translate a reference produced by record_subfiling_object() back into the
+ * pointer that was stored.
+ *
+ * The object type is read from the bits above bit 31 and the cache slot
+ * from the low 16 bits of `object_id`.  Each type is checked against the
+ * entry count of its own cache.
+ *
+ * Returns the stored pointer, or NULL (after printing a message) when the
+ * type is unknown or the slot has not been filled.
+ */
+void * get_subfiling_object(int64_t object_id)
+{
+    int obj_type = (int)((object_id >> 32) & 0x0FFFF);
+    /* We don't require a large indexing space
+     * 16 bits should be enough..
+     */
+    size_t index = (size_t)(object_id & 0x0FFFF);
+
+    if (obj_type == SF_TOPOLOGY) {
+        if ((sf_topology_cache != NULL) && (index < sf_topology_entries))
+            return (void *)sf_topology_cache[index];
+        puts("Illegal object index");
+    }
+    else if (obj_type == SF_CONTEXT) {
+        if ((sf_context_cache != NULL) && (index < sf_context_entries))
+            return (void *)sf_context_cache[index];
+        puts("Illegal object index");
+    }
+    else {
+        puts("UNKNOWN Subfiling object type");
+    }
+    return NULL;
+}
+
+/*
+ * Set up subfiling on MPI_COMM_WORLD: determine the IO concentrator layout,
+ * record the resulting topology, then create and record a subfiling context.
+ * The two references are kept in topology_id and context_id.
+ *
+ * Returns SUCCEED, or FAIL (after printing a message) when any step fails.
+ */
+herr_t
+H5FDsubfiling_init(void)
+{
+    herr_t ret_value = SUCCEED;
+    int ioc_count;
+    int world_rank, world_size;
+    sf_topology_t *thisApp = NULL;
+    subfiling_context_t *newContext = NULL;
+
+    if (MPI_Comm_size(MPI_COMM_WORLD, &world_size) != MPI_SUCCESS) {
+        puts("MPI_Comm_size returned an error");
+        ret_value = FAIL;
+        goto done;
+    }
+    if (MPI_Comm_rank(MPI_COMM_WORLD, &world_rank) != MPI_SUCCESS) {
+        puts("MPI_Comm_rank returned an error");
+        ret_value = FAIL;
+        goto done;
+    }
+
+    ioc_count = H5FD__determine_ioc_count(world_size, world_rank, &thisApp);
+    /* thisApp is dereferenced below, so stop here if no topology came back
+     * (a stale topology_id from an earlier call must not let us continue).
+     */
+    if ((ioc_count <= 0) || (thisApp == NULL)) {
+        puts("Unable to register subfiling topology!");
+        ret_value = FAIL;
+        goto done;
+    }
+
+    topology_id = (hid_t)record_subfiling_object(SF_TOPOLOGY, thisApp);
+    if (topology_id < 0) {
+        puts("Unable to register subfiling topology!");
+        ret_value = FAIL;
+        goto done;
+    }
+    if (H5FD__init_subfile_context(&newContext, ioc_count, world_size, world_rank, thisApp->rank_is_ioc) != SUCCEED) {
+        puts("Unable to initialize a subfiling context!");
+        ret_value = FAIL;
+        goto done;
+    }
+    context_id = (hid_t)record_subfiling_object(SF_CONTEXT, newContext);
+    if (context_id < 0) {
+        ret_value = FAIL;
+        puts("Unable to register subfiling context!");
+    }
+
+done:
+    return ret_value;
+}
+
+/*
+ * Shut subfiling down: raise sf_shutdown_flag, pause briefly, synchronise
+ * all ranks of MPI_COMM_WORLD and then delete the subfiling context.
+ *
+ * Returns SUCCEED, or FAIL when the barrier reports an error.  The context
+ * is deleted in both cases.
+ */
+herr_t
+H5FDsubfiling_finalize(void)
+{
+    herr_t ret_value = SUCCEED; /* Return value */
+
+    /* Shutdown the IO Concentrator threads */
+    sf_shutdown_flag = 1;
+    usleep(100);
+    if (MPI_Barrier(MPI_COMM_WORLD) != MPI_SUCCESS) {
+        puts("MPI_Barrier returned an error");
+        ret_value = FAIL;
+    }
+    delete_subfiling_context(context_id);
+
+    return ret_value;
+}
+
+/* Return the context reference stored by H5FDsubfiling_init()
+ * (H5I_INVALID_HID until then).
+ */
+hid_t
+get_subfiling_context(void)
+{
+    return context_id;
+}
+
+#endif /* H5_HAVE_PARALLEL */
diff --git a/src/H5FDsubfile.h b/src/H5FDsubfile.h
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/H5FDsubfile.h
diff --git a/src/H5FDsubfile_mpi.c b/src/H5FDsubfile_mpi.c
new file mode 100644
index 0000000..c66c6b7
--- /dev/null
+++ b/src/H5FDsubfile_mpi.c
@@ -0,0 +1,1542 @@
+
+#include "H5FDsubfile_private.h"
+
+static int *io_concentrator = NULL; /* ranks acting as IO concentrators; allocated and filled by H5FD__determine_ioc_count() */
+static int n_io_concentrators = -1; /* number of entries in io_concentrator; -1 until H5FD__determine_ioc_count() sets it */
+static int sf_world_rank = -1; /* copied from the topology info by gather_topology_info() */
+static int sf_world_size = -1; /* copied from the topology info by gather_topology_info() */
+static int subfile_fid = -1; /* NOTE(review): presumably the open subfile descriptor - confirm against the rest of the file */
+static int64_t sf_stripe_size = -1; /* refreshed from the subfiling context by init__indep_io() */
+static int64_t sf_blocksize_per_stripe = 0; /* refreshed from the subfiling context by init__indep_io() */
+
+static MPI_Datatype H5FD__create_f_l_mpi_type(subfiling_context_t *context, /* forward declarations of the MPI datatype builders defined further down */
+ int64_t target_write_bytes,
+ int64_t first_write,
+ int64_t last_write,
+ int ioc_depth);
+static MPI_Datatype H5FD__create_first_mpi_type(subfiling_context_t *context,
+ int64_t offset,
+ int64_t target_write_bytes,
+ int64_t first_write,
+ int ioc_depth);
+static MPI_Datatype H5FD__create_final_mpi_type(subfiling_context_t *context,
+ int64_t target_write_bytes,
+ int64_t last_write,
+ int ioc_depth);
+static MPI_Datatype H5FD__create_mpi_uniform_type(subfiling_context_t *context,
+ int64_t offset,
+ int64_t target_write_bytes,
+ int ioc_depth);
+
+static int * request_count_per_rank = NULL;
+
+atomic_int sf_workinprogress = 0;
+atomic_int sf_work_pending = 0;
+atomic_int sf_file_close_count = 0;
+atomic_int sf_file_refcount = 0;
+
+#ifdef DEBUG_TRACING
+FILE *sf_logfile = NULL;
+#endif
+
+MPI_Comm sf_msg_comm = MPI_COMM_NULL; /* Messages IN */
+MPI_Comm sf_data_comm = MPI_COMM_NULL; /* Messages OUT */
+
+int sf_shutdown_flag = 0; /* set to 1 by H5FDsubfiling_finalize() */
+
+const char *sf_subfile_prefix = "."; /* replaced by the SUBFILE_PREFIX environment value in H5FD__init_subfile_context() */
+
+
+#define MAX_WORK_PER_RANK 2
+
+/*
+=========================================
+Private functions
+=========================================
+*/
+
+/*
+ * Look `myrank` up in the io_concentrator table.
+ *
+ * Yields the position of the matching entry, or -1 when the table has not
+ * been allocated or does not contain the rank.
+ */
+static int _determine_subfile_rank(int myrank)
+{
+    int slot = 0;
+
+    if (io_concentrator == NULL)
+        return -1;
+
+    while (slot < n_io_concentrators) {
+        if (io_concentrator[slot] == myrank)
+            return slot;
+        slot++;
+    }
+    return -1;
+}
+
+/* Report 1 when `rank` is listed in the io_concentrator table, 0 otherwise. */
+static int is_io_concentrator(int rank)
+{
+    return (_determine_subfile_rank(rank) >= 0) ? 1 : 0;
+}
+
+
+
+/*
+ * Derive the striping parameters of one IO request.
+ *
+ * Inputs: the stripe size, the configured block size per stripe, the file
+ * offset of the request and its length in bytes (data_extent).
+ *
+ * Outputs:
+ *   first_io         bytes handled by the first IO operation
+ *   first_io_offset  offset of the request inside its first stripe
+ *   last_io          bytes handled by the last IO operation (0 when the
+ *                    whole request fits in the first one)
+ *   starting_ioc / final_ioc   IO concentrator of the first / last byte
+ *   starting_row / final_row   row (one stripe on every IO concentrator)
+ *                              of the first / last byte
+ *
+ * When blocksize_per_stripe is 0 the file-scope sf_blocksize_per_stripe is
+ * set to stripe_size * n_io_concentrators.
+ */
+static void init_io_vars(int64_t stripe_size, int64_t blocksize_per_stripe,
+                         int64_t file_offset, int64_t data_extent,
+                         int64_t *first_io, int64_t *first_io_offset, int64_t *last_io,
+                         int *starting_ioc, int *final_ioc, int *starting_row, int *final_row)
+{
+    const int64_t row_width = stripe_size * n_io_concentrators;
+    const int64_t head_offset = file_offset % stripe_size;
+    const int64_t tail_byte = file_offset + data_extent - 1;
+    const int64_t tail_len = (head_offset + data_extent) % stripe_size;
+    const int64_t room_in_stripe = stripe_size - head_offset;
+    int64_t head_len;
+
+    /* Maybe update how many bytes in the entire IOC collection */
+    if (blocksize_per_stripe == 0) {
+        sf_blocksize_per_stripe = row_width;
+    }
+
+    *starting_row = (int)(file_offset / row_width);
+    *final_row = (int)(tail_byte / row_width);
+    *starting_ioc = (int)((file_offset / stripe_size) % n_io_concentrators);
+    *final_ioc = (int)((tail_byte / stripe_size) % n_io_concentrators);
+    *first_io_offset = head_offset;
+
+    if (room_in_stripe >= data_extent) {
+        head_len = data_extent;
+    }
+    else {
+        head_len = room_in_stripe;
+    }
+    *first_io = head_len;
+
+    /* Check for just a single IO op */
+    if (head_len == data_extent) {
+        *last_io = 0;
+    }
+    else if (tail_len > 0) {
+        *last_io = tail_len;
+    }
+    else {
+        *last_io = stripe_size;
+    }
+}
+
+/*
+ * Work out how an independent IO request of `elements` items of
+ * `dtype_extent` bytes each, starting at file `offset`, is split across the
+ * IO concentrators (IOCs).
+ *
+ * The four per-IOC arrays (*source_data_offset, *sf_datasize, *sf_offset and
+ * *sf_dtype) hold n_io_concentrators entries each.  Any that is NULL on
+ * entry is allocated here and handed back through its pointer argument;
+ * arrays passed in are reused, and datatypes left in *sf_dtype from an
+ * earlier call are freed first.  For each IOC k that takes part:
+ *   (*source_data_offset)[k]  offset in the caller's buffer of its first bytes
+ *   (*sf_datasize)[k]         number of bytes exchanged with it
+ *   (*sf_offset)[k]           starting offset inside its subfile
+ *   (*sf_dtype)[k]            MPI datatype describing those bytes
+ *
+ * The file-scope sf_stripe_size and sf_blocksize_per_stripe are refreshed
+ * from sf_context as a side effect.
+ *
+ * Returns 0 on success and -1 when one of the arrays cannot be allocated.
+ */
+static int init__indep_io(subfiling_context_t *sf_context,
+                          int64_t **source_data_offset, int64_t **sf_datasize,
+                          int64_t **sf_offset, MPI_Datatype **sf_dtype,
+                          int64_t offset, int64_t elements, int dtype_extent)
+{
+    int64_t data_extent = elements * dtype_extent;
+    int64_t first_io=0, last_io=0, first_io_offset=0;
+
+    int64_t *data_offset = *source_data_offset;
+    int64_t *ioc_datasize = *sf_datasize;
+    int64_t *ioc_offset = *sf_offset;
+    MPI_Datatype *ioc_type = *sf_dtype;
+    int k, ioc_start, ioc_last, ioc_depth, starting_row, final_row;
+
+    sf_stripe_size = sf_context->sf_stripe_size;
+    sf_blocksize_per_stripe = sf_context->sf_blocksize_per_stripe;
+
+    init_io_vars(sf_stripe_size, sf_blocksize_per_stripe, offset, data_extent,
+                 &first_io, &first_io_offset, &last_io,
+                 &ioc_start, &ioc_last, &starting_row, &final_row);
+
+    if (sf_verbose_flag) {
+        printf("[%d] offset=%lld,data_extent=%lld,sf_stripe_size=%lld,n_io_concentrators=%d,"
+               "first_io=%lld,first_io_offset=%lld,last_io=%lld,ioc_start=%d,ioc_last=%d\n",
+               sf_world_rank, (long long)offset, (long long)data_extent,
+               (long long)sf_stripe_size, n_io_concentrators,
+               (long long)first_io, (long long)first_io_offset, (long long)last_io,
+               ioc_start, ioc_last);
+        fflush(stdout);
+    }
+
+    /* Allocation failures are reported to the caller; an assert() would
+     * vanish in NDEBUG builds and leave a NULL pointer to be dereferenced.
+     */
+    if (data_offset == NULL) {
+        data_offset = (int64_t *)calloc((size_t)n_io_concentrators, sizeof(int64_t));
+        if (data_offset == NULL)
+            return -1;
+        *source_data_offset = data_offset;
+    }
+
+    if (ioc_datasize == NULL) {
+        ioc_datasize = (int64_t *)calloc((size_t)n_io_concentrators, sizeof(int64_t));
+        if (ioc_datasize == NULL)
+            return -1;
+        *sf_datasize = ioc_datasize;
+    }
+
+    if (ioc_offset == NULL) {
+        ioc_offset = (int64_t *)calloc((size_t)n_io_concentrators, sizeof(int64_t));
+        if (ioc_offset == NULL)
+            return -1;
+        *sf_offset = ioc_offset;
+    }
+
+    if (ioc_type == NULL) {
+        ioc_type = (MPI_Datatype *)calloc((size_t)n_io_concentrators, sizeof(MPI_Datatype));
+        if (ioc_type == NULL)
+            return -1;
+        *sf_dtype = ioc_type;
+    }
+
+    for(k=0; k < n_io_concentrators; k++) {
+        ioc_datasize[k] = 0;
+        ioc_offset[k] = 0;
+        /* Free previously used datatypes */
+        if (ioc_type[k] &&
+            (ioc_type[k] != MPI_DATATYPE_NULL) &&
+            (ioc_type[k] != MPI_BYTE))
+            MPI_Type_free(&ioc_type[k]);
+        else ioc_type[k] = MPI_DATATYPE_NULL;
+    }
+
+    if (data_extent) {
+        int next_index = ioc_start;
+        int64_t target_bytes;
+        int64_t total_bytes_remaining = data_extent;
+        int64_t row_base = starting_row * sf_stripe_size;
+        int64_t subfile_offset = row_base + first_io_offset;
+        int64_t source_offset = 0;
+        int64_t remaining_bytes_in_row = ((n_io_concentrators - ioc_start) * sf_stripe_size) - first_io_offset;
+
+        ioc_depth = (final_row - starting_row) +1;
+        if ((ioc_start > ioc_last) && (data_extent > remaining_bytes_in_row)) ioc_depth--;
+
+        while(total_bytes_remaining > 0) {
+            target_bytes = 0;
+            if (next_index == ioc_start) {
+                target_bytes = first_io;
+            }
+            if (next_index == ioc_last) {
+                target_bytes += last_io;
+                ioc_depth--;
+            }
+            if (ioc_depth) {
+                if (next_index == ioc_start)
+                    target_bytes += (sf_stripe_size * (ioc_depth -1));
+                else target_bytes += (sf_stripe_size * ioc_depth);
+            }
+
+            data_offset[next_index] = source_offset;
+            ioc_datasize[next_index] += target_bytes;
+            ioc_offset[next_index] += subfile_offset;
+            total_bytes_remaining -= target_bytes;
+            /*
+             * With the exception of the very 1st IO, all additional
+             * IO operations start on a slice_boundary (and this is
+             * consistent across the collection of IOCs).
+             */
+
+            subfile_offset = row_base;
+
+            /*
+             * Possibly Create an MPI datatype for each MPI_Send operation.
+             * If the length allows writing into a single stripe on
+             * a single IOC, then we can use the MPI_BYTE datatype.
+             */
+
+            if (next_index == ioc_start) { /* First target */
+                if (next_index == ioc_last) {
+                    ioc_type[next_index] =
+                        H5FD__create_f_l_mpi_type(sf_context, target_bytes,
+                                                  first_io, last_io, ioc_depth+1);
+                } else {
+                    ioc_type[next_index] =
+                        H5FD__create_first_mpi_type(sf_context, ioc_offset[next_index],
+                                                    target_bytes, first_io, ioc_depth);
+                }
+                source_offset += first_io;
+            }
+            else {
+                if (next_index == ioc_last) {
+                    ioc_type[next_index] =
+                        H5FD__create_final_mpi_type(sf_context,
+                                                    target_bytes, last_io, ioc_depth+1);
+                } else {
+                    ioc_type[next_index] =
+                        H5FD__create_mpi_uniform_type(sf_context,ioc_offset[next_index],
+                                                      target_bytes, ioc_depth);
+                }
+                source_offset += sf_stripe_size;
+            }
+
+            if (++next_index == n_io_concentrators) {
+                next_index = 0;
+                row_base += sf_stripe_size;
+                subfile_offset = row_base;
+            }
+        }
+    }
+    return 0;
+}
+
+
+/*
+ * qsort() comparator that orders layout_t entries by ascending hostid and,
+ * for entries on the same host, by ascending rank.
+ *
+ * qsort() requires a negative, zero or positive result; returning only
+ * 0 or 1 would report "smaller" pairs as equal and leave the order
+ * unspecified.  The rank tie-break keeps entries of one host in a
+ * well-defined order.
+ */
+static int compare_hostid(const void *h1, const void *h2)
+{
+    const layout_t *host1 = (const layout_t *)h1;
+    const layout_t *host2 = (const layout_t *)h2;
+
+    if (host1->hostid != host2->hostid)
+        return (host1->hostid > host2->hostid) ? 1 : -1;
+    return (host1->rank > host2->rank) - (host1->rank < host2->rank);
+}
+
+
+/*
+ * Fill info->topology with one (rank, hostid) entry per MPI rank, sorted
+ * with compare_hostid(), and record this process's hostid in info->hostid.
+ *
+ * The file-scope sf_world_size / sf_world_rank are refreshed from `info`
+ * on every call.  Nothing else happens when info->topology already exists.
+ * A single-rank run gets a one-entry table, so callers can always index
+ * topology[0] afterwards.  Allocation or MPI_Allgather failure aborts the
+ * MPI job, because this function has no way to report an error.
+ */
+static void gather_topology_info(sf_topology_t *info)
+{
+    long hostid;
+    layout_t my_hostinfo;
+    layout_t *topology;
+
+    sf_world_size = info->world_size;
+    sf_world_rank = info->world_rank;
+
+    if (info->topology)
+        return;
+    if (sf_world_size < 1)
+        return;
+
+    hostid = gethostid();
+    topology = (layout_t *)calloc((size_t)sf_world_size+1, sizeof(layout_t));
+    if (topology == NULL) {
+        perror("calloc failure!");
+        MPI_Abort(MPI_COMM_WORLD, 1);
+        return;
+    }
+    info->hostid = hostid;
+    info->topology = topology;
+    my_hostinfo.rank = sf_world_rank;
+    my_hostinfo.hostid = hostid;
+    info->topology[sf_world_rank] = my_hostinfo;
+
+    /* With a single rank there is nothing to exchange or sort */
+    if (sf_world_size == 1)
+        return;
+
+    if (MPI_Allgather(&my_hostinfo, 2, MPI_LONG,
+                      info->topology, 2, MPI_LONG,
+                      MPI_COMM_WORLD) != MPI_SUCCESS) {
+        /* An incomplete table would give every rank a different layout */
+        puts("MPI_Allgather returned an error");
+        MPI_Abort(MPI_COMM_WORLD, 1);
+        return;
+    }
+    qsort(info->topology, (size_t)sf_world_size, sizeof(layout_t), compare_hostid);
+}
+
+/*
+ * Count the distinct hostids in the sorted topology table, gathering the
+ * table first when it is missing.
+ *
+ * On return:
+ *   info->node_ranks[n]  index in info->topology of the first entry of the
+ *                        n-th host; the entry after the last host holds
+ *                        world_size as an end marker
+ *   info->node_index     index in info->topology of the first entry that
+ *                        carries this process's hostid (-1 if none)
+ *   info->node_count     number of distinct hosts (also the return value)
+ *
+ * Returns 0, with node_index set to -1, when no topology table is
+ * available.  Failure to allocate node_ranks aborts the MPI job.
+ */
+static int count_nodes(sf_topology_t *info)
+{
+    int k, node_count, hostid_index = -1;
+    long nextid;
+
+    assert(info != NULL);
+    if (info->topology == NULL)
+        gather_topology_info (info);
+
+    /* The table can still be missing; do not index a NULL pointer */
+    if (info->topology == NULL) {
+        info->node_index = -1;
+        return info->node_count = 0;
+    }
+
+    info->node_ranks = (int *)calloc((size_t)(info->world_size+1), sizeof(int));
+    if (info->node_ranks == NULL) {
+        perror("calloc failure!");
+        MPI_Abort(MPI_COMM_WORLD, 1);
+        info->node_index = -1;
+        return info->node_count = 0;
+    }
+
+    nextid = info->topology[0].hostid;
+    if (nextid == info->hostid)
+        hostid_index = 0;
+
+    node_count = 1;
+    /* Recall that the topology array has been sorted! */
+    for (k=1; k < info->world_size; k++) {
+        if (info->topology[k].hostid != nextid) {
+            nextid = info->topology[k].hostid;
+            if (hostid_index < 0) {
+                if (nextid == info->hostid) hostid_index = k;
+            }
+            /* Record the index of new hostid */
+            info->node_ranks[node_count++] = k;
+        }
+    }
+
+    /* Mark the end of the node_ranks */
+    info->node_ranks[node_count] = info->world_size;
+    /* Save the index where we first located my hostid */
+    info->node_index = hostid_index;
+    return info->node_count = node_count;
+}
+
+/*
+ * Decide which MPI ranks act as IO concentrators (IOCs) and return how many
+ * there are.
+ *
+ * The work is done once; later calls return the stored count.  On the first
+ * call a topology is built, the hosts are counted, and the first
+ * `iocs_per_node` ranks listed for every host (fewer when a host runs fewer
+ * ranks) are entered into the io_concentrator table.  iocs_per_node is 1
+ * unless the IOC_COUNT_PER_NODE environment variable holds a positive
+ * number.
+ *
+ * world_size / world_rank  size of, and this process's rank in, the job
+ * thisapp                  receives the topology whenever the returned
+ *                          count is positive (also on repeat calls)
+ *
+ * Returns the number of IOCs, or 0 when the layout cannot be determined.
+ */
+int
+H5FD__determine_ioc_count(int world_size, int world_rank, sf_topology_t **thisapp)
+{
+    static int ioc_count = 0;
+    static sf_topology_t *cached_topology = NULL;
+    sf_topology_t *app_topology = NULL;
+
+    if (!ioc_count) {
+        int k, node;
+        int node_index;
+        int node_total;
+        int my_node = -1;
+        int n_iocs = 0;
+        int iocs_per_node = 1;
+        char *envValue = NULL;
+
+        if (world_size < 1)
+            return 0;
+
+        app_topology = (sf_topology_t *)calloc(1, sizeof(sf_topology_t));
+        if (app_topology == NULL)
+            return 0;
+        app_topology->world_size = world_size;
+        app_topology->world_rank = world_rank;
+
+        /* Drop a table left behind by an earlier, unsuccessful attempt */
+        free(io_concentrator);
+        io_concentrator = (int *)calloc((size_t)world_size, sizeof(int));
+        if (io_concentrator == NULL)
+            goto failed;
+
+        node_total = count_nodes (app_topology);
+        node_index = app_topology->node_index;
+        if ((node_total < 1) || (node_index < 0) ||
+            (app_topology->topology == NULL) || (app_topology->node_ranks == NULL))
+            goto failed;
+
+        /* FIXME: This should ONLY be used for testing!
+         * For production, we should probably limit the
+         * number to a single IOC per node...
+         * (based on performance numbers)
+         */
+        if ((envValue = getenv("IOC_COUNT_PER_NODE")) != NULL) {
+            int value_check = atoi(envValue);
+            if (value_check > 0) {
+                iocs_per_node = value_check;
+            }
+        }
+
+        /* 'node_ranks' contain the index of the first instance of a hostid
+         * in the sorted sf_topology array, and 'node_index' is the index of
+         * the first instance of OUR hostid in that same array.  Locate the
+         * node_ranks entry that matches, so that the size of our own node
+         * is taken from the right pair of entries.
+         */
+        for (node = 0; node < node_total; node++) {
+            if (app_topology->node_ranks[node] == node_index) {
+                my_node = node;
+                break;
+            }
+        }
+        if (my_node < 0)
+            goto failed;
+        app_topology->local_peers = app_topology->node_ranks[my_node+1] -
+                                    app_topology->node_ranks[my_node];
+
+        /* The first 'iocs_per_node' ranks listed for our host are IOCs.
+         * The limit is applied without changing iocs_per_node itself, so
+         * that every rank builds the same io_concentrator table below.
+         */
+        for (k=0; (k < iocs_per_node) && (k < app_topology->local_peers); k++) {
+            if (app_topology->topology[node_index + k].rank == world_rank) {
+                app_topology->rank_is_ioc = true;
+                app_topology->subfile_rank = node_index + k;
+                break;
+            }
+        }
+
+        /* Same selection for every host; never read past a host's ranks */
+        for (node = 0; node < node_total; node++) {
+            int first_entry = app_topology->node_ranks[node];
+            int node_size = app_topology->node_ranks[node+1] - first_entry;
+            for (k=0; (k < iocs_per_node) && (k < node_size); k++) {
+                io_concentrator[n_iocs++] = (int)(
+                    app_topology->topology[first_entry + k].rank);
+            }
+        }
+        if (n_iocs < 1)
+            goto failed;
+
+        ioc_count = n_io_concentrators = n_iocs;
+        cached_topology = app_topology;
+    }
+
+    if ((ioc_count > 0) && (cached_topology != NULL)) {
+        *thisapp = cached_topology;
+    }
+    return ioc_count;
+
+failed:
+    /* Leave no half-built state behind; a later call starts from scratch */
+    free(io_concentrator);
+    io_concentrator = NULL;
+    n_io_concentrators = -1;
+    if (app_topology != NULL) {
+        free(app_topology->topology);
+        free(app_topology->node_ranks);
+        free(app_topology);
+    }
+    return 0;
+}
+
+/*
+ * Create a subfiling context and hand it back through *newContext.
+ *
+ * The stripe size is DEFAULT_STRIPE_SIZE unless the IOC_STRIPE_SIZE
+ * environment variable holds a positive number.  SUBFILE_PREFIX, when set,
+ * is copied into the context and into the file-scope sf_subfile_prefix.
+ * Two duplicates of MPI_COMM_WORLD are created for messages and data; with
+ * more than one IOC the world communicator is also split by rank_is_ioc.
+ * Ranks that are IOCs finally start their IOC threads.
+ *
+ * n_iocs       number of IO concentrators
+ * world_size   not used by this function
+ * world_rank   this process's rank in MPI_COMM_WORLD
+ * rank_is_ioc  whether this rank is an IO concentrator
+ *
+ * Returns 0 on success and -1 on failure.  On a failure before the IOC
+ * threads are started everything created here is released and *newContext
+ * is set to NULL.  If starting the threads fails, the context is left in
+ * place, because this function cannot tell what that step kept hold of.
+ */
+int
+H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int world_size, int world_rank, bool rank_is_ioc)
+{
+    int status;
+    int k;
+    int ioc_leader;
+    int app_leader;
+    char *envValue = NULL;
+    subfiling_context_t *next = NULL;
+
+    (void)world_size;
+
+    if (io_concentrator == NULL)
+        return -1;
+
+    /* calloc, so that fields not assigned below start out zeroed */
+    next = (subfiling_context_t *)calloc(1, sizeof(subfiling_context_t));
+    if (next == NULL)
+        return -1;
+    next->sf_msg_comm = MPI_COMM_NULL;
+    next->sf_data_comm = MPI_COMM_NULL;
+    next->sf_group_comm = MPI_COMM_NULL;
+
+    ioc_leader = io_concentrator[0];
+    *newContext = next;
+
+    next->sf_stripe_size = DEFAULT_STRIPE_SIZE;
+    if ((envValue = getenv("IOC_STRIPE_SIZE")) != NULL) {
+        long value_check = atol(envValue);
+        if (value_check > 0) {
+            next->sf_stripe_size = (int64_t)value_check;
+        }
+    }
+    if ((envValue = getenv("SUBFILE_PREFIX")) != NULL) {
+        /* Copy straight from the environment: the value is not bounded by
+         * PATH_MAX, so it must not pass through a fixed-size buffer.
+         */
+        char *prefix_copy = strdup(envValue);
+        next->subfile_prefix = strdup(envValue);
+        if (prefix_copy != NULL)
+            sf_subfile_prefix = prefix_copy;
+    }
+
+    next->sf_blocksize_per_stripe = next->sf_stripe_size * n_iocs;
+    status = MPI_Comm_dup(MPI_COMM_WORLD, &next->sf_msg_comm);
+    if (status != MPI_SUCCESS) goto err_exit;
+    status = MPI_Comm_set_errhandler(next->sf_msg_comm, MPI_ERRORS_RETURN);
+    if (status != MPI_SUCCESS) goto err_exit;
+    status = MPI_Comm_dup(MPI_COMM_WORLD, &next->sf_data_comm);
+    if (status != MPI_SUCCESS) goto err_exit;
+    status = MPI_Comm_set_errhandler(next->sf_data_comm, MPI_ERRORS_RETURN);
+    if (status != MPI_SUCCESS) goto err_exit;
+
+    /* The application leader is the lowest rank that is not an IOC */
+    k = 0;
+    while(is_io_concentrator(k))
+        k++;
+    app_leader = k;
+
+    if (sf_verbose_flag && (world_rank == 0)) {
+        printf("app_leader = %d and ioc_leader = %d\n", app_leader, ioc_leader);
+    }
+
+    if (n_iocs > 1) {
+        status = MPI_Comm_split(MPI_COMM_WORLD, rank_is_ioc, world_rank, &next->sf_group_comm);
+        if (status != MPI_SUCCESS) goto err_exit;
+        status = MPI_Comm_size(next->sf_group_comm, &next->sf_group_size);
+        if (status != MPI_SUCCESS) goto err_exit;
+        status = MPI_Comm_rank(next->sf_group_comm, &next->sf_group_rank);
+        if (status != MPI_SUCCESS) goto err_exit;
+        /*
+         * There may be additional functionality we need for the IOCs...
+         * If so, then can probably initialize those things here!
+         */
+    }
+
+    if (rank_is_ioc) {
+        status = initialize_ioc_threads(next);
+        if (status)
+            return -1;
+    }
+    return 0;
+
+err_exit:
+    if (next->sf_group_comm != MPI_COMM_NULL)
+        MPI_Comm_free(&next->sf_group_comm);
+    if (next->sf_data_comm != MPI_COMM_NULL)
+        MPI_Comm_free(&next->sf_data_comm);
+    if (next->sf_msg_comm != MPI_COMM_NULL)
+        MPI_Comm_free(&next->sf_msg_comm);
+    free((void *)next->subfile_prefix);
+    free(next);
+    *newContext = NULL;
+    return -1;
+}
+
+
+/*
+---------------------------------------------------------------------------------
+ The data that we're sending to or receiving from an IO concentrator (IOC)
+ contains the initial collection of bytes. The length of this initial segment
+ is 'first_write'. Note that the terminology isn't significant. We are
+ describing an IO operation in terms of an MPI datatype which will either
+ gather data from a source buffer to send to an IOC or will be used to unpack
+ data from an IOC into a user buffer. Subsequent IO operations which are
+ related to the current File IO will begin on sf_stripe_size boundaries.
+
+ context             supplies the stripe size and the block size per stripe
+ offset              starting offset of the data for this IOC
+ target_write_bytes  byte total the caller expects the type to describe
+                     (used only for a consistency warning)
+ first_write         length of the initial segment
+ ioc_depth           number of segments (must be > 0)
+
+ Returns MPI_BYTE when a single segment is enough, a committed indexed
+ datatype otherwise, or MPI_DATATYPE_NULL when the type cannot be built.
+---------------------------------------------------------------------------------
+*/
+
+static MPI_Datatype H5FD__create_first_mpi_type(
+    subfiling_context_t *context, int64_t offset,
+    int64_t target_write_bytes, int64_t first_write, int ioc_depth)
+{
+    MPI_Datatype newType = MPI_DATATYPE_NULL;
+    int64_t stripe_size = context->sf_stripe_size;
+    int64_t offset_in_stripe = offset % stripe_size;
+    int64_t depth_in_bytes = stripe_size * ioc_depth;
+    int64_t next_offset = context->sf_blocksize_per_stripe - offset_in_stripe;
+    int64_t total_bytes = first_write;
+    int k;
+    int temp_blocks[64];
+    int temp_disps[64];
+    int *blocks = temp_blocks;
+    int *disps = temp_disps;
+
+    assert(ioc_depth > 0);
+    if (stripe_size >= depth_in_bytes)
+        return MPI_BYTE;
+
+    /* The stack arrays cover up to 64 segments; deeper requests use the heap */
+    if (ioc_depth > 64) {
+        blocks = (int *)calloc((size_t)ioc_depth, sizeof(int));
+        disps = (int *)calloc((size_t)ioc_depth, sizeof(int));
+        if ((blocks == NULL) || (disps == NULL)) {
+            free(blocks);
+            free(disps);
+            return MPI_DATATYPE_NULL;
+        }
+    }
+
+    blocks[0] = (int)first_write;
+    disps[0] = (int) 0;
+    for(k=1; k < ioc_depth; k++) {
+        disps[k] = (int)next_offset;
+        blocks[k] = (int)stripe_size;
+        total_bytes += stripe_size;
+        next_offset += context->sf_blocksize_per_stripe;
+    }
+    if (total_bytes != target_write_bytes) {
+        printf("Warning (%s): total_SUM(%lld) != target_bytes(%lld)\n",
+               __func__, (long long)total_bytes, (long long)target_write_bytes);
+    }
+
+    if (MPI_Type_indexed(ioc_depth, blocks, disps, MPI_BYTE, &newType) != MPI_SUCCESS) {
+        perror("MPI_Type_indexed failed!");
+        newType = MPI_DATATYPE_NULL;
+    }
+    else {
+        MPI_Type_commit(&newType);
+    }
+
+    /* Release the heap arrays on the failure path as well */
+    if (blocks != temp_blocks)
+        free(blocks);
+    if (disps != temp_disps)
+        free(disps);
+
+    return newType;
+}
+
+/*
+---------------------------------------------------------------------------------
+ The data that we're sending to an IO concentrator (IOC) contains the final
+ collection of bytes. Other than that detail, this is pretty much like the
+ 'typical' case... All chunk sizes are identical (except for the very
+ last chunk) and all will start at a relative stripe offset of 0. More
+ precisely, the start offset is a multiple of the subfiling "stripe_size".
+ We can utilize MPI_Type_indexed to represent the new type.
+
+ context             supplies the stripe size and the block size per stripe
+ target_write_bytes  byte total the caller expects the type to describe
+                     (used only for a consistency warning)
+ last_write          length of the final chunk
+ ioc_depth           number of chunks, the final one included (must be > 0)
+
+ Returns MPI_BYTE when no more than one stripe is involved, a committed
+ indexed datatype otherwise, or MPI_DATATYPE_NULL when the type cannot be
+ built.
+---------------------------------------------------------------------------------
+*/
+static MPI_Datatype H5FD__create_final_mpi_type(subfiling_context_t *context, int64_t target_write_bytes, int64_t last_write, int ioc_depth)
+{
+    MPI_Datatype newType = MPI_DATATYPE_NULL;
+    int64_t stripe_size = context->sf_stripe_size;
+    int64_t depth_in_bytes = (stripe_size * ioc_depth) + last_write;
+    int64_t total_bytes = 0;
+    int k;
+    int temp_blocks[64];
+    int temp_disps[64];
+    int *blocks = temp_blocks;
+    int *disps = temp_disps;
+
+    assert(ioc_depth > 0);
+
+    if (depth_in_bytes <= stripe_size)
+        return MPI_BYTE;
+
+    /* The stack arrays cover up to 64 chunks; deeper requests use the heap */
+    if (ioc_depth > 64) {
+        blocks = (int *)calloc((size_t)ioc_depth, sizeof(int));
+        disps = (int *)calloc((size_t)ioc_depth, sizeof(int));
+        if ((blocks == NULL) || (disps == NULL)) {
+            free(blocks);
+            free(disps);
+            return MPI_DATATYPE_NULL;
+        }
+    }
+
+    for(k=0; k < ioc_depth; k++) {
+        disps[k] = (int)(k * context->sf_blocksize_per_stripe);
+        blocks[k] = (int)stripe_size;
+    }
+    /* The last chunk holds 'last_write' bytes instead of a full stripe */
+    blocks[ioc_depth-1] = (int)last_write;
+
+    /* Add up the chunks as actually laid out, so that the warning below
+     * fires only on a real mismatch.
+     */
+    for(k=0; k < ioc_depth; k++)
+        total_bytes += blocks[k];
+    if (total_bytes != target_write_bytes) {
+        printf("Warning (%s): total_SUM(%lld) != target_bytes(%lld)\n",
+               __func__, (long long)total_bytes, (long long)target_write_bytes);
+    }
+
+    if (MPI_Type_indexed(ioc_depth, blocks, disps, MPI_BYTE, &newType) != MPI_SUCCESS) {
+        newType = MPI_DATATYPE_NULL;
+    }
+    else {
+        MPI_Type_commit(&newType);
+    }
+
+    /* Release the heap arrays on the failure path as well */
+    if (blocks != temp_blocks)
+        free(blocks);
+    if (disps != temp_disps)
+        free(disps);
+
+    return newType;
+}
+
+/*
+---------------------------------------------------------------------------------
+ Special case where the current IOC has both the first and final write chunks.
+ This implementation is a merge of the first_mpi_type and final_mpi_type
+ functions.
+
+ context             supplies the stripe size and the block size per stripe
+ target_write_bytes  byte total the caller expects the type to describe
+                     (used only for a consistency warning)
+ first_write         length of the first chunk
+ last_write          length of the final chunk; 0 selects MPI_BYTE
+ ioc_depth           number of chunks, first and final included (must be > 0)
+
+ Returns MPI_BYTE when last_write is 0, a committed indexed datatype
+ otherwise, or MPI_DATATYPE_NULL when the type cannot be built.
+---------------------------------------------------------------------------------
+*/
+static MPI_Datatype H5FD__create_f_l_mpi_type(subfiling_context_t *context,
+                                              int64_t target_write_bytes,
+                                              int64_t first_write,
+                                              int64_t last_write, int ioc_depth)
+{
+    MPI_Datatype newType = MPI_DATATYPE_NULL;
+    int64_t stripe_size = context->sf_stripe_size;
+    int64_t depth_in_bytes = stripe_size * ioc_depth;
+    int64_t offset_in_stripe = stripe_size - first_write;
+    int64_t next_offset = context->sf_blocksize_per_stripe - offset_in_stripe;
+    int64_t total_bytes = 0;
+    int k;
+    int temp_blocks[64];
+    int temp_disps[64];
+    int *blocks = temp_blocks;
+    int *disps = temp_disps;
+
+    assert(ioc_depth > 0);
+    if (last_write == 0)
+        return MPI_BYTE;
+    if (!depth_in_bytes)
+        return newType;
+
+    /* The stack arrays cover up to 64 chunks; deeper requests use the heap */
+    if (ioc_depth > 64) {
+        blocks = (int *)calloc((size_t)ioc_depth, sizeof(int));
+        disps = (int *)calloc((size_t)ioc_depth, sizeof(int));
+        if ((blocks == NULL) || (disps == NULL)) {
+            free(blocks);
+            free(disps);
+            return MPI_DATATYPE_NULL;
+        }
+    }
+
+    blocks[0] = (int)first_write;
+    disps[0] = 0;
+    for(k=1; k < ioc_depth; k++) {
+        blocks[k] = (int)stripe_size;
+        disps[k] = (int)next_offset;
+        next_offset += context->sf_blocksize_per_stripe;
+    }
+    /* The last chunk holds 'last_write' bytes instead of a full stripe
+     * (with a depth of 1 this replaces the first chunk, as it always has).
+     */
+    blocks[ioc_depth-1] = (int)last_write;
+
+    /* Add up the chunks as actually laid out, so that the warning below
+     * fires only on a real mismatch.
+     */
+    for(k=0; k < ioc_depth; k++)
+        total_bytes += blocks[k];
+    if (total_bytes != target_write_bytes) {
+        printf("Warning (%s): total_SUM(%lld) != target_bytes(%lld)\n",
+               __func__, (long long)total_bytes, (long long)target_write_bytes);
+    }
+
+    if (MPI_Type_indexed(ioc_depth, blocks, disps, MPI_BYTE, &newType) != MPI_SUCCESS) {
+        perror("MPI_Type_indexed failed!");
+        newType = MPI_DATATYPE_NULL;
+    }
+    else {
+        MPI_Type_commit(&newType);
+    }
+
+    /* Release the heap arrays on the failure path as well */
+    if (blocks != temp_blocks)
+        free(blocks);
+    if (disps != temp_disps)
+        free(disps);
+
+    return newType;
+}
+
+/*
+---------------------------------------------------------------------------------
+ This is the 'typical' case in which the IOC has neither the first chunk nor
+ the last. All chunk sizes are identical and start at offset = 0.
+ We utilize MPI_Type_indexed to represent the new type.
+
+ context             supplies the stripe size and the block size per stripe
+ offset              starting offset; must be a multiple of the stripe size
+ target_write_bytes  byte total the caller expects the type to describe
+                     (used only for a consistency warning)
+ ioc_depth           number of chunks (must be > 0)
+
+ The indexed type is kept between calls and rebuilt only when the depth or
+ the block size per stripe changes; callers receive a duplicate of it.
+
+ Returns MPI_BYTE for a single chunk, a duplicate of the kept type
+ otherwise, or MPI_DATATYPE_NULL when the type cannot be built.
+---------------------------------------------------------------------------------
+*/
+MPI_Datatype H5FD__create_mpi_uniform_type(subfiling_context_t *context,
+                                           int64_t offset,
+                                           int64_t target_write_bytes, int ioc_depth)
+{
+    /* Maintain some state between function calls allow reuse of the new datatypes... */
+    static MPI_Datatype uniformType = MPI_DATATYPE_NULL;
+    static int64_t depth_in_bytes = 0;
+    static int64_t cached_blocksize = 0;
+
+    MPI_Datatype newType = MPI_DATATYPE_NULL;
+    int64_t stripe_size = context->sf_stripe_size;
+    int64_t offset_in_stripe = offset % stripe_size;
+    int64_t check_depth = stripe_size * ioc_depth;
+    int64_t total_bytes = 0;
+
+    assert(offset_in_stripe == 0);
+    assert(ioc_depth > 0);
+
+    if (check_depth == stripe_size)
+        return MPI_BYTE;
+
+    /* The displacements depend on the block size per stripe as well as on
+     * the depth, so a change in either one invalidates the kept type.
+     */
+    if (depth_in_bytes) {
+        if ((depth_in_bytes != check_depth) ||
+            (cached_blocksize != context->sf_blocksize_per_stripe)) {
+            MPI_Type_free(&uniformType);
+            depth_in_bytes = 0;
+        }
+    }
+    if (!depth_in_bytes) {
+        int k;
+        int built;
+        int temp_blocks[64];
+        int temp_disps[64];
+        int *blocks = temp_blocks;
+        int *disps = temp_disps;
+
+        /* The stack arrays cover up to 64 chunks; deeper requests use the heap */
+        if (ioc_depth > 64) {
+            blocks = (int *)calloc((size_t)ioc_depth, sizeof(int));
+            disps = (int *)calloc((size_t)ioc_depth, sizeof(int));
+            if ((blocks == NULL) || (disps == NULL)) {
+                free(blocks);
+                free(disps);
+                return MPI_DATATYPE_NULL;
+            }
+        }
+        for(k=0; k < ioc_depth; k++) {
+            disps[k] = (int)(k * context->sf_blocksize_per_stripe);
+            blocks[k] = (int)(stripe_size);
+            total_bytes += stripe_size;
+        }
+
+        if (total_bytes != target_write_bytes) {
+            printf("Warning (%s): total_SUM(%lld) != target_bytes(%lld)\n",
+                   __func__, (long long)total_bytes, (long long)target_write_bytes);
+        }
+
+        built = (MPI_Type_indexed(ioc_depth, blocks, disps, MPI_BYTE, &uniformType) == MPI_SUCCESS);
+
+        /* Release the heap arrays on the failure path as well */
+        if (blocks != temp_blocks)
+            free(blocks);
+        if (disps != temp_disps)
+            free(disps);
+
+        if (!built) {
+            perror("MPI_Type_indexed failed!");
+            uniformType = MPI_DATATYPE_NULL;
+            return MPI_DATATYPE_NULL;
+        }
+        MPI_Type_commit(&uniformType);
+        depth_in_bytes = check_depth;
+        cached_blocksize = context->sf_blocksize_per_stripe;
+    }
+    if (MPI_Type_dup(uniformType, &newType) != MPI_SUCCESS)
+        return MPI_DATATYPE_NULL;
+    return newType;
+}
+
+
+/*
+ * Read `elements` items of `dtype_extent` bytes starting at file `offset`
+ * into `data`, fetching each piece from the IO concentrator (IOC) that
+ * init__indep_io() assigns it to.
+ *
+ * For every IOC with a non-zero share, a {datasize, file offset} header is
+ * sent on sf_msg_comm and a non-blocking receive is posted on sf_data_comm;
+ * the receives are then completed in any order.
+ *
+ * Returns MPI_SUCCESS (0) on success, -1 on allocation/setup failure, or
+ * the failing MPI status code.
+ */
+int sf_read_independent(hid_t context_id, int64_t offset, int64_t elements, int dtype_extent, void *data)
+{
+    /* Scratch arrays sized by n_io_concentrators; allocated on first use
+     * and kept for subsequent calls. */
+    static int *acks = NULL;
+    static int *indices = NULL;
+    static MPI_Request *reqs = NULL;
+    static MPI_Status *stats = NULL;
+    static int64_t *source_data_offset = NULL;
+    static int64_t *ioc_read_datasize = NULL;
+    static int64_t *ioc_read_offset = NULL;
+    static MPI_Datatype *ioc_read_type = NULL;
+
+    subfiling_context_t *sf_context = get_subfiling_object(context_id);
+    int i, ioc, n_waiting = 0, status = 0;
+
+    assert(sf_context != NULL);
+
+    if (acks == NULL) {
+        /* One allocation: first half is acks[], second half is indices[]. */
+        if ((acks = (int *)calloc((size_t)n_io_concentrators*2, sizeof(int))) == NULL) {
+            perror("calloc");
+            return -1;
+        }
+        else indices = &acks[n_io_concentrators];
+    }
+    if (reqs == NULL) {
+        if ((reqs = (MPI_Request *)calloc((size_t)n_io_concentrators, sizeof(MPI_Request))) == NULL) {
+            perror("calloc");
+            return -1;
+        }
+    }
+    if (stats == NULL) {
+        if ((stats = (MPI_Status *)calloc((size_t)n_io_concentrators, sizeof(MPI_Status))) == NULL) {
+            perror("calloc");
+            return -1;
+        }
+    }
+
+    if (init__indep_io(sf_context, &source_data_offset, &ioc_read_datasize, &ioc_read_offset,
+                       &ioc_read_type, offset, elements, dtype_extent) < 0) {
+        return -1;
+    }
+
+    if (sf_verbose_flag) {
+        for (ioc = 0; ioc < n_io_concentrators; ioc++) {
+            int64_t sourceOffset = source_data_offset[ioc];
+            printf("[%d %s]: read_source[ioc(%d), sourceOffset=%ld, datasize=%ld, foffset=%ld]\n",
+                   sf_world_rank, __func__, ioc, (long)sourceOffset,
+                   (long)ioc_read_datasize[ioc], (long)ioc_read_offset[ioc]);
+        }
+    }
+
+    /* Prepare the IOCs with a message which indicates the length
+     * and file offset for the actual data to be provided.
+     */
+    for (ioc = 0; ioc < n_io_concentrators; ioc++) {
+        int64_t msg[2] = {ioc_read_datasize[ioc], ioc_read_offset[ioc]};
+        char *sourceData = (char *)data;
+        int64_t sourceOffset = source_data_offset[ioc];
+
+        /* We may not require data from this IOC...
+         * or we may read the data directly from the file!
+         * Check the size to verify!
+         */
+        reqs[ioc] = MPI_REQUEST_NULL;
+        if (ioc_read_datasize[ioc] == 0) {
+            continue;
+        }
+
+        if (sf_verbose_flag) {
+            printf("[%d %s] Requesting %ld read bytes from IOC(%d): sourceOffset=%ld\n",
+                   sf_world_rank, __func__, (long)msg[0], io_concentrator[ioc], (long)sourceOffset);
+        }
+
+        status = MPI_Ssend(msg, 2, MPI_INT64_T, io_concentrator[ioc], READ_INDEP, sf_context->sf_msg_comm);
+        if (status != MPI_SUCCESS) {
+            printf("[%d] MPI_Send failure!", sf_world_rank);
+            return status;
+        }
+
+        if (ioc_read_type[ioc] == MPI_BYTE) {
+            int bytes = (int) ioc_read_datasize[ioc];
+            status = MPI_Irecv(&sourceData[sourceOffset], bytes, ioc_read_type[ioc], io_concentrator[ioc],
+                               READ_INDEP_DATA, sf_context->sf_data_comm, &reqs[ioc]);
+        } else {
+            status = MPI_Irecv(&sourceData[sourceOffset], 1, ioc_read_type[ioc], io_concentrator[ioc],
+                               READ_INDEP_DATA, sf_context->sf_data_comm, &reqs[ioc]);
+        }
+        if (status != MPI_SUCCESS) {
+            /* MPI_Error_string requires room for MPI_MAX_ERROR_STRING chars. */
+            int length = 0;
+            char error_string[MPI_MAX_ERROR_STRING];
+            MPI_Error_string(status, error_string, &length);
+            printf("(%s) MPI_Irecv error: %s\n", __func__, error_string);
+            return status;
+        }
+        n_waiting++;
+    }
+
+    /* We've queued all of the Async READs, now we just need to
+     * complete them in any order...
+     */
+    while (n_waiting) {
+        int ready = 0;
+        status = MPI_Waitsome(n_io_concentrators, reqs, &ready, indices, stats);
+        if (status != MPI_SUCCESS) {
+            int len;
+            char estring[MPI_MAX_ERROR_STRING];
+            MPI_Error_string(status, estring, &len);
+            printf("[%d %s] MPI_ERROR! MPI_Waitsome returned an error(%s)\n",
+                   sf_world_rank, __func__, estring );
+            fflush(stdout);
+            /* n_waiting cannot make progress after a failure; stop instead
+             * of spinning and report the error to the caller. */
+            break;
+        }
+        /* No active requests left in the array: nothing more to wait for. */
+        if (ready == MPI_UNDEFINED)
+            break;
+
+        for (i = 0; i < ready; i++) {
+            ioc = io_concentrator[indices[i]];
+            if (sf_verbose_flag) {
+                printf("[%d] READ bytes(%ld) of data from ioc_concentrator %d complete\n",
+                       sf_world_rank, (long)ioc_read_datasize[indices[i]], ioc);
+                fflush(stdout);
+            }
+            n_waiting--;
+        }
+    }
+    return status;
+}
+
+
+
+/*
+ * Write `elements` items of `dtype_extent` bytes from `data` to file
+ * `offset`, sending each piece to the IO concentrator (IOC) that
+ * init__indep_io() assigns it to.
+ *
+ * Per IOC: a {datasize, file offset} header is sent on sf_msg_comm, an ACK
+ * is awaited on sf_data_comm, and on a positive ACK the data is sent with
+ * MPI_Issend. All outstanding sends are completed before returning.
+ *
+ * Returns -1 if any step failed (including an IOC refusing the write),
+ * otherwise the last MPI status (MPI_SUCCESS).
+ */
+int sf_write_independent(hid_t context_id, int64_t offset, int64_t elements, int dtype_extent, void *data)
+{
+    /* Scratch arrays sized by n_io_concentrators; allocated on first use
+     * and kept for subsequent calls. */
+    static int *acks = NULL;
+    static int *indices = NULL;
+    static MPI_Request *reqs = NULL;
+    static MPI_Status *stats = NULL;
+    static int64_t *source_data_offset = NULL;
+    static int64_t *ioc_write_datasize = NULL;
+    static int64_t *ioc_write_offset = NULL;
+    static MPI_Datatype *ioc_write_type = NULL;
+
+    subfiling_context_t *sf_context = get_subfiling_object(context_id);
+    int i, target, ioc, n_waiting = 0, status = 0;
+    int errors = 0;
+
+    /* Same precondition that sf_read_independent enforces. */
+    assert(sf_context != NULL);
+
+    if (acks == NULL) {
+        /* One allocation: first half is acks[], second half is indices[]. */
+        if ((acks = (int *)calloc((size_t)n_io_concentrators*2, sizeof(int))) == NULL) {
+            perror("calloc");
+            return -1;
+        }
+        else indices = &acks[n_io_concentrators];
+    }
+    if (reqs == NULL) {
+        if ((reqs = (MPI_Request *)calloc((size_t)n_io_concentrators, sizeof(MPI_Request))) == NULL) {
+            perror("calloc");
+            return -1;
+        }
+    }
+    if (stats == NULL) {
+        if ((stats = (MPI_Status *)calloc((size_t)n_io_concentrators, sizeof(MPI_Status))) == NULL) {
+            perror("calloc");
+            return -1;
+        }
+    }
+
+    if (init__indep_io(sf_context, &source_data_offset, &ioc_write_datasize, &ioc_write_offset,
+                       &ioc_write_type, offset, elements, dtype_extent) < 0) {
+        return -1;
+    }
+
+    if (sf_verbose_flag) {
+        for (ioc = 0; ioc < n_io_concentrators; ioc++) {
+            int64_t sourceOffset = source_data_offset[ioc];
+            printf("[%d %s]: write_dest[ioc(%d), sourceOffset=%ld, datasize=%ld, foffset=%ld]\n",
+                   sf_world_rank, __func__, ioc, (long)sourceOffset,
+                   (long)ioc_write_datasize[ioc], (long)ioc_write_offset[ioc]);
+        }
+    }
+
+    /* Prepare the IOCs with a message which indicates the length
+     * of the actual data to be written.  We also provide the file
+     * offset so that when the IOC receives the data (in whatever order)
+     * they can lseek to the correct offset and write the data.
+     */
+    for (target = 0; target < n_io_concentrators; target++) {
+        int64_t sourceOffset;
+        int64_t msg[2] = {0,};
+        char *sourceData = (char *)data;
+
+        /* The starting IOC is rotated by this process's world rank. */
+        ioc = (sf_world_rank + target) % n_io_concentrators;
+
+        sourceOffset = source_data_offset[ioc];
+        msg[0] = ioc_write_datasize[ioc];
+        msg[1] = ioc_write_offset[ioc];
+        acks[ioc] = 0;
+        reqs[ioc] = MPI_REQUEST_NULL;
+
+        if (ioc_write_datasize[ioc] == 0) {
+            if (sf_verbose_flag) {
+                printf("[%d %s] skipping ioc(%d) send datasize = %ld\n",
+                       sf_world_rank, __func__, ioc, (long)ioc_write_datasize[ioc]);
+                fflush(stdout);
+            }
+            continue;
+        }
+        if (sf_verbose_flag) {
+            /* MPI_Datatype is an opaque handle; print its Fortran integer
+             * form, which is a plain int on every implementation. */
+            printf("[%d] Datatype(%x) Sending to ioc(%d) %ld bytes of data with file_offset=%ld\n",
+                   sf_world_rank, (unsigned)MPI_Type_c2f(ioc_write_type[ioc]), ioc,
+                   (long)ioc_write_datasize[ioc], (long)ioc_write_offset[ioc]);
+            fflush(stdout);
+        }
+
+        /* Send the Message HEADER which indicates the requested IO operation
+         * (via the message TAG) along with the data size and file offset.
+         */
+        status = MPI_Ssend(msg, 2, MPI_INT64_T, io_concentrator[ioc],
+                           WRITE_INDEP, sf_context->sf_msg_comm);
+
+        if (status != MPI_SUCCESS) {
+            int len;
+            char estring[MPI_MAX_ERROR_STRING];
+            MPI_Error_string(status, estring, &len);
+            printf("[%d] ERROR! MPI_Send of %zu bytes to %d returned an error(%s)\n",
+                   sf_world_rank, sizeof(msg), io_concentrator[ioc], estring );
+            fflush(stdout);
+            /* The IOC never saw the request, so no ACK will arrive:
+             * do not block in MPI_Recv waiting for one. */
+            errors++;
+            continue;
+        }
+
+        status = MPI_Recv(&acks[ioc], 1, MPI_INT, io_concentrator[ioc], WRITE_INDEP_ACK,
+                          sf_context->sf_data_comm, &stats[ioc]);
+        if (status != MPI_SUCCESS) {
+            errors++;
+            puts("ACK error!");
+            fflush(stdout);
+            continue;
+        }
+
+        if (sf_verbose_flag) {
+            printf("[%d] received ack(%d) from ioc(%d)\n", sf_world_rank, acks[ioc], ioc);
+            fflush(stdout);
+        }
+        if (acks[ioc] <= 0) {
+            /* NACK: the IOC will not accept the data, so this part of the
+             * write is lost and must be reported. */
+            errors++;
+            printf("[%d] ERROR! ioc(%d) refused the write request\n", sf_world_rank, ioc);
+            fflush(stdout);
+            continue;
+        }
+
+        if (ioc_write_type[ioc] == MPI_BYTE) {
+            int datasize = (int)(ioc_write_datasize[ioc] & INT32_MASK);
+            status = MPI_Issend(&sourceData[sourceOffset], datasize,
+                                MPI_BYTE, io_concentrator[ioc], WRITE_INDEP_DATA,
+                                sf_context->sf_data_comm, &reqs[ioc]);
+        }
+        else {
+            status = MPI_Issend(&sourceData[sourceOffset], 1, ioc_write_type[ioc],
+                                io_concentrator[ioc], WRITE_INDEP_DATA,
+                                sf_context->sf_data_comm, &reqs[ioc]);
+        }
+        if (status != MPI_SUCCESS) {
+            errors++;
+            printf("[%d] ERROR! Unable to Send data to ioc(%d)\n",
+                   sf_world_rank, ioc);
+            fflush(stdout);
+            continue;
+        }
+        /* Queued another Isend which needs to be completed (below) */
+        n_waiting++;
+    }
+
+    while (n_waiting) {
+        int ready = 0;
+        status = MPI_Waitsome(n_io_concentrators, reqs, &ready, indices, stats);
+        if (status != MPI_SUCCESS) {
+            int len;
+            char estring[MPI_MAX_ERROR_STRING];
+            MPI_Error_string(status, estring, &len);
+            printf("[%d %s] MPI_ERROR! MPI_Waitsome returned an error(%s)\n",
+                   sf_world_rank, __func__, estring );
+            fflush(stdout);
+            errors++;
+            /* n_waiting cannot make progress after a failure. */
+            break;
+        }
+        /* No active requests left in the array: nothing more to wait for. */
+        if (ready == MPI_UNDEFINED)
+            break;
+
+        for (i = 0; i < ready; i++) {
+            /* One of the Issend calls has completed; clear its ack slot. */
+            acks[indices[i]] = 0;
+            n_waiting--;
+        }
+    }
+    if (errors) return -1;
+    return status;
+}
+
+/*
+ * Ask every IO concentrator to close its subfile and wait for each one to
+ * answer with a COMPLETED message on sf_data_comm.
+ *
+ * Returns the number of errors encountered (0 on success).
+ */
+int sf_close_subfiles(hid_t context_id)
+{
+    int i, status;
+    int errors = 0;
+    int n_waiting = 0;
+    int indices[n_io_concentrators];
+    int ioc_acks[n_io_concentrators];
+    MPI_Request reqs[n_io_concentrators];
+    subfiling_context_t *sf_context = get_subfiling_object(context_id);
+
+    assert(sf_context != NULL);
+
+    for (i = 0; i < n_io_concentrators; i++) {
+        int64_t msg[2] = {0, 0};
+
+        /* MPI_Waitsome below scans the whole array, so every slot must
+         * hold a valid handle even when no receive gets posted. */
+        reqs[i] = MPI_REQUEST_NULL;
+
+        status = MPI_Ssend(msg, 2, MPI_INT64_T, io_concentrator[i], CLOSE_OP, sf_context->sf_msg_comm);
+        if (status == MPI_SUCCESS) {
+            status = MPI_Irecv(&ioc_acks[i], 1, MPI_INT, io_concentrator[i], COMPLETED,
+                               sf_context->sf_data_comm, &reqs[i]);
+        }
+        if (status != MPI_SUCCESS) {
+            printf("[%d] MPI close_subfiles failure!", sf_world_rank);
+            errors++;
+        }
+        else n_waiting++;
+    }
+    while (n_waiting) {
+        int ready = 0;
+        status = MPI_Waitsome(n_io_concentrators, reqs, &ready, indices, MPI_STATUSES_IGNORE);
+        if (status != MPI_SUCCESS) {
+            int len;
+            char estring[MPI_MAX_ERROR_STRING];
+            MPI_Error_string(status, estring, &len);
+            printf("[%d %s] MPI_ERROR! MPI_Waitsome returned an error(%s)\n",
+                   sf_world_rank, __func__, estring );
+            fflush(stdout);
+            errors++;
+            /* n_waiting cannot make progress after a failure. */
+            break;
+        }
+        /* No active requests left in the array: nothing more to wait for. */
+        if (ready == MPI_UNDEFINED)
+            break;
+        n_waiting -= ready;
+    }
+    return errors;
+}
+
+/*
+ * Ask every IO concentrator to open its subfile with the given open(2)
+ * `flags` and wait for each one to answer with a COMPLETED message.
+ *
+ * Also copies the context's stripe size into the global sf_stripe_size and,
+ * when the context already has a prefix that differs from `prefix`,
+ * replaces it with a copy of `prefix`.
+ *
+ * Returns the number of errors encountered (0 on success).
+ */
+int sf_open_subfiles(hid_t context_id, char *prefix, int flags)
+{
+    int i, status;
+    int errors = 0;
+    int n_waiting = 0;
+    int indices[n_io_concentrators];
+    int ioc_acks[n_io_concentrators];
+    MPI_Request reqs[n_io_concentrators];
+    subfiling_context_t *sf_context = get_subfiling_object(context_id);
+
+    assert(sf_context != NULL);
+
+    sf_stripe_size = sf_context->sf_stripe_size;
+
+    if ((sf_context->subfile_prefix != NULL) && (prefix != NULL)) {
+        if (strcmp(sf_context->subfile_prefix, prefix) != 0) {
+            char *new_prefix = strdup(prefix);
+            if (new_prefix != NULL) {
+                /* NOTE(review): the previous prefix is not freed here because
+                 * its ownership is not visible from this function - confirm. */
+                sf_context->subfile_prefix = new_prefix;
+            }
+            else {
+                perror("strdup");
+                errors++;
+            }
+        }
+    }
+
+    for (i = 0; i < n_io_concentrators; i++) {
+        int64_t msg[2] = {flags, 0};
+
+        /* MPI_Waitsome below scans the whole array, so every slot must
+         * hold a valid handle even when no receive gets posted. */
+        reqs[i] = MPI_REQUEST_NULL;
+
+        if (sf_verbose_flag) {
+            printf("[%d] file open request (flags = %0lx)\n", sf_world_rank, (unsigned long)msg[0]);
+        }
+        status = MPI_Ssend(msg, 2, MPI_INT64_T, io_concentrator[i], OPEN_OP, sf_context->sf_msg_comm);
+        if (status == MPI_SUCCESS) {
+            status = MPI_Irecv(&ioc_acks[i], 1, MPI_INT, io_concentrator[i], COMPLETED,
+                               sf_context->sf_data_comm, &reqs[i]);
+        }
+        if (status != MPI_SUCCESS) {
+            printf("[%d] MPI open_subfiles failure!\n", sf_world_rank);
+            errors++;
+        }
+        else n_waiting++;
+    }
+    while (n_waiting) {
+        int ready = 0;
+        status = MPI_Waitsome(n_io_concentrators, reqs, &ready, indices, MPI_STATUSES_IGNORE);
+        if (status != MPI_SUCCESS) {
+            int len;
+            char estring[MPI_MAX_ERROR_STRING];
+            MPI_Error_string(status, estring, &len);
+            printf("[%d %s] MPI_ERROR! MPI_Waitsome returned an error(%s)\n",
+                   sf_world_rank, __func__, estring );
+            fflush(stdout);
+            errors++;
+            /* n_waiting cannot make progress after a failure. */
+            break;
+        }
+        /* No active requests left in the array: nothing more to wait for. */
+        if (ready == MPI_UNDEFINED)
+            break;
+        n_waiting -= ready;
+    }
+
+    return errors;
+}
+
+/*
+ * Main loop of an IO concentrator: polls sf_msg_comm for incoming request
+ * headers and hands each one to the thread pool via tpool_add_work().
+ *
+ * Requests that fit in a sf_work_request_t are received into a ring of
+ * max_work_depth slots indexed by sf_workinprogress; larger messages are
+ * received into a heap buffer. The loop runs until sf_shutdown_flag is set
+ * and sf_work_pending has dropped to zero.
+ *
+ * Always returns 0.
+ */
+int
+ioc_main(subfiling_context_t *context)
+{
+    int subfile_rank;
+    int flag, ret;
+    int max_work_depth;
+    MPI_Status status, msg_status;
+    sf_work_request_t *incoming_requests = NULL;
+    useconds_t delay = 20;
+
+    assert(context != NULL);
+    subfile_rank = context->sf_group_rank;
+    if (request_count_per_rank == NULL) {
+        request_count_per_rank = (int *)calloc((size_t)sf_world_size, sizeof(int));
+        assert(request_count_per_rank != NULL);
+    }
+
+    max_work_depth = sf_world_size * MAX_WORK_PER_RANK;
+    incoming_requests = (sf_work_request_t *)calloc((size_t)max_work_depth, sizeof(sf_work_request_t));
+    assert(incoming_requests != NULL);
+
+#ifdef DEBUG_TRACING
+    char logname[64];
+    snprintf(logname, sizeof(logname), "ioc_%d.log", subfile_rank);
+    sf_logfile = fopen(logname, "w+");
+#endif
+    /* Initialize atomic vars */
+    atomic_init(&sf_workinprogress, 0);
+    atomic_init(&sf_work_pending, 0);
+    atomic_init(&sf_file_close_count, 0);
+    atomic_init(&sf_file_refcount, 0);
+
+    sf_msg_comm = context->sf_msg_comm;   /* Messages IN */
+    sf_data_comm = context->sf_data_comm; /* Messages OUT */
+
+    while (!sf_shutdown_flag || sf_work_pending) {
+        flag = 0;
+        ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, context->sf_msg_comm, &flag, &status);
+        if ((ret == MPI_SUCCESS) && (flag != 0)) {
+            sf_work_request_t *msg = NULL;
+            int count;
+            int request_size = (int)sizeof(sf_work_request_t);
+            int source = status.MPI_SOURCE;
+            int tag = status.MPI_TAG;
+            /* Read the ring index once so the receive and the bookkeeping
+             * below are guaranteed to refer to the same slot. */
+            int slot = atomic_load(&sf_workinprogress);
+
+            MPI_Get_count(&status, MPI_BYTE, &count);
+            if (count > request_size) {
+                msg = (sf_work_request_t *) malloc((size_t)count);
+                if (msg == NULL) {
+                    /* Leave the message queued and retry on a later pass. */
+                    perror("malloc");
+                    usleep(delay);
+                    continue;
+                }
+                ret = MPI_Recv(msg, count, MPI_BYTE, source, tag, context->sf_msg_comm, &msg_status);
+            }
+            else {
+                ret = MPI_Recv(&incoming_requests[slot], count, MPI_BYTE,
+                               source, tag, context->sf_msg_comm, &msg_status);
+            }
+            if (ret == MPI_SUCCESS) {
+
+#ifdef DEBUG_TRACING
+                printf("[[ioc(%d) msg from %d tag=%x, datasize=%ld, foffset=%ld]]\n", subfile_rank, source, tag,
+                       (long)incoming_requests[slot].header[0],
+                       (long)incoming_requests[slot].header[1]);
+                fflush(stdout);
+#endif
+                if (msg) {
+                    /* NOTE(review): nothing visible here frees this buffer
+                     * after the worker is done with it - confirm ownership. */
+                    msg->tag = tag;
+                    msg->source = source;
+                    msg->subfile_rank = subfile_rank;
+                    tpool_add_work(msg);
+                }
+                else {
+                    /* atomic_compare_exchange_strong stores the current value
+                     * into its "expected" argument when the comparison fails,
+                     * so it must be given a scratch copy; passing
+                     * max_work_depth itself would clobber the wrap limit. */
+                    int wrap_at = max_work_depth;
+
+                    incoming_requests[slot].tag = tag;
+                    incoming_requests[slot].source = source;
+                    incoming_requests[slot].subfile_rank = subfile_rank;
+                    tpool_add_work(&incoming_requests[slot]);
+                    atomic_fetch_add(&sf_workinprogress, 1); // atomic
+                    atomic_compare_exchange_strong(&sf_workinprogress, &wrap_at, 0);
+                }
+            }
+            else if (msg) {
+                /* The receive failed, so no worker will ever see this buffer. */
+                free(msg);
+            }
+        }
+        else usleep(delay);
+    }
+
+#ifdef DEBUG_TRACING
+    if (sf_logfile != NULL)
+        fclose(sf_logfile);
+#endif
+
+    return 0;
+}
+
+/*
+=========================================
+Private helper functions
+=========================================
+*/
+
+/* Send a positive acknowledgement (the int value 1) to rank `target` using
+ * `tag` on `comm`; returns the MPI_Send status. */
+static int send_ack__(int target, int subfile_rank, int tag, MPI_Comm comm)
+{
+    int ack_value = 1;
+    int mpi_status;
+
+    mpi_status = MPI_Send(&ack_value, 1, MPI_INT, target, tag, comm);
+    if (sf_verbose_flag)
+        printf("[ioc(%d): Sending ACK to MPI_rank(%d)\n", subfile_rank, target);
+
+    return mpi_status;
+}
+
+/* Send a negative acknowledgement (the int value 0) to rank `target` using
+ * `tag` on `comm`; returns the MPI_Send status. */
+static int send_nack__(int target, int subfile_rank, int tag, MPI_Comm comm)
+{
+    int nack_value = 0;
+    int mpi_status;
+
+    mpi_status = MPI_Send(&nack_value, 1, MPI_INT, target, tag, comm);
+    if (sf_verbose_flag)
+        printf("[ioc(%d): Sending NACK to MPI_rank(%d)\n", subfile_rank, target);
+
+    return mpi_status;
+}
+
+/*
+=========================================
+queue_xxx functions that should be run
+from the thread pool threads...
+=========================================
+*/
+
+/* Collective write handler: currently a stub that ignores its arguments
+ * and reports success. */
+int queue_write_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
+{
+    (void)msg;
+    (void)subfile_rank;
+    (void)source;
+    (void)comm;
+    return 0;
+}
+
+/* Collective read handler: currently a stub that ignores its arguments
+ * and reports success. */
+int queue_read_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
+{
+    (void)msg;
+    (void)subfile_rank;
+    (void)source;
+    (void)comm;
+    return 0;
+}
+
+/*
+ * Service an independent write request on an IO concentrator.
+ *
+ * msg->header holds {data size, file offset}. A receive buffer is
+ * allocated, the client is ACKed (or NACKed when allocation fails), the
+ * data is received from `source` on `comm`, and written to the subfile
+ * with sf_write_data().
+ *
+ * Returns 0 on success, -1 on allocation or write failure, or the MPI
+ * error code when the data receive fails.
+ */
+int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
+{
+    char *recv_buffer = NULL;
+    int ret = MPI_SUCCESS;
+    int result = 0;
+    MPI_Status msg_status;
+    int64_t data_size = msg->header[0];
+    int64_t file_offset = msg->header[1];
+
+    if (sf_verbose_flag) {
+        printf("[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld\n", subfile_rank, __func__, source,
+               (long)data_size, (long)file_offset );
+        fflush(stdout);
+    }
+    if ((recv_buffer = (char *)malloc((size_t)data_size)) == NULL) {
+        perror("malloc");
+        send_nack__(source, subfile_rank, WRITE_INDEP_ACK, comm);
+        return -1;
+    }
+
+    /* Tell the client it may now send the data. */
+    send_ack__(source, subfile_rank, WRITE_INDEP_ACK, comm);
+
+    ret = MPI_Recv(recv_buffer, (int)data_size, MPI_BYTE, source, WRITE_INDEP_DATA, comm, &msg_status );
+    if (ret != MPI_SUCCESS) {
+        int len;
+        char estring[MPI_MAX_ERROR_STRING];
+        MPI_Error_string(ret, estring, &len);
+        printf("[ioc(%d) %s] MPI_ERROR(%d)! MPI_Recv of %ld bytes from %d returned an error(%s)\n",
+               subfile_rank, __func__, msg_status.MPI_ERROR, (long)data_size, source, estring );
+        fflush(stdout);
+        result = ret;
+        goto done;
+    }
+    if (sf_verbose_flag) {
+        printf("[ioc(%d) %s] MPI_Recv success. Writing %ld bytes from rank %d to disk\n",
+               subfile_rank, __func__, (long)data_size, source);
+        fflush(stdout);
+    }
+    if (sf_write_data(subfile_fid, file_offset, recv_buffer, data_size, subfile_rank ) < 0) {
+        printf("[ioc(%d) %s] sf_write_data returned an error!\n", subfile_rank, __func__);
+        fflush(stdout);
+        result = -1;
+    }
+
+done:
+    /* Single exit so the buffer is released on every path, including the
+     * MPI_Recv failure path. */
+    free(recv_buffer);
+    return result;
+}
+
+/*
+ * Service an independent read request on an IO concentrator.
+ *
+ * msg->header holds {data size, file offset}. The requested bytes are read
+ * from the subfile with sf_read_data() and sent to `source` on `comm` with
+ * the READ_INDEP_DATA tag.
+ *
+ * Returns 0 on success, -1 on allocation or read failure, or the MPI error
+ * code when the send fails.
+ */
+int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
+{
+    char *send_buffer = NULL;
+    int ret = MPI_SUCCESS;
+    int result = 0;
+    int64_t data_size = msg->header[0];
+    int64_t file_offset = msg->header[1];
+
+    if (sf_verbose_flag) {
+        printf("[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld\n", subfile_rank, __func__, source,
+               (long)data_size, (long)file_offset );
+        fflush(stdout);
+    }
+    if ((send_buffer = (char *)malloc((size_t)data_size)) == NULL) {
+        perror("malloc");
+        return -1;
+    }
+
+    if (sf_read_data(subfile_fid, file_offset, send_buffer, data_size, subfile_rank) < 0) {
+        printf("[%d] %s - sf_read_data returned an error!\n", subfile_rank, __func__);
+        fflush(stdout);
+        result = -1;
+        goto done;
+    }
+    ret = MPI_Send(send_buffer, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, comm);
+    if (ret != MPI_SUCCESS) {
+        int len;
+        char estring[MPI_MAX_ERROR_STRING];
+        MPI_Error_string(ret, estring, &len);
+        printf("[ioc(%d)] ERROR! MPI_Send of %ld bytes to %d returned an error(%s)\n",
+               subfile_rank, (long)data_size, source, estring );
+        fflush(stdout);
+        result = ret;
+    }
+
+done:
+    /* Single exit so the buffer is released on the error paths as well. */
+    free(send_buffer);
+    return result;
+}
+
+
+/*
+ * Service an OPEN_OP request: bump the open reference count, open the
+ * subfile if no descriptor is open yet, and reply to `source` with a
+ * COMPLETED message. Returns the number of errors encountered.
+ */
+int queue_file_open(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
+{
+    /* The open(2) flags travel in the low 32 bits of header[0]. */
+    int open_flags = (int)(msg->header[0] & 0x0ffffffff);
+    int failures = 0;
+    int reply = COMPLETED;
+
+    atomic_fetch_add(&sf_file_refcount, 1); // atomic
+
+    if (sf_verbose_flag) {
+        printf("[ioc(%d) %s] file open flags = %0x, source=%d\n", subfile_rank, __func__, open_flags, source);
+        fflush(stdout);
+    }
+
+    /* Only the first request actually opens the file. */
+    if (subfile_fid < 0)
+        failures = subfiling_open_file(sf_subfile_prefix, subfile_rank, open_flags);
+
+    if (MPI_Send(&reply, 1, MPI_INT, source, COMPLETED, comm) != MPI_SUCCESS)
+        failures++;
+
+    if (failures) {
+        printf("[ioc(%d) %s] Error opening file\n", subfile_rank, __func__);
+        fflush(stdout);
+    }
+    return failures;
+}
+
+/*
+ * The decrement is somewhat of a misnomer, i.e. we compare the number of file open
+ * requests to the number of file close requests. When those values match, the
+ * actual file gets closed via the callback_ftn. This effects a weak collective
+ * on the file close operation. File opens, on the other hand, can occur in
+ * any random order and no collective semantics are enforced.
+ */
+/*
+ * Record one close request. When the number of close requests equals the
+ * number of open requests, both counters are reset to zero and
+ * callback_ftn(subfile_rank, comm) is invoked to perform the actual close.
+ *
+ * Returns 0, or -1 when the callback reports an error (< 0).
+ */
+int decrement_file_ref_counts( int subfile_rank, int source, MPI_Comm comm, file_close_cb callback_ftn)
+{
+    int close_count, open_count;
+    int status = 0;
+
+    (void)source; /* unused */
+
+    /* Use the value returned by the increment itself rather than
+     * re-loading the counter, so the count seen here is the one this call
+     * produced. */
+    close_count = atomic_fetch_add(&sf_file_close_count, 1) + 1;
+    open_count = atomic_load(&sf_file_refcount);
+
+    if (close_count == open_count) {
+        atomic_store(&sf_file_refcount, 0);
+        atomic_store(&sf_file_close_count, 0); /* Complete the reset to zeros */
+        if (callback_ftn(subfile_rank, comm) < 0) {
+            printf("[ioc(%d) %s] callback_ftn returned an error\n", subfile_rank, __func__ );
+            fflush(stdout);
+            status = -1;
+        }
+    }
+    return status;
+}
+
+/* Note: This function should be called ONLY when all clients
+ * have called the CLOSE_OP on this IO Concentrator.
+ * The IOC API maintains a reference count on subfiles
+ * so that once that count is decremented to zero, the
+ * decrement_file_ref_counts function will call here.
+ */
+/*
+ * Close the subfile (if open) and send a COMPLETED message on `comm` to
+ * every rank, the local rank last.
+ *
+ * Returns the number of errors encountered (0 on success).
+ */
+int subfiling_close_file(int subfile_rank, MPI_Comm comm)
+{
+    int ret, source = 0;
+    int errors = 0, flag = COMPLETED;
+
+    if (subfile_fid >= 0) {
+        /* A failing close() can mean buffered data was not written. */
+        if (close(subfile_fid) < 0) {
+            perror("subfile close");
+            errors++;
+        }
+        subfile_fid = -1;
+    }
+    /* Notify all ranks */
+    for (source = 0; source < sf_world_size; source++) {
+        /* Don't release our local MPI process until all
+         * other ranks are released.
+         */
+        if (source == sf_world_rank) {
+            continue;
+        }
+        ret = MPI_Send(&flag, 1, MPI_INT, source, COMPLETED, comm);
+        if (ret != MPI_SUCCESS) errors++;
+    }
+
+    /* Release the local MPI process */
+    ret = MPI_Send(&flag, 1, MPI_INT, sf_world_rank, COMPLETED, comm);
+    if (ret != MPI_SUCCESS) errors++;
+
+    if (errors) {
+        printf("[ioc(%d) %s] Errors sending file close replies\n", subfile_rank, __func__);
+        fflush(stdout);
+    }
+
+    return errors;
+}
+
+/*
+ * Open (and possibly create) the subfile owned by IOC `subfile_rank`.
+ *
+ * The subfile is "<prefix>/node_local_temp_<rank>_of_<n_iocs>" (or the same
+ * name in the current directory when `prefix` is NULL); `flags` are passed
+ * to open(2). When rank 0 opens with O_CREAT, a ".subfile_config" file
+ * listing the stripe size, the IOC count and one line per IOC is rewritten.
+ * Ranks with subfile_rank < 0 do nothing.
+ *
+ * Returns the number of errors encountered (0 on success).
+ */
+int subfiling_open_file(const char *prefix, int subfile_rank, int flags)
+{
+    int errors = 0;
+    /* Only the real IOCs open the subfiles */
+    if (subfile_rank >= 0) {
+        const char *dotconfig = ".subfile_config";
+        mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+        char filepath[PATH_MAX];
+        char config[PATH_MAX];
+        int n_path, n_config;
+
+        if (prefix) {
+            /* Best effort: the directory may already exist. */
+            mkdir(prefix, S_IRWXU);
+            n_path = snprintf(filepath, sizeof(filepath), "%s/node_local_temp_%d_of_%d",
+                              prefix, subfile_rank, n_io_concentrators);
+            n_config = snprintf(config, sizeof(config), "%s/%s", prefix, dotconfig);
+        }
+        else {
+            n_path = snprintf(filepath, sizeof(filepath), "node_local_temp_%d_of_%d",
+                              subfile_rank, n_io_concentrators);
+            n_config = snprintf(config, sizeof(config), "%s", dotconfig);
+        }
+        /* Refuse to continue with a truncated path instead of overflowing
+         * the buffers or opening the wrong file. */
+        if ((n_path < 0) || ((size_t)n_path >= sizeof(filepath)) ||
+            (n_config < 0) || ((size_t)n_config >= sizeof(config))) {
+            printf("[ioc(%d) %s] subfile path is too long\n", subfile_rank, __func__);
+            fflush(stdout);
+            errors++;
+            goto done;
+        }
+
+        if ((subfile_fid = open(filepath, flags, mode)) < 0) {
+            perror("subfile open");
+            errors++;
+            goto done;
+        }
+        else {
+            /*
+             * File open and close operations are collective
+             * in intent.  Apart from actual IO operations, we
+             * initialize the number of references to everyone
+             * so that we detect when pending IO operations
+             * have all completed before we close the actual
+             * subfiles.
+             *
+             * atomic_init is not itself atomic, so a plain store is used
+             * on this already-live variable.
+             * NOTE(review): ioc_main also uses sf_workinprogress as its
+             * request ring index - confirm this dual use is intended.
+             */
+            atomic_store(&sf_workinprogress, sf_world_size);
+        }
+
+        if ((subfile_rank == 0) && (flags & O_CREAT)) {
+            FILE *f = NULL;
+            char subfile_base[PATH_MAX];
+            char *underscore = NULL;
+            int k;
+
+            /* "w+" creates the file or truncates an existing one, so no
+             * separate existence check or truncate call is needed. */
+            f = fopen(config, "w+");
+            if (f == NULL) {
+                perror("fopen(config)");
+                errors++;
+                goto done;
+            }
+
+            /* Base name = subfile path with everything from the last '_'
+             * removed; each IOC line appends "_<k>" to it.
+             * NOTE(review): this yields "..._<rank>_of_<k>" rather than
+             * "..._<k>_of_<n>" - confirm that is the intended format. */
+            snprintf(subfile_base, sizeof(subfile_base), "%s", filepath);
+            underscore = strrchr(subfile_base, '_');
+            if (underscore != NULL)
+                *underscore = 0;
+
+            fprintf(f, "stripe_size=%ld\n", (long)sf_stripe_size);
+            fprintf(f, "aggregator_count=%d\n", n_io_concentrators);
+            for (k = 0; k < n_io_concentrators; k++) {
+                fprintf(f, "%s_%d:%d\n", subfile_base, k, io_concentrator[k]);
+            }
+
+            fclose(f);
+        }
+        if (sf_verbose_flag) {
+            printf("[ioc:%d] Opened subfile %s\n", subfile_rank, filepath);
+        }
+    }
+done:
+    return errors;
+}
+
+/*
+ * Release the communicators held by the subfiling context identified by
+ * `context_id`, reset the global message/data communicator handles, and
+ * free the context structure. Does nothing when the id does not resolve
+ * to a context.
+ */
+void
+delete_subfiling_context(hid_t context_id)
+{
+    subfiling_context_t *sf_context = get_subfiling_object(context_id);
+
+    /* Nothing to release for an unknown id. */
+    if (sf_context == NULL)
+        return;
+
+    MPI_Comm_free(&sf_context->sf_msg_comm);
+    MPI_Comm_free(&sf_context->sf_data_comm);
+    sf_msg_comm = MPI_COMM_NULL;
+    sf_data_comm = MPI_COMM_NULL;
+    /* The group communicators are only freed when there is more than one
+     * IO concentrator. */
+    if (n_io_concentrators > 1) {
+        MPI_Comm_free(&sf_context->sf_group_comm);
+        MPI_Comm_free(&sf_context->sf_intercomm);
+    }
+
+    free(sf_context);
+    usleep(100);
+
+    return;
+}
diff --git a/src/H5FDsubfile_private.h b/src/H5FDsubfile_private.h
new file mode 100644
index 0000000..cacfdb9
--- /dev/null
+++ b/src/H5FDsubfile_private.h
@@ -0,0 +1,183 @@
+/********************/
+/* Standard Headers */
+/********************/
+
+#include <assert.h>
+#include <stdatomic.h>
+#include <stdio.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+/**************/
+/* H5 Headers */
+/**************/
+#include "H5private.h" /* Generic Functions */
+#include "H5CXprivate.h" /* API Contexts */
+#include "H5Dprivate.h" /* Datasets */
+#include "H5Eprivate.h" /* Error handling */
+#include "H5Ipublic.h"
+#include "H5Iprivate.h" /* IDs */
+#include "H5MMprivate.h" /* Memory management */
+#include "H5Pprivate.h" /* Property lists */
+
+
+#include "mpi.h"
+
+#ifndef _H5FDsubfile_private_H
+#define _H5FDsubfile_private_H
+
+typedef int (*file_close_cb)(int,MPI_Comm); /* close callback: called as cb(subfile_rank, comm); a result < 0 is treated as an error by decrement_file_ref_counts */
+
+typedef struct { /* per-context subfiling state, looked up via get_subfiling_object() */
+ int64_t sf_stripe_size; /* length in bytes of one contiguous block in the indexed MPI datatypes */
+ int64_t sf_blocksize_per_stripe; /* byte displacement between consecutive blocks of the same IOC */
+ MPI_Comm sf_msg_comm; /* request headers travel on this communicator ("Messages IN") */
+ MPI_Comm sf_data_comm; /* data, ACKs and COMPLETED replies travel here ("Messages OUT") */
+ MPI_Comm sf_group_comm; /* freed by delete_subfiling_context only when n_io_concentrators > 1 */
+ MPI_Comm sf_intercomm; /* freed by delete_subfiling_context only when n_io_concentrators > 1 */
+ int sf_group_size; /* presumably the size of sf_group_comm - TODO confirm */
+ int sf_group_rank; /* used by ioc_main as this IOC's subfile_rank */
+ int sf_intercomm_root; /* presumably the root rank for sf_intercomm - TODO confirm */
+ char *subfile_prefix; /* prefix string compared against / replaced in sf_open_subfiles */
+} subfiling_context_t;
+
+typedef struct { /* one request handed from ioc_main to the thread pool */
+ /* {Datasize, Offset} */
+ int64_t header[2]; /* the two int64 values sent by the client; OPEN_OP puts its open flags in header[0] */
+ int tag; /* MPI tag of the request; selects the handler in handle_work_request */
+ int source; /* MPI rank that sent the request */
+ int subfile_rank; /* rank of the IOC servicing the request (filled in by ioc_main) */
+} sf_work_request_t;
+
+
+
+typedef struct { /* element type of sf_topology_t.topology */
+ long rank; /* presumably an MPI rank - TODO confirm against the code that fills it */
+ long hostid; /* presumably the id of the host that rank runs on - TODO confirm */
+} layout_t;
+
+typedef struct { /* application topology record; produced through H5FD__determine_ioc_count(..., sf_topology_t **) */
+ long hostid; /* presumably the local host id - TODO confirm */
+ layout_t *topology; /* array of {rank, hostid} entries; length not visible here */
+ int *node_ranks; /* NOTE(review): meaning not visible in this chunk - confirm */
+ int node_count; /* NOTE(review): presumably the number of distinct nodes - confirm */
+ int node_index; /* NOTE(review): presumably this process's node index - confirm */
+ int local_peers; /* NOTE(review): presumably the number of ranks on this node - confirm */
+ int subfile_rank; /* NOTE(review): presumably this process's IOC rank - confirm */
+ int world_rank; /* presumably the rank in the world communicator - TODO confirm */
+ int world_size; /* presumably the size of the world communicator - TODO confirm */
+ bool rank_is_ioc; /* presumably true when this rank acts as an IO concentrator - TODO confirm */
+} sf_topology_t;
+
+#define K(n) ((n)*1024) /* n kibibytes expressed in bytes */
+#define DEFAULT_STRIPE_SIZE K(4) /* = 4096 bytes (not 1024*1024) */
+#define MAX_DEPTH 256 /* not referenced in this part of the file */
+
+typedef enum io_ops { /* operation codes carried in the low bits of the MPI tag */
+ READ_OP = 1, /* combined into READ_INDEP / READ_COLL / READ_INDEP_DATA */
+ WRITE_OP = 2, /* combined into WRITE_INDEP / WRITE_COLL / WRITE_INDEP_ACK / WRITE_INDEP_DATA */
+ OPEN_OP = 3, /* tag sent by sf_open_subfiles */
+ CLOSE_OP = 4, /* tag sent by sf_close_subfiles */
+ INCR_OP = 8, /* NOTE(review): same value as COLL_FUNC (0x1 << 3) - confirm no clash */
+ DECR_OP = 16, /* not referenced in this part of the file */
+} io_op_t;
+
+typedef enum { /* kinds of subfiling objects */
+ SF_BADID = (-1), /* invalid / unknown object type */
+ SF_TOPOLOGY = 1, /* presumably identifies sf_topology_t objects - TODO confirm */
+ SF_CONTEXT, /* = 2; presumably identifies subfiling_context_t objects - TODO confirm */
+ SF_NTYPES /* number of subfiling object types, MUST BE LAST */
+} SF_OBJ_TYPE;
+
+
+
+/* MPI Tags are 32 bits, we treat them as unsigned
+ * to allow the use of the available bits for RPC
+ * selections:
+ * 0000
+ * 0001 READ_OP (Independent)
+ * 0010 WRITE_OP (Independent)
+ * 0011 OPEN_OP (Independent)
+ * 0100 CLOSE_OP (Independent)
+ * -----
+ * 1000
+ * 1001 COLLECTIVE_READ
+ * 1010 COLLECTIVE_WRITE
+ * 1011 /////////
+ * 1100 COLLECTIVE_CLOSE
+ *
+ * 31 28 24 20 16 12 8 4 0|
+ * +-------+-------+-------+-------+-------+-------+-------+-------+
+ * | | | ACKS | OP |
+ * +-------+-------+-------+-------+-------+-------+-------+-------+
+ *
+ */
+
+/* Bit 3 SET indicates collectives */
+#define COLL_FUNC (0x1 << 3)
+
+/* Markers placed above the low 8 operation bits of a tag.
+ * NOTE(review): these values exceed 32767, the minimum MPI_TAG_UB an MPI
+ * implementation must provide - confirm the target MPI accepts them. */
+#define ACK_PART (0x0acc << 8)
+#define DATA_PART (0xd8da << 8)
+#define COMPLETED (0xfed1 << 8)
+
+#define READ_INDEP (READ_OP)
+#define READ_COLL (COLL_FUNC | READ_OP)
+#define WRITE_INDEP (WRITE_OP)
+#define WRITE_COLL (COLL_FUNC | WRITE_OP)
+
+#define WRITE_INDEP_ACK (ACK_PART | WRITE_OP)
+#define WRITE_INDEP_DATA (DATA_PART | WRITE_OP)
+
+#define READ_INDEP_DATA (DATA_PART | READ_OP)
+
+/* Keeps the low 31 bits, so the masked value always fits in a non-negative
+ * int.  (The previous value, 0x7FFFFFFFFFFFFFFF, was a 63-bit mask.) */
+#define INT32_MASK 0x7FFFFFFF
+
+extern int sf_verbose_flag; /* non-zero enables the diagnostic printf calls */
+extern int sf_shutdown_flag; /* ioc_main keeps polling until this is non-zero and no work is pending */
+
+extern atomic_int sf_workinprogress; /* index of the next request slot in ioc_main; also stored to by subfiling_open_file */
+extern atomic_int sf_work_pending; /* number of requests currently inside handle_work_request */
+extern atomic_int sf_file_close_count; /* CLOSE_OP requests counted by decrement_file_ref_counts */
+extern atomic_int sf_file_refcount; /* OPEN_OP requests counted by queue_file_open */
+
+/*
+-------------
+Messages IN
+-------------
+*/
+extern MPI_Comm sf_msg_comm; /* set from context->sf_msg_comm in ioc_main */
+
+/*
+-------------
+Messages OUT
+-------------
+*/
+extern MPI_Comm sf_data_comm; /* set from context->sf_data_comm in ioc_main */
+
+
+
+/* Subfiling entry points shared between the MPI, threading and VFD sources. */
+H5_DLL int H5FD__determine_ioc_count(int world_size, int world_rank, sf_topology_t **thisapp);
+H5_DLL int H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int world_size, int world_rank, bool rank_is_ioc);
+H5_DLL void * get_subfiling_object(int64_t object_id);
+H5_DLL hid_t get_subfiling_context(void);
+H5_DLL int initialize_ioc_threads(subfiling_context_t *sf_context);
+H5_DLL int tpool_add_work(sf_work_request_t *);
+H5_DLL int ioc_main(subfiling_context_t *context);
+H5_DLL int queue_write_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
+H5_DLL int queue_read_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
+H5_DLL int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
+H5_DLL int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
+H5_DLL int subfiling_close_file(int subfile_rank, MPI_Comm comm);
+/* Third argument is the open(2) flags, matching the definition and its
+ * caller queue_file_open (it was declared as MPI_Comm here). */
+H5_DLL int subfiling_open_file(const char *prefix, int subfile_rank, int flags);
+H5_DLL int queue_file_open(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
+H5_DLL int decrement_file_ref_counts( int subfile_rank, int source, MPI_Comm comm, file_close_cb callback_ftn);
+H5_DLL int sf_open_subfiles(hid_t context_id, char *prefix, int flags);
+H5_DLL int sf_close_subfiles(hid_t context_id);
+H5_DLL int sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank);
+H5_DLL int sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank);
+H5_DLL void delete_subfiling_context(hid_t context_id);
+
+#endif
diff --git a/src/H5FDsubfile_public.h b/src/H5FDsubfile_public.h
new file mode 100644
index 0000000..6e4e23c
--- /dev/null
+++ b/src/H5FDsubfile_public.h
@@ -0,0 +1,11 @@
+#ifndef _H5FDsubfile_public_H
+#define _H5FDsubfile_public_H
+
+#include "H5FDsubfile_private.h"
+
+herr_t H5FDsubfiling_init(void); /* NOTE(review): presumably sets up subfiling; defined outside this chunk - confirm contract */
+herr_t H5FDsubfiling_finalize(void); /* NOTE(review): presumably the matching teardown; defined outside this chunk - confirm contract */
+
+
+#endif /* _H5FDsubfile_public_H */
+
diff --git a/src/H5FDsubfile_threads.c b/src/H5FDsubfile_threads.c
new file mode 100644
index 0000000..bc9e352
--- /dev/null
+++ b/src/H5FDsubfile_threads.c
@@ -0,0 +1,132 @@
+
+#include "H5FDsubfile_private.h"
+
+#include "mercury/mercury_util_config.h"
+#include "mercury/mercury_log.h"
+#include "mercury/mercury_log.c"
+#include "mercury/mercury_util_error.c"
+#include "mercury/mercury_thread.c"
+#include "mercury/mercury_thread_mutex.c"
+#include "mercury/mercury_thread_condition.h"
+#include "mercury/mercury_thread_condition.c"
+#include "mercury/mercury_thread_pool.c"
+#include "mercury/mercury_thread_spin.c"
+
+static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER; /* guards tpool_add_work and the CLOSE_OP handler; also initialized again in initialize_ioc_threads */
+static hg_thread_pool_t *ioc_thread_pool = NULL; /* worker pool; NULL until initialize_ioc_threads succeeds */
+static hg_thread_t ioc_thread; /* thread running ioc_main */
+
+#define HG_TEST_NUM_THREADS_DEFAULT 4 /* number of worker threads requested from hg_thread_pool_init */
+#define POOL_CONCURRENT_MAX 64 /* number of reusable work descriptors in pool_request[] */
+
+static struct hg_thread_work pool_request[POOL_CONCURRENT_MAX]; /* work descriptors reused round-robin by tpool_add_work */
+
+/* Thread entry point for the IO concentrator: runs ioc_main() on the
+ * context passed as `arg` and returns 0 when it finishes. */
+static HG_THREAD_RETURN_TYPE
+ioc_thread_main(void *arg)
+{
+    /* arg is the subfiling_context_t given to hg_thread_create(). */
+    ioc_main(arg);
+
+    return (hg_thread_ret_t) 0;
+}
+
+/*
+ * Start the IO concentrator machinery: initialize ioc_mutex, create the
+ * worker thread pool, and spawn the thread that runs ioc_main() on
+ * `sf_context`.
+ *
+ * Returns 0 on success, -1 on failure. On failure the pool is torn down
+ * and ioc_thread_pool is left NULL, so finalize_ioc_threads() will not try
+ * to join a thread that was never created.
+ */
+int
+initialize_ioc_threads(subfiling_context_t *sf_context)
+{
+    int status;
+
+    status = hg_thread_mutex_init(&ioc_mutex);
+    if (status) {
+        puts("hg_thread_mutex_init failed");
+        goto err_exit;
+    }
+    status = hg_thread_pool_init(HG_TEST_NUM_THREADS_DEFAULT, &ioc_thread_pool);
+    if (status) {
+        puts("hg_thread_pool_init failed");
+        ioc_thread_pool = NULL;
+        goto err_exit;
+    }
+    status = hg_thread_create(&ioc_thread, ioc_thread_main, sf_context);
+    if (status) {
+        puts("hg_thread_create failed");
+        goto err_pool;
+    }
+    return 0;
+
+err_pool:
+    /* Undo the pool creation so no half-initialized state is left behind. */
+    hg_thread_pool_destroy(ioc_thread_pool);
+    ioc_thread_pool = NULL;
+err_exit:
+    return -1;
+}
+
+
+/* Destructor run at program/library teardown: when a thread pool exists,
+ * destroy it and then join the ioc_main thread, printing the join result. */
+void __attribute__((destructor)) finalize_ioc_threads()
+{
+    if (ioc_thread_pool == NULL)
+        return;
+
+    hg_thread_pool_destroy(ioc_thread_pool);
+    ioc_thread_pool = NULL;
+
+    puts((hg_thread_join(ioc_thread) == 0) ? "thread_join succeeded"
+                                           : "thread_join failed");
+}
+
+
+/*
+ * Thread-pool callback: dispatch one sf_work_request_t (passed as `arg`)
+ * to the handler selected by its MPI tag. sf_work_pending is incremented
+ * for the duration of the handler. Always returns 0 to the pool; a failing
+ * handler is only reported on stdout.
+ */
+static HG_THREAD_RETURN_TYPE
+handle_work_request(void *arg)
+{
+    hg_thread_ret_t ret = 0;
+    sf_work_request_t *msg = (sf_work_request_t *)arg;
+    int status = 0;
+
+    atomic_fetch_add(&sf_work_pending, 1); // atomic
+    switch(msg->tag) {
+        case WRITE_COLL:
+            status = queue_write_coll( msg, msg->subfile_rank, msg->source, sf_data_comm);
+            break;
+        case READ_COLL:
+            status = queue_read_coll( msg, msg->subfile_rank, msg->source, sf_data_comm);
+            break;
+        case WRITE_INDEP:
+            status = queue_write_indep( msg, msg->subfile_rank, msg->source, sf_data_comm);
+            break;
+        case READ_INDEP:
+            status = queue_read_indep( msg, msg->subfile_rank, msg->source, sf_data_comm);
+            break;
+        case CLOSE_OP:
+            /* The open/close counters are compared under the mutex. */
+            hg_thread_mutex_lock(&ioc_mutex);
+            status = decrement_file_ref_counts( msg->subfile_rank, msg->source, sf_data_comm,
+                                                subfiling_close_file);
+            hg_thread_mutex_unlock(&ioc_mutex);
+            break;
+        case OPEN_OP:
+            status = queue_file_open( msg, msg->subfile_rank, msg->source, sf_data_comm);
+            break;
+
+        default:
+            printf("[ioc(%d)] received message tag(%x)from rank %d\n", msg->subfile_rank, msg->tag, msg->source);
+            status = -1;
+            break;
+    }
+
+    atomic_fetch_sub(&sf_work_pending, 1); // atomic
+
+    /* Handlers return 0 on success and otherwise either -1, a positive MPI
+     * error code, or a positive error count - so any non-zero value is a
+     * failure, not only negative ones. */
+    if (status != 0) {
+        printf("[ioc(%d) %s]: Error encountered processing request(%x) from rank(%d)\n",
+               msg->subfile_rank, __func__, msg->tag, msg->source);
+        fflush(stdout);
+    }
+    return ret;
+}
+
+/*
+ * Queue `work` for execution by handle_work_request() on the IOC thread
+ * pool. Work descriptors come from the fixed pool_request[] array, reused
+ * round-robin under ioc_mutex.
+ *
+ * Returns 0 when the work was posted, -1 when `work` is NULL, the pool
+ * does not exist, or posting failed.
+ *
+ * NOTE(review): a descriptor is reused after POOL_CONCURRENT_MAX posts
+ * without checking that its previous work item has run - confirm the
+ * number of outstanding requests stays below that limit.
+ */
+int tpool_add_work(sf_work_request_t *work)
+{
+    static int work_index = 0;
+    int status = -1;
+
+    if (work == NULL)
+        return -1;
+
+    hg_thread_mutex_lock(&ioc_mutex);
+    /* The pool is NULL before initialization and after finalization. */
+    if (ioc_thread_pool != NULL) {
+        if (work_index == POOL_CONCURRENT_MAX)
+            work_index = 0;
+        pool_request[work_index].func = handle_work_request;
+        pool_request[work_index].args = work;
+        status = hg_thread_pool_post(ioc_thread_pool, &pool_request[work_index++]);
+    }
+    hg_thread_mutex_unlock(&ioc_mutex);
+
+    return (status == 0) ? 0 : -1;
+}
diff --git a/src/mercury/mercury_atomic.h b/src/mercury/mercury_atomic.h
new file mode 100644
index 0000000..7a684b9
--- /dev/null
+++ b/src/mercury/mercury_atomic.h
@@ -0,0 +1,637 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_ATOMIC_H
+#define MERCURY_ATOMIC_H
+
+#include "mercury_util_config.h"
+
+#if defined(_WIN32) /* Windows backend: structs wrapping a volatile LONG / LONGLONG */
+# include <windows.h>
+typedef struct {
+ volatile LONG value;
+} hg_atomic_int32_t;
+typedef struct {
+ volatile LONGLONG value;
+} hg_atomic_int64_t;
+# define HG_ATOMIC_VAR_INIT(x) \
+ { \
+ (x) \
+ }
+#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) /* OPA backend; NOTE(review): HG_ATOMIC_VAR_INIT below uses the pointer initializer for both typedefs - confirm it also suits OPA_int_t */
+# include <opa_primitives.h>
+typedef OPA_int_t hg_atomic_int32_t;
+typedef OPA_ptr_t hg_atomic_int64_t; /* OPA has only limited 64-bit support */
+# define HG_ATOMIC_VAR_INIT(x) OPA_PTR_T_INITIALIZER(x)
+#elif defined(HG_UTIL_HAS_STDATOMIC_H) /* C11 <stdatomic.h> backend */
+# include <stdatomic.h>
+typedef atomic_int hg_atomic_int32_t;
+# if HG_UTIL_ATOMIC_LONG_WIDTH == 8 /* pick the 64-bit atomic type based on the configured width of long */
+typedef atomic_long hg_atomic_int64_t;
+# else
+typedef atomic_llong hg_atomic_int64_t;
+# endif
+# define HG_ATOMIC_VAR_INIT(x) ATOMIC_VAR_INIT(x)
+#elif defined(__APPLE__) /* OSAtomic backend: structs wrapping volatile fixed-width integers */
+# include <libkern/OSAtomic.h>
+typedef struct {
+ volatile hg_util_int32_t value;
+} hg_atomic_int32_t;
+typedef struct {
+ volatile hg_util_int64_t value;
+} hg_atomic_int64_t;
+# define HG_ATOMIC_VAR_INIT(x) \
+ { \
+ (x) \
+ }
+#else /* none of the four backends is available */
+# error "Not supported on this platform."
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Init atomic value (32-bit integer).
+ *
+ * \param ptr [OUT] pointer to an atomic32 integer
+ * \param value [IN] value
+ */
+static HG_UTIL_INLINE void
+hg_atomic_init32(hg_atomic_int32_t *ptr, hg_util_int32_t value);
+
+/**
+ * Set atomic value (32-bit integer).
+ *
+ * \param ptr [OUT] pointer to an atomic32 integer
+ * \param value [IN] value
+ */
+static HG_UTIL_INLINE void
+hg_atomic_set32(hg_atomic_int32_t *ptr, hg_util_int32_t value);
+
+/**
+ * Get atomic value (32-bit integer).
+ *
+ * \param ptr [IN] pointer to an atomic32 integer
+ *
+ * \return Value of the atomic integer
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_get32(hg_atomic_int32_t *ptr);
+
+/**
+ * Increment atomic value (32-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic32 integer
+ *
+ * \return Incremented value
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_incr32(hg_atomic_int32_t *ptr);
+
+/**
+ * Decrement atomic value (32-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic32 integer
+ *
+ * \return Decremented value
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_decr32(hg_atomic_int32_t *ptr);
+
+/**
+ * OR atomic value (32-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic32 integer
+ * \param value [IN] value to OR with
+ *
+ * \return Original value
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_or32(hg_atomic_int32_t *ptr, hg_util_int32_t value);
+
+/**
+ * XOR atomic value (32-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic32 integer
+ * \param value [IN] value to XOR with
+ *
+ * \return Original value
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_xor32(hg_atomic_int32_t *ptr, hg_util_int32_t value);
+
+/**
+ * AND atomic value (32-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic32 integer
+ * \param value [IN] value to AND with
+ *
+ * \return Original value
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_and32(hg_atomic_int32_t *ptr, hg_util_int32_t value);
+
+/**
+ * Compare and swap values (32-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic32 integer
+ * \param compare_value [IN] value to compare to
+ * \param swap_value [IN] value to swap with if ptr value is equal to
+ * compare value
+ *
+ * \return HG_UTIL_TRUE if swapped or HG_UTIL_FALSE
+ */
+static HG_UTIL_INLINE hg_util_bool_t
+hg_atomic_cas32(hg_atomic_int32_t *ptr, hg_util_int32_t compare_value,
+ hg_util_int32_t swap_value);
+
+/**
+ * Init atomic value (64-bit integer).
+ *
+ * \param ptr [OUT] pointer to an atomic64 integer
+ * \param value [IN] value
+ */
+static HG_UTIL_INLINE void
+hg_atomic_init64(hg_atomic_int64_t *ptr, hg_util_int64_t value);
+
+/**
+ * Set atomic value (64-bit integer).
+ *
+ * \param ptr [OUT] pointer to an atomic64 integer
+ * \param value [IN] value
+ */
+static HG_UTIL_INLINE void
+hg_atomic_set64(hg_atomic_int64_t *ptr, hg_util_int64_t value);
+
+/**
+ * Get atomic value (64-bit integer).
+ *
+ * \param ptr [IN] pointer to an atomic64 integer
+ *
+ * \return Value of the atomic integer
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_get64(hg_atomic_int64_t *ptr);
+
+/**
+ * Increment atomic value (64-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic64 integer
+ *
+ * \return Incremented value
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_incr64(hg_atomic_int64_t *ptr);
+
+/**
+ * Decrement atomic value (64-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic64 integer
+ *
+ * \return Decremented value
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_decr64(hg_atomic_int64_t *ptr);
+
+/**
+ * OR atomic value (64-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic64 integer
+ * \param value [IN] value to OR with
+ *
+ * \return Original value
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_or64(hg_atomic_int64_t *ptr, hg_util_int64_t value);
+
+/**
+ * XOR atomic value (64-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic64 integer
+ * \param value [IN] value to XOR with
+ *
+ * \return Original value
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_xor64(hg_atomic_int64_t *ptr, hg_util_int64_t value);
+
+/**
+ * AND atomic value (64-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic64 integer
+ * \param value [IN] value to AND with
+ *
+ * \return Original value
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_and64(hg_atomic_int64_t *ptr, hg_util_int64_t value);
+
+/**
+ * Compare and swap values (64-bit integer).
+ *
+ * \param ptr [IN/OUT] pointer to an atomic64 integer
+ * \param compare_value [IN] value to compare to
+ * \param swap_value [IN] value to swap with if ptr value is equal to
+ * compare value
+ *
+ * \return HG_UTIL_TRUE if swapped or HG_UTIL_FALSE
+ */
+static HG_UTIL_INLINE hg_util_bool_t
+hg_atomic_cas64(hg_atomic_int64_t *ptr, hg_util_int64_t compare_value,
+ hg_util_int64_t swap_value);
+
+/**
+ * Memory barrier.
+ *
+ */
+static HG_UTIL_INLINE void
+hg_atomic_fence(void);
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Initialize a 32-bit atomic. atomic_init() is used when <stdatomic.h> is
+ * available and OPA is not; every other configuration goes through
+ * hg_atomic_set32().
+ */
+static HG_UTIL_INLINE void
+hg_atomic_init32(hg_atomic_int32_t *ptr, hg_util_int32_t value)
+{
+#if !defined(HG_UTIL_HAS_STDATOMIC_H) || defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    hg_atomic_set32(ptr, value);
+#else
+    atomic_init(ptr, value);
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Store value into a 32-bit atomic using the backend selected at compile
+ * time.
+ */
+static HG_UTIL_INLINE void
+hg_atomic_set32(hg_atomic_int32_t *ptr, hg_util_int32_t value)
+{
+#ifdef _WIN32
+    /* Plain store to the volatile member */
+    ptr->value = value;
+#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    OPA_store_int(ptr, value);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H)
+    /* Release store */
+    atomic_store_explicit(ptr, value, memory_order_release);
+#elif defined(__APPLE__)
+    /* Plain store to the volatile member */
+    ptr->value = value;
+#else
+# error "Not supported on this platform."
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Load and return the current value of a 32-bit atomic.
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_get32(hg_atomic_int32_t *ptr)
+{
+#ifdef _WIN32
+    return ptr->value;
+#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    return OPA_load_int(ptr);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H)
+    /* Acquire load */
+    return atomic_load_explicit(ptr, memory_order_acquire);
+#elif defined(__APPLE__)
+    return ptr->value;
+#else
+# error "Not supported on this platform."
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically add 1 to a 32-bit atomic and return the incremented value.
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_incr32(hg_atomic_int32_t *ptr)
+{
+#ifdef _WIN32
+    return InterlockedIncrementNoFence(&ptr->value);
+#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    /* The fetch variant yields the old value, hence the + 1 */
+    return OPA_fetch_and_incr_int(ptr) + 1;
+#elif defined(HG_UTIL_HAS_STDATOMIC_H)
+    /* The fetch variant yields the old value, hence the + 1 */
+    return atomic_fetch_add_explicit(ptr, 1, memory_order_acq_rel) + 1;
+#elif defined(__APPLE__)
+    return OSAtomicIncrement32(&ptr->value);
+#else
+# error "Not supported on this platform."
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically subtract 1 from a 32-bit atomic and return the decremented
+ * value.
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_decr32(hg_atomic_int32_t *ptr)
+{
+#ifdef _WIN32
+    return InterlockedDecrementNoFence(&ptr->value);
+#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    /* The fetch variant yields the old value, hence the - 1 */
+    return OPA_fetch_and_decr_int(ptr) - 1;
+#elif defined(HG_UTIL_HAS_STDATOMIC_H)
+    /* The fetch variant yields the old value, hence the - 1 */
+    return atomic_fetch_sub_explicit(ptr, 1, memory_order_acq_rel) - 1;
+#elif defined(__APPLE__)
+    return OSAtomicDecrement32(&ptr->value);
+#else
+# error "Not supported on this platform."
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically OR value into a 32-bit atomic and return the value it held
+ * before the operation.
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_or32(hg_atomic_int32_t *ptr, hg_util_int32_t value)
+{
+#ifdef _WIN32
+    return InterlockedOrNoFence(&ptr->value, value);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    return atomic_fetch_or_explicit(ptr, value, memory_order_acq_rel);
+#elif defined(__APPLE__)
+    return OSAtomicOr32Orig((uint32_t) value, (volatile uint32_t *) &ptr->value);
+#else
+    /* Remaining backends: emulate fetch-or with a compare-and-swap retry loop */
+    hg_util_int32_t prev;
+
+    do {
+        prev = hg_atomic_get32(ptr);
+    } while (!hg_atomic_cas32(ptr, prev, (prev | value)));
+
+    return prev;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically XOR value into a 32-bit atomic and return the value it held
+ * before the operation.
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_xor32(hg_atomic_int32_t *ptr, hg_util_int32_t value)
+{
+#ifdef _WIN32
+    return InterlockedXorNoFence(&ptr->value, value);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    return atomic_fetch_xor_explicit(ptr, value, memory_order_acq_rel);
+#elif defined(__APPLE__)
+    return OSAtomicXor32Orig(
+        (uint32_t) value, (volatile uint32_t *) &ptr->value);
+#else
+    /* Remaining backends: emulate fetch-xor with a compare-and-swap retry loop */
+    hg_util_int32_t prev;
+
+    do {
+        prev = hg_atomic_get32(ptr);
+    } while (!hg_atomic_cas32(ptr, prev, (prev ^ value)));
+
+    return prev;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically AND value into a 32-bit atomic and return the value it held
+ * before the operation.
+ */
+static HG_UTIL_INLINE hg_util_int32_t
+hg_atomic_and32(hg_atomic_int32_t *ptr, hg_util_int32_t value)
+{
+#ifdef _WIN32
+    return InterlockedAndNoFence(&ptr->value, value);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    return atomic_fetch_and_explicit(ptr, value, memory_order_acq_rel);
+#elif defined(__APPLE__)
+    return OSAtomicAnd32Orig(
+        (uint32_t) value, (volatile uint32_t *) &ptr->value);
+#else
+    /* Remaining backends: emulate fetch-and with a compare-and-swap retry loop */
+    hg_util_int32_t prev;
+
+    do {
+        prev = hg_atomic_get32(ptr);
+    } while (!hg_atomic_cas32(ptr, prev, (prev & value)));
+
+    return prev;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically replace the value of a 32-bit atomic with swap_value if it
+ * currently equals compare_value. The result tells whether the swap took
+ * place.
+ */
+static HG_UTIL_INLINE hg_util_bool_t
+hg_atomic_cas32(hg_atomic_int32_t *ptr, hg_util_int32_t compare_value,
+    hg_util_int32_t swap_value)
+{
+#ifdef _WIN32
+    /* The intrinsic returns the previous value; equality means we swapped */
+    return (compare_value == InterlockedCompareExchangeNoFence(
+                                 &ptr->value, swap_value, compare_value));
+#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    /* OPA_cas_int returns the previous value; equality means we swapped */
+    return (hg_util_bool_t)(
+        compare_value == OPA_cas_int(ptr, compare_value, swap_value));
+#elif defined(HG_UTIL_HAS_STDATOMIC_H)
+    return atomic_compare_exchange_strong_explicit(ptr, &compare_value,
+        swap_value, memory_order_acq_rel, memory_order_acquire);
+#elif defined(__APPLE__)
+    return OSAtomicCompareAndSwap32(compare_value, swap_value, &ptr->value);
+#else
+# error "Not supported on this platform."
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Initialize a 64-bit atomic. atomic_init() is used when <stdatomic.h> is
+ * available and OPA is not; every other configuration goes through
+ * hg_atomic_set64().
+ */
+static HG_UTIL_INLINE void
+hg_atomic_init64(hg_atomic_int64_t *ptr, hg_util_int64_t value)
+{
+#if !defined(HG_UTIL_HAS_STDATOMIC_H) || defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    hg_atomic_set64(ptr, value);
+#else
+    atomic_init(ptr, value);
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Store value into a 64-bit atomic using the backend selected at compile
+ * time.
+ */
+static HG_UTIL_INLINE void
+hg_atomic_set64(hg_atomic_int64_t *ptr, hg_util_int64_t value)
+{
+#ifdef _WIN32
+    /* Plain store to the volatile member */
+    ptr->value = value;
+#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    /* The OPA 64-bit type is pointer based, so the value travels as void * */
+    OPA_store_ptr(ptr, (void *) value);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H)
+    /* Release store */
+    atomic_store_explicit(ptr, value, memory_order_release);
+#elif defined(__APPLE__)
+    /* Plain store to the volatile member */
+    ptr->value = value;
+#else
+# error "Not supported on this platform."
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Load and return the current value of a 64-bit atomic.
+ *
+ * \param ptr [IN]  pointer to an atomic64 integer
+ *
+ * \return Value of the atomic integer
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_get64(hg_atomic_int64_t *ptr)
+{
+    hg_util_int64_t ret;
+
+#if defined(_WIN32)
+    ret = ptr->value;
+#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    /* The OPA 64-bit type is pointer based, so the value comes back as void * */
+    ret = (hg_util_int64_t) OPA_load_ptr(ptr);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H)
+    ret = atomic_load_explicit(ptr, memory_order_acquire);
+#elif defined(__APPLE__)
+    /* Read the member; this branch previously assigned an undeclared
+     * `value` to it and left ret unset */
+    ret = ptr->value;
+#else
+# error "Not supported on this platform."
+#endif
+
+    return ret;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically add 1 to a 64-bit atomic and return the incremented value.
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_incr64(hg_atomic_int64_t *ptr)
+{
+#ifdef _WIN32
+    return InterlockedIncrementNoFence64(&ptr->value);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    /* The fetch variant yields the old value, hence the + 1 */
+    return atomic_fetch_add_explicit(ptr, 1, memory_order_acq_rel) + 1;
+#elif defined(__APPLE__)
+    return OSAtomicIncrement64(&ptr->value);
+#else
+    /* Remaining backends: emulate the increment with a compare-and-swap retry loop */
+    hg_util_int64_t prev;
+
+    do {
+        prev = hg_atomic_get64(ptr);
+    } while (!hg_atomic_cas64(ptr, prev, prev + 1));
+
+    return prev + 1;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically subtract 1 from a 64-bit atomic and return the decremented
+ * value.
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_decr64(hg_atomic_int64_t *ptr)
+{
+#ifdef _WIN32
+    return InterlockedDecrementNoFence64(&ptr->value);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    /* The fetch variant yields the old value, hence the - 1 */
+    return atomic_fetch_sub_explicit(ptr, 1, memory_order_acq_rel) - 1;
+#elif defined(__APPLE__)
+    return OSAtomicDecrement64(&ptr->value);
+#else
+    /* Remaining backends: emulate the decrement with a compare-and-swap retry loop */
+    hg_util_int64_t prev;
+
+    do {
+        prev = hg_atomic_get64(ptr);
+    } while (!hg_atomic_cas64(ptr, prev, prev - 1));
+
+    return prev - 1;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically OR value into a 64-bit atomic and return the value it held
+ * before the operation.
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_or64(hg_atomic_int64_t *ptr, hg_util_int64_t value)
+{
+#ifdef _WIN32
+    return InterlockedOr64NoFence(&ptr->value, value);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    return atomic_fetch_or_explicit(ptr, value, memory_order_acq_rel);
+#else
+    /* Remaining backends: emulate fetch-or with a compare-and-swap retry loop */
+    hg_util_int64_t prev;
+
+    do {
+        prev = hg_atomic_get64(ptr);
+    } while (!hg_atomic_cas64(ptr, prev, (prev | value)));
+
+    return prev;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically XOR value into a 64-bit atomic and return the value it held
+ * before the operation.
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_xor64(hg_atomic_int64_t *ptr, hg_util_int64_t value)
+{
+#ifdef _WIN32
+    return InterlockedXor64NoFence(&ptr->value, value);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    return atomic_fetch_xor_explicit(ptr, value, memory_order_acq_rel);
+#else
+    /* Remaining backends: emulate fetch-xor with a compare-and-swap retry loop */
+    hg_util_int64_t prev;
+
+    do {
+        prev = hg_atomic_get64(ptr);
+    } while (!hg_atomic_cas64(ptr, prev, (prev ^ value)));
+
+    return prev;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically AND value into a 64-bit atomic and return the value it held
+ * before the operation.
+ */
+static HG_UTIL_INLINE hg_util_int64_t
+hg_atomic_and64(hg_atomic_int64_t *ptr, hg_util_int64_t value)
+{
+#ifdef _WIN32
+    return InterlockedAnd64NoFence(&ptr->value, value);
+#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    return atomic_fetch_and_explicit(ptr, value, memory_order_acq_rel);
+#else
+    /* Remaining backends: emulate fetch-and with a compare-and-swap retry loop */
+    hg_util_int64_t prev;
+
+    do {
+        prev = hg_atomic_get64(ptr);
+    } while (!hg_atomic_cas64(ptr, prev, (prev & value)));
+
+    return prev;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Atomically replace the value of a 64-bit atomic with swap_value if it
+ * currently equals compare_value. The result tells whether the swap took
+ * place.
+ */
+static HG_UTIL_INLINE hg_util_bool_t
+hg_atomic_cas64(hg_atomic_int64_t *ptr, hg_util_int64_t compare_value,
+    hg_util_int64_t swap_value)
+{
+#ifdef _WIN32
+    /* The intrinsic returns the previous value; equality means we swapped */
+    return (compare_value == InterlockedCompareExchangeNoFence64(
+                                 &ptr->value, swap_value, compare_value));
+#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    /* OPA works on pointers here, so both operands are passed as void * */
+    return (hg_util_bool_t)(
+        compare_value == (hg_util_int64_t) OPA_cas_ptr(
+                             ptr, (void *) compare_value, (void *) swap_value));
+#elif defined(HG_UTIL_HAS_STDATOMIC_H)
+    return atomic_compare_exchange_strong_explicit(ptr, &compare_value,
+        swap_value, memory_order_acq_rel, memory_order_acquire);
+#elif defined(__APPLE__)
+    return OSAtomicCompareAndSwap64(compare_value, swap_value, &ptr->value);
+#else
+# error "Not supported on this platform."
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Issue a memory barrier using the backend selected at compile time.
+ *
+ * The parameter list is spelled (void) so the definition matches its
+ * prototype instead of being an old-style, unprototyped definition.
+ */
+static HG_UTIL_INLINE void
+hg_atomic_fence(void)
+{
+#if defined(_WIN32)
+    MemoryBarrier();
+#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
+    OPA_read_write_barrier();
+#elif defined(HG_UTIL_HAS_STDATOMIC_H)
+    atomic_thread_fence(memory_order_acq_rel);
+#elif defined(__APPLE__)
+    OSMemoryBarrier();
+#else
+# error "Not supported on this platform."
+#endif
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_ATOMIC_H */
diff --git a/src/mercury/mercury_atomic_queue.c b/src/mercury/mercury_atomic_queue.c
new file mode 100644
index 0000000..4c6a8e2
--- /dev/null
+++ b/src/mercury/mercury_atomic_queue.c
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+/* Implementation derived from:
+ * https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h
+ *
+ * -
+ * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ */
+
+#include "mercury_atomic_queue.h"
+#include "mercury_util_error.h"
+
+#include <stdlib.h>
+
+/****************/
+/* Local Macros */
+/****************/
+
+/* From <sys/param.h> */
+#define powerof2(x) ((((x) -1) & (x)) == 0)
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Allocate a queue whose ring has `count` slots.
+ *
+ * \param count [IN]  number of ring slots; must be a nonzero power of 2
+ *
+ * \return pointer to the new queue, or NULL if count is rejected or the
+ *         allocation fails
+ */
+struct hg_atomic_queue *
+hg_atomic_queue_alloc(unsigned int count)
+{
+    struct hg_atomic_queue *hg_atomic_queue = NULL;
+
+    /* powerof2(0) evaluates to true, but a zero-slot ring would have no
+     * storage and an all-ones index mask (count - 1), so reject it here */
+    HG_UTIL_CHECK_ERROR_NORET(
+        count == 0, done, "atomic queue size must not be 0");
+
+    HG_UTIL_CHECK_ERROR_NORET(
+        !powerof2(count), done, "atomic queue size must be power of 2");
+
+    /* Header and ring live in one allocation (ring is a flexible array) */
+    hg_atomic_queue = malloc(
+        sizeof(struct hg_atomic_queue) + count * sizeof(hg_atomic_int64_t));
+    HG_UTIL_CHECK_ERROR_NORET(
+        hg_atomic_queue == NULL, done, "Could not allocate atomic queue");
+
+    hg_atomic_queue->prod_size = hg_atomic_queue->cons_size = count;
+    hg_atomic_queue->prod_mask = hg_atomic_queue->cons_mask = count - 1;
+    /* malloc() does not zero memory and push increments this counter */
+    hg_atomic_queue->drops = 0;
+    hg_atomic_init32(&hg_atomic_queue->prod_head, 0);
+    hg_atomic_init32(&hg_atomic_queue->cons_head, 0);
+    hg_atomic_init32(&hg_atomic_queue->prod_tail, 0);
+    hg_atomic_init32(&hg_atomic_queue->cons_tail, 0);
+
+done:
+    return hg_atomic_queue;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Release a queue. The header and its ring are a single allocation, so one
+ * free() call is all that is needed.
+ */
+void
+hg_atomic_queue_free(struct hg_atomic_queue *hg_atomic_queue)
+{
+    free(hg_atomic_queue);
+}
diff --git a/src/mercury/mercury_atomic_queue.h b/src/mercury/mercury_atomic_queue.h
new file mode 100644
index 0000000..73c0b15
--- /dev/null
+++ b/src/mercury/mercury_atomic_queue.h
@@ -0,0 +1,271 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+/* Implementation derived from:
+ * https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h
+ *
+ * -
+ * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ */
+
+#ifndef MERCURY_ATOMIC_QUEUE_H
+#define MERCURY_ATOMIC_QUEUE_H
+
+#include "mercury_atomic.h"
+#include "mercury_mem.h"
+
+/*************************************/
+/* Public Type and Struct Definition */
+/*************************************/
+
+struct hg_atomic_queue { /* ring-buffer queue state; entries live in the trailing ring[] array */
+ hg_atomic_int32_t prod_head; /* index producers advance with a CAS to reserve a slot */
+ hg_atomic_int32_t prod_tail; /* set by a producer after its entry is stored; consumers compare against it */
+ unsigned int prod_size; /* ring size given to hg_atomic_queue_alloc() */
+ unsigned int prod_mask; /* prod_size - 1, used to wrap indices */
+ hg_util_uint64_t drops; /* incremented when a push finds the ring full */
+ hg_atomic_int32_t cons_head
+ __attribute__((aligned(HG_MEM_CACHE_LINE_SIZE))); /* index consumers advance to take a slot; aligned to HG_MEM_CACHE_LINE_SIZE */
+ hg_atomic_int32_t cons_tail; /* set by a consumer after it has read its slot; producers compare against it */
+ unsigned int cons_size; /* set to the same value as prod_size */
+ unsigned int cons_mask; /* set to the same value as prod_mask */
+ hg_atomic_int64_t ring[] __attribute__((aligned(HG_MEM_CACHE_LINE_SIZE))); /* entry pointers stored as 64-bit atomics */
+};
+
+/*****************/
+/* Public Macros */
+/*****************/
+
+#ifndef cpu_spinwait /* busy-wait hint used by the spin loops below; only defined if the includer has not already provided one */
+# if defined(__x86_64__) || defined(__amd64__) /* x86-64: emit a "pause" instruction */
+# define cpu_spinwait() asm volatile("pause\n" : : : "memory");
+# else /* other targets: expands to an empty statement */
+# define cpu_spinwait() ;
+# endif
+#endif
+
+/*********************/
+/* Public Prototypes */
+/*********************/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Allocate a new queue with a ring of \count slots.
+ *
+ * \param count [IN] ring size; must be a power of 2. Push reports the
+ * queue as full once count - 1 entries are queued.
+ *
+ * \return pointer to allocated queue or NULL on failure
+ */
+HG_UTIL_PUBLIC struct hg_atomic_queue *
+hg_atomic_queue_alloc(unsigned int count);
+
+/**
+ * Free an existing queue.
+ *
+ * \param hg_atomic_queue [IN] pointer to queue
+ */
+HG_UTIL_PUBLIC void
+hg_atomic_queue_free(struct hg_atomic_queue *hg_atomic_queue);
+
+/**
+ * Push an entry to the queue.
+ *
+ * \param hg_atomic_queue [IN/OUT] pointer to queue
+ * \param entry [IN] pointer to object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_atomic_queue_push(struct hg_atomic_queue *hg_atomic_queue, void *entry);
+
+/**
+ * Pop an entry from the queue (multi-consumer).
+ *
+ * \param hg_atomic_queue [IN/OUT] pointer to queue
+ *
+ * \return Pointer to popped object or NULL if queue is empty
+ */
+static HG_UTIL_INLINE void *
+hg_atomic_queue_pop_mc(struct hg_atomic_queue *hg_atomic_queue);
+
+/**
+ * Pop an entry from the queue (single consumer).
+ *
+ * \param hg_atomic_queue [IN/OUT] pointer to queue
+ *
+ * \return Pointer to popped object or NULL if queue is empty
+ */
+static HG_UTIL_INLINE void *
+hg_atomic_queue_pop_sc(struct hg_atomic_queue *hg_atomic_queue);
+
+/**
+ * Determine whether queue is empty.
+ *
+ * \param hg_atomic_queue [IN/OUT] pointer to queue
+ *
+ * \return HG_UTIL_TRUE if empty, HG_UTIL_FALSE if not
+ */
+static HG_UTIL_INLINE hg_util_bool_t
+hg_atomic_queue_is_empty(struct hg_atomic_queue *hg_atomic_queue);
+
+/**
+ * Determine number of entries in a queue.
+ *
+ * \param hg_atomic_queue [IN/OUT] pointer to queue
+ *
+ * \return Number of entries queued or 0 if none
+ */
+static HG_UTIL_INLINE unsigned int
+hg_atomic_queue_count(struct hg_atomic_queue *hg_atomic_queue);
+
+/*---------------------------------------------------------------------------
+ * Producer side of the ring: reserve a slot by advancing prod_head with a
+ * CAS, store the entry in the reserved slot, then make it visible to
+ * consumers by advancing prod_tail once earlier producers have done so.
+ * Returns HG_UTIL_FAIL (and increments drops) when the ring is found full,
+ * HG_UTIL_SUCCESS otherwise.
+ *---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_atomic_queue_push(struct hg_atomic_queue *hg_atomic_queue, void *entry)
+{
+ hg_util_int32_t prod_head, prod_next, cons_tail;
+
+ do {
+ prod_head = hg_atomic_get32(&hg_atomic_queue->prod_head);
+ prod_next = (prod_head + 1) & (int) hg_atomic_queue->prod_mask; /* wrap with the ring mask */
+ cons_tail = hg_atomic_get32(&hg_atomic_queue->cons_tail);
+
+ if (prod_next == cons_tail) { /* the slot after ours is the consumer tail: ring looks full */
+ hg_atomic_fence();
+ if (prod_head == hg_atomic_get32(&hg_atomic_queue->prod_head) && /* re-read both indices: report full only if neither moved */
+ cons_tail == hg_atomic_get32(&hg_atomic_queue->cons_tail)) {
+ hg_atomic_queue->drops++;
+ /* Full */
+ return HG_UTIL_FAIL;
+ }
+ continue; /* NOTE(review): in a do/while, continue proceeds to the loop condition, i.e. the CAS below is still evaluated with this snapshot - confirm intended */
+ }
+ } while (
+ !hg_atomic_cas32(&hg_atomic_queue->prod_head, prod_head, prod_next)); /* reserve slot prod_head; loop again if another producer won */
+
+ hg_atomic_set64(&hg_atomic_queue->ring[prod_head], (hg_util_int64_t) entry); /* pointer stored as a 64-bit integer */
+
+ /*
+ * If there are other enqueues in progress
+ * that preceded us, we need to wait for them
+ * to complete
+ */
+ while (hg_atomic_get32(&hg_atomic_queue->prod_tail) != prod_head)
+ cpu_spinwait();
+
+ hg_atomic_set32(&hg_atomic_queue->prod_tail, prod_next); /* publish: consumers compare cons_head against prod_tail */
+
+ return HG_UTIL_SUCCESS;
+}
+
+/*---------------------------------------------------------------------------
+ * Multi-consumer pop: take a slot by advancing cons_head with a CAS, read
+ * the entry from it, then advance cons_tail once earlier consumers have
+ * done so. Returns NULL when cons_head equals prod_tail (nothing to pop).
+ *---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE void *
+hg_atomic_queue_pop_mc(struct hg_atomic_queue *hg_atomic_queue)
+{
+ hg_util_int32_t cons_head, cons_next;
+ void *entry = NULL;
+
+ do {
+ cons_head = hg_atomic_get32(&hg_atomic_queue->cons_head);
+ cons_next = (cons_head + 1) & (int) hg_atomic_queue->cons_mask; /* wrap with the ring mask */
+
+ if (cons_head == hg_atomic_get32(&hg_atomic_queue->prod_tail)) /* no published entry at cons_head: empty */
+ return NULL;
+ } while (
+ !hg_atomic_cas32(&hg_atomic_queue->cons_head, cons_head, cons_next)); /* take slot cons_head; loop again if another consumer won */
+
+ entry = (void *) hg_atomic_get64(&hg_atomic_queue->ring[cons_head]); /* stored by push as a 64-bit integer */
+
+ /*
+ * If there are other dequeues in progress
+ * that preceded us, we need to wait for them
+ * to complete
+ */
+ while (hg_atomic_get32(&hg_atomic_queue->cons_tail) != cons_head)
+ cpu_spinwait();
+
+ hg_atomic_set32(&hg_atomic_queue->cons_tail, cons_next); /* push compares its next index against cons_tail to detect a full ring */
+
+ return entry;
+}
+
+/*---------------------------------------------------------------------------
+ * Single-consumer pop: same index bookkeeping as the multi-consumer
+ * variant, but cons_head and cons_tail are advanced with plain atomic
+ * stores (no CAS, no spin wait). Returns NULL when cons_head equals
+ * prod_tail (nothing to pop).
+ *---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE void *
+hg_atomic_queue_pop_sc(struct hg_atomic_queue *hg_atomic_queue)
+{
+ hg_util_int32_t cons_head, cons_next;
+ hg_util_int32_t prod_tail;
+ void *entry = NULL;
+
+ cons_head = hg_atomic_get32(&hg_atomic_queue->cons_head);
+ prod_tail = hg_atomic_get32(&hg_atomic_queue->prod_tail);
+ cons_next = (cons_head + 1) & (int) hg_atomic_queue->cons_mask; /* wrap with the ring mask */
+
+ if (cons_head == prod_tail)
+ /* Empty */
+ return NULL;
+
+ hg_atomic_set32(&hg_atomic_queue->cons_head, cons_next); /* plain store instead of the CAS used by pop_mc */
+
+ entry = (void *) hg_atomic_get64(&hg_atomic_queue->ring[cons_head]); /* stored by push as a 64-bit integer */
+
+ hg_atomic_set32(&hg_atomic_queue->cons_tail, cons_next); /* push compares its next index against cons_tail to detect a full ring */
+
+ return entry;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Report whether the queue currently has nothing to pop, i.e. whether the
+ * consumer head has caught up with the producer tail.
+ */
+static HG_UTIL_INLINE hg_util_bool_t
+hg_atomic_queue_is_empty(struct hg_atomic_queue *hg_atomic_queue)
+{
+    const hg_util_int32_t consumer_head =
+        hg_atomic_get32(&hg_atomic_queue->cons_head);
+    const hg_util_int32_t producer_tail =
+        hg_atomic_get32(&hg_atomic_queue->prod_tail);
+
+    return (consumer_head == producer_tail);
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Number of entries between the consumer tail and the producer tail,
+ * computed modulo the ring size using unsigned arithmetic and the mask.
+ */
+static HG_UTIL_INLINE unsigned int
+hg_atomic_queue_count(struct hg_atomic_queue *hg_atomic_queue)
+{
+    const unsigned int produced =
+        (unsigned int) hg_atomic_get32(&hg_atomic_queue->prod_tail);
+    const unsigned int consumed =
+        (unsigned int) hg_atomic_get32(&hg_atomic_queue->cons_tail);
+
+    return ((hg_atomic_queue->prod_size + produced - consumed) &
+            hg_atomic_queue->prod_mask);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_ATOMIC_QUEUE_H */
diff --git a/src/mercury/mercury_event.c b/src/mercury/mercury_event.c
new file mode 100644
index 0000000..42f4533
--- /dev/null
+++ b/src/mercury/mercury_event.c
@@ -0,0 +1,72 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#include "mercury_event.h"
+
+#include "mercury_util_error.h"
+
+/*---------------------------------------------------------------------------
+ * Create an event descriptor. With HG_UTIL_HAS_SYSEVENTFD_H an eventfd is
+ * created; with HG_UTIL_HAS_SYSEVENT_H a kqueue is created and a user event
+ * with ident HG_EVENT_IDENT is registered on it. In the _WIN32 and fallback
+ * branches nothing is created and fd keeps its initial value of -1.
+ * The HG_UTIL_CHECK_ERROR_NORET macro is defined elsewhere; presumably it
+ * logs and jumps to the given label when its condition holds - TODO confirm.
+ *---------------------------------------------------------------------------*/
+int
+hg_event_create(void)
+{
+ int fd = -1;
+#if defined(_WIN32)
+
+#elif defined(HG_UTIL_HAS_SYSEVENTFD_H)
+ /* Create local signal event on self address */
+ fd = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
+ HG_UTIL_CHECK_ERROR_NORET(
+ fd == -1, done, "eventfd() failed (%s)", strerror(errno));
+#elif defined(HG_UTIL_HAS_SYSEVENT_H)
+ struct kevent kev;
+ struct timespec timeout = {0, 0}; /* zero timeout passed to kevent() below */
+ int rc;
+
+ /* Create kqueue */
+ fd = kqueue();
+ HG_UTIL_CHECK_ERROR_NORET(
+ fd == -1, done, "kqueue() failed (%s)", strerror(errno));
+
+ EV_SET(&kev, HG_EVENT_IDENT, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL);
+
+ /* Add user-defined event to kqueue */
+ rc = kevent(fd, &kev, 1, NULL, 0, &timeout);
+ HG_UTIL_CHECK_ERROR_NORET(
+ rc == -1, error, "kevent() failed (%s)", strerror(errno)); /* uses the error label so the kqueue descriptor gets closed */
+#else
+
+#endif
+
+done:
+ return fd;
+
+#if defined(HG_UTIL_HAS_SYSEVENT_H) /* NOTE(review): this label is compiled whenever the macro is defined, even if the eventfd branch above was the one selected */
+error:
+ hg_event_destroy(fd);
+
+ return -1;
+#endif
+}
+
+/*---------------------------------------------------------------------------
+ * Destroy an event descriptor by closing it. On _WIN32 nothing is done and
+ * HG_UTIL_SUCCESS is returned. Elsewhere the HG_UTIL_CHECK_ERROR call is
+ * given ret and HG_UTIL_FAIL for the close() == -1 case; presumably it
+ * stores HG_UTIL_FAIL in ret and jumps to done - TODO confirm against the
+ * macro definition.
+ *---------------------------------------------------------------------------*/
+int
+hg_event_destroy(int fd)
+{
+ int ret = HG_UTIL_SUCCESS, rc; /* rc is only used in the non-Windows branch */
+#if defined(_WIN32)
+
+#else
+ rc = close(fd);
+ HG_UTIL_CHECK_ERROR(rc == -1, done, ret, HG_UTIL_FAIL,
+ "close() failed (%s)", strerror(errno));
+#endif
+done:
+ return ret;
+}
diff --git a/src/mercury/mercury_event.h b/src/mercury/mercury_event.h
new file mode 100644
index 0000000..48175b1
--- /dev/null
+++ b/src/mercury/mercury_event.h
@@ -0,0 +1,184 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_EVENT_H
+#define MERCURY_EVENT_H
+
+#include "mercury_util_config.h"
+
+#ifdef _WIN32
+
+#else
+# include <errno.h>
+# include <string.h>
+# include <unistd.h>
+# if defined(HG_UTIL_HAS_SYSEVENTFD_H)
+# include <sys/eventfd.h>
+# ifndef HG_UTIL_HAS_EVENTFD_T
+typedef uint64_t eventfd_t;
+# endif
+# elif defined(HG_UTIL_HAS_SYSEVENT_H)
+# include <sys/event.h>
+# define HG_EVENT_IDENT 42 /* User-defined ident */
+# endif
+#endif
+
+/**
+ * Purpose: define an event object that can be used as an event
+ * wait/notify mechanism.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Create a new event object.
+ *
+ * \return file descriptor on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_event_create(void);
+
+/**
+ * Destroy an event object.
+ *
+ * \param fd [IN] event file descriptor
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_event_destroy(int fd);
+
+/**
+ * Notify for event.
+ *
+ * \param fd [IN] event file descriptor
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_event_set(int fd);
+
+/**
+ * Get event notification.
+ *
+ * \param fd [IN] event file descriptor
+ * \param notified [OUT] set to HG_UTIL_TRUE if an event was received, HG_UTIL_FALSE otherwise
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_event_get(int fd, hg_util_bool_t *notified);
+
+/*---------------------------------------------------------------------------*/
+#if defined(_WIN32)
+/* TODO */
+#elif defined(HG_UTIL_HAS_SYSEVENTFD_H)
+# ifdef HG_UTIL_HAS_EVENTFD_T
+static HG_UTIL_INLINE int
+hg_event_set(int fd)
+{
+    /* Add 1 to the eventfd counter to signal the event */
+    if (eventfd_write(fd, 1) != 0)
+        return HG_UTIL_FAIL;
+
+    return HG_UTIL_SUCCESS;
+}
+# else
+static HG_UTIL_INLINE int
+hg_event_set(int fd)
+{
+    /* Fallback when eventfd_write() is unavailable: write the 8-byte
+     * increment to the descriptor directly */
+    eventfd_t increment = 1;
+    ssize_t nwritten = write(fd, &increment, sizeof(eventfd_t));
+
+    if (nwritten == sizeof(eventfd_t))
+        return HG_UTIL_SUCCESS;
+
+    return HG_UTIL_FAIL;
+}
+# endif
+#elif defined(HG_UTIL_HAS_SYSEVENT_H)
+static HG_UTIL_INLINE int
+hg_event_set(int fd)
+{
+    struct timespec no_wait = {0, 0};
+    struct kevent trigger;
+
+    /* Describe a NOTE_TRIGGER on the user-defined event */
+    EV_SET(&trigger, HG_EVENT_IDENT, EVFILT_USER, 0, NOTE_TRIGGER, 0, NULL);
+
+    /* Submit the change to the kqueue; no events are fetched here */
+    if (kevent(fd, &trigger, 1, NULL, 0, &no_wait) == -1)
+        return HG_UTIL_FAIL;
+
+    return HG_UTIL_SUCCESS;
+}
+#else
+# error "Not supported on this platform."
+#endif
+
+/*---------------------------------------------------------------------------*/
+#if defined(_WIN32)
+#elif defined(HG_UTIL_HAS_SYSEVENTFD_H)
+# ifdef HG_UTIL_HAS_EVENTFD_T
+static HG_UTIL_INLINE int
+hg_event_get(int fd, hg_util_bool_t *signaled)
+{
+    eventfd_t value = 0;
+
+    /* A successful read with a non-zero counter means the event fired */
+    if ((eventfd_read(fd, &value) == 0) && (value != 0)) {
+        *signaled = HG_UTIL_TRUE;
+        return HG_UTIL_SUCCESS;
+    }
+
+    /* Only EAGAIN is treated as "no event pending"; anything else fails */
+    if (errno != EAGAIN)
+        return HG_UTIL_FAIL;
+
+    *signaled = HG_UTIL_FALSE;
+    return HG_UTIL_SUCCESS;
+}
+# else
+static HG_UTIL_INLINE int
+hg_event_get(int fd, hg_util_bool_t *signaled)
+{
+    /* Fallback when eventfd_read() is unavailable: read the 8-byte counter
+     * from the descriptor directly */
+    eventfd_t value = 0;
+    ssize_t nread = read(fd, &value, sizeof(eventfd_t));
+
+    if ((nread == sizeof(eventfd_t)) && (value != 0)) {
+        *signaled = HG_UTIL_TRUE;
+        return HG_UTIL_SUCCESS;
+    }
+
+    /* Only EAGAIN is treated as "no event pending"; anything else fails */
+    if (errno != EAGAIN)
+        return HG_UTIL_FAIL;
+
+    *signaled = HG_UTIL_FALSE;
+    return HG_UTIL_SUCCESS;
+}
+# endif
+#elif defined(HG_UTIL_HAS_SYSEVENT_H)
+static HG_UTIL_INLINE int
+hg_event_get(int fd, hg_util_bool_t *signaled)
+{
+    struct timespec no_wait = {0, 0};
+    struct kevent received;
+    int count;
+
+    /* Poll the kqueue for at most one event without waiting */
+    count = kevent(fd, NULL, 0, &received, 1, &no_wait);
+    if (count == -1)
+        return HG_UTIL_FAIL;
+
+    /* `received` is only inspected when kevent() actually filled it in */
+    if ((count > 0) && (received.ident == HG_EVENT_IDENT))
+        *signaled = HG_UTIL_TRUE;
+    else
+        *signaled = HG_UTIL_FALSE;
+
+    return HG_UTIL_SUCCESS;
+}
+#else
+# error "Not supported on this platform."
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_EVENT_H */
diff --git a/src/mercury/mercury_hash_string.h b/src/mercury/mercury_hash_string.h
new file mode 100644
index 0000000..878fd4f
--- /dev/null
+++ b/src/mercury/mercury_hash_string.h
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_HASH_STRING_H
+#define MERCURY_HASH_STRING_H
+
+#include "mercury_util_config.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Compute the djb2 hash of a NUL-terminated string.
+ *
+ * \param string [IN]           string to hash (must not be NULL)
+ *
+ * \return unsigned hash value derived from every byte of the string
+ */
+static HG_UTIL_INLINE unsigned int
+hg_hash_string(const char *string)
+{
+    const unsigned char *cursor;
+    unsigned int hash = 5381; /* djb2 seed */
+
+    /* hash = hash * 33 + byte, with unsigned wrap-around */
+    for (cursor = (const unsigned char *) string; *cursor != '\0'; cursor++)
+        hash = hash * 33 + *cursor;
+
+    return hash;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_HASH_STRING_H */
diff --git a/src/mercury/mercury_hash_table.c b/src/mercury/mercury_hash_table.c
new file mode 100644
index 0000000..d7d14df
--- /dev/null
+++ b/src/mercury/mercury_hash_table.c
@@ -0,0 +1,526 @@
+/*
+
+Copyright (c) 2005-2008, Simon Howard
+
+Permission to use, copy, modify, and/or distribute this software
+for any purpose with or without fee is hereby granted, provided
+that the above copyright notice and this permission notice appear
+in all copies.
+
+THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
+WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
+AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
+CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+ */
+
+/* Hash table implementation */
+
+#include "mercury_hash_table.h"
+
+#include <stdlib.h>
+#include <string.h>
+/*
+ * One key/value pair stored in the table. Entries whose keys hash to the
+ * same table slot are chained together through `next`.
+ */
+struct hg_hash_table_entry {
+ hg_hash_table_key_t key; /* key passed to hg_hash_table_insert() */
+ hg_hash_table_value_t value; /* value associated with the key */
+ hg_hash_table_entry_t *next; /* next entry in the same chain, or NULL */
+};
+/*
+ * Hash table using separate chaining: `table` is an array of `table_size`
+ * chain heads, and a key lives in slot hash_func(key) % table_size.
+ */
+struct hg_hash_table {
+ hg_hash_table_entry_t **table; /* array of chain heads (NULL = empty slot) */
+ unsigned int table_size; /* number of slots in `table` */
+ hg_hash_table_hash_func_t hash_func; /* maps a key to an unsigned hash */
+ hg_hash_table_equal_func_t equal_func; /* non-zero when two keys are equal */
+ hg_hash_table_key_free_func_t key_free_func; /* optional; NULL = keys are not freed */
+ hg_hash_table_value_free_func_t value_free_func; /* optional; NULL = values are not freed */
+ unsigned int entries; /* number of entries currently stored */
+ unsigned int prime_index; /* index into hash_table_primes selecting table_size */
+};
+
+/* This is a set of good hash table prime numbers, from:
+ * http://planetmath.org/goodhashtableprimes
+ * Each prime is roughly double the previous value, and as far as
+ * possible from the nearest powers of two. */
+
+static const unsigned int hash_table_primes[] = {
+    193, 389, 769, 1543, 3079, 6151,
+    12289, 24593, 49157, 98317, 196613, 393241,
+    786433, 1572869, 3145739, 6291469, 12582917, 25165843,
+    50331653, 100663319, 201326611, 402653189, 805306457, 1610612741,
+};
+
+/* Number of entries in hash_table_primes. The element size is taken from
+ * the array itself so the count stays right if the element type changes. */
+static const unsigned int hash_table_num_primes =
+    sizeof(hash_table_primes) / sizeof(hash_table_primes[0]);
+
+/* Allocate a zeroed slot array sized according to the table's current
+ * prime_index. Used both when a table is created and when it is enlarged.
+ * Returns non-zero on success and zero if the allocation failed; in both
+ * cases table_size and table have been overwritten. */
+
+static int
+hash_table_allocate_table(hg_hash_table_t *hash_table)
+{
+    const unsigned int index = hash_table->prime_index;
+
+    /* Take the size from the prime list while it lasts; past the last
+     * prime, fall back to ten times the current number of entries. */
+
+    hash_table->table_size = (index < hash_table_num_primes)
+                                 ? hash_table_primes[index]
+                                 : hash_table->entries * 10;
+
+    /* calloc() leaves every chain head NULL */
+
+    hash_table->table = (hg_hash_table_entry_t **) calloc(
+        hash_table->table_size, sizeof(hg_hash_table_entry_t *));
+
+    return (hash_table->table == NULL) ? 0 : 1;
+}
+
+/* Release one entry: run the registered key and value free callbacks (if
+ * any), then free the entry structure itself. */
+
+static void
+hash_table_free_entry(hg_hash_table_t *hash_table, hg_hash_table_entry_t *entry)
+{
+    hg_hash_table_key_free_func_t release_key;
+    hg_hash_table_value_free_func_t release_value;
+
+    /* Key first ... */
+
+    release_key = hash_table->key_free_func;
+    if (release_key != NULL)
+        release_key(entry->key);
+
+    /* ... then the value */
+
+    release_value = hash_table->value_free_func;
+    if (release_value != NULL)
+        release_value(entry->value);
+
+    /* Finally the node that held them */
+
+    free(entry);
+}
+
+hg_hash_table_t *
+hg_hash_table_new(
+    hg_hash_table_hash_func_t hash_func, hg_hash_table_equal_func_t equal_func)
+{
+    hg_hash_table_t *created =
+        (hg_hash_table_t *) malloc(sizeof(hg_hash_table_t));
+
+    if (created == NULL)
+        return NULL;
+
+    /* Start empty, at the smallest prime size, with no free callbacks */
+
+    created->prime_index = 0;
+    created->entries = 0;
+    created->value_free_func = NULL;
+    created->key_free_func = NULL;
+    created->equal_func = equal_func;
+    created->hash_func = hash_func;
+
+    /* Allocate the slot array; give the structure back if that fails */
+
+    if (hash_table_allocate_table(created))
+        return created;
+
+    free(created);
+
+    return NULL;
+}
+
+void
+hg_hash_table_free(hg_hash_table_t *hash_table)
+{
+    unsigned int slot;
+
+    /* Walk every chain and release each entry in it */
+
+    for (slot = 0; slot < hash_table->table_size; slot++) {
+        hg_hash_table_entry_t *entry = hash_table->table[slot];
+
+        while (entry != NULL) {
+            /* Remember the successor before the entry is freed */
+            hg_hash_table_entry_t *following = entry->next;
+
+            hash_table_free_entry(hash_table, entry);
+            entry = following;
+        }
+    }
+
+    /* Release the slot array, then the table structure itself */
+
+    free(hash_table->table);
+    free(hash_table);
+}
+
+void
+hg_hash_table_register_free_functions(hg_hash_table_t *hash_table,
+    hg_hash_table_key_free_func_t key_free_func,
+    hg_hash_table_value_free_func_t value_free_func)
+{
+    /* Either callback may be NULL, in which case that part of an entry is
+     * not freed when the entry is released */
+    hash_table->value_free_func = value_free_func;
+    hash_table->key_free_func = key_free_func;
+}
+
+static int
+hash_table_enlarge(hg_hash_table_t *hash_table)
+{
+    /* Snapshot of the current table so it can be restored or re-linked */
+    hg_hash_table_entry_t **const previous_table = hash_table->table;
+    const unsigned int previous_size = hash_table->table_size;
+    const unsigned int previous_prime = hash_table->prime_index;
+    unsigned int slot;
+
+    /* Step to the next size and allocate the new slot array */
+
+    hash_table->prime_index = previous_prime + 1;
+
+    if (!hash_table_allocate_table(hash_table)) {
+
+        /* Allocation failed: put the old table back untouched */
+
+        hash_table->table = previous_table;
+        hash_table->table_size = previous_size;
+        hash_table->prime_index = previous_prime;
+
+        return 0;
+    }
+
+    /* Move every entry of every old chain onto the head of the chain it
+     * hashes to in the new table */
+
+    for (slot = 0; slot < previous_size; slot++) {
+        hg_hash_table_entry_t *entry = previous_table[slot];
+
+        while (entry != NULL) {
+            hg_hash_table_entry_t *const following = entry->next;
+            const unsigned int target =
+                hash_table->hash_func(entry->key) % hash_table->table_size;
+
+            entry->next = hash_table->table[target];
+            hash_table->table[target] = entry;
+
+            entry = following;
+        }
+    }
+
+    /* Only the old slot array is released; the entries were re-used */
+
+    free(previous_table);
+
+    return 1;
+}
+
+int
+hg_hash_table_insert(hg_hash_table_t *hash_table, hg_hash_table_key_t key,
+    hg_hash_table_value_t value)
+{
+    hg_hash_table_entry_t *entry;
+    hg_hash_table_entry_t *fresh;
+    unsigned int slot;
+
+    /* Keep chains short: once the entry count reaches a third of the slot
+     * count, grow the table first. Give up if it cannot be grown. */
+
+    if ((hash_table->entries * 3) / hash_table->table_size > 0 &&
+        !hash_table_enlarge(hash_table))
+        return 0;
+
+    /* Slot this key hashes to */
+
+    slot = hash_table->hash_func(key) % hash_table->table_size;
+
+    /* If the key is already present, replace that entry in place */
+
+    for (entry = hash_table->table[slot]; entry != NULL;
+         entry = entry->next) {
+        if (hash_table->equal_func(entry->key, key) == 0)
+            continue;
+
+        /* Release the old value, then the old key, through the
+         * registered callbacks before storing the new pair */
+
+        if (hash_table->value_free_func != NULL)
+            hash_table->value_free_func(entry->value);
+
+        if (hash_table->key_free_func != NULL)
+            hash_table->key_free_func(entry->key);
+
+        entry->key = key;
+        entry->value = value;
+
+        return 1;
+    }
+
+    /* Key not present: allocate a new entry */
+
+    fresh = (hg_hash_table_entry_t *) malloc(sizeof(hg_hash_table_entry_t));
+    if (fresh == NULL)
+        return 0;
+
+    fresh->key = key;
+    fresh->value = value;
+
+    /* Push it onto the head of the chain and count it */
+
+    fresh->next = hash_table->table[slot];
+    hash_table->table[slot] = fresh;
+    ++hash_table->entries;
+
+    return 1;
+}
+
+hg_hash_table_value_t
+hg_hash_table_lookup(hg_hash_table_t *hash_table, hg_hash_table_key_t key)
+{
+    /* Slot this key hashes to */
+    const unsigned int slot =
+        hash_table->hash_func(key) % hash_table->table_size;
+    hg_hash_table_entry_t *entry;
+
+    /* Scan the chain for an entry whose key compares equal */
+
+    for (entry = hash_table->table[slot]; entry != NULL;
+         entry = entry->next) {
+        if (hash_table->equal_func(key, entry->key) != 0)
+            return entry->value;
+    }
+
+    /* No entry with that key */
+
+    return HG_HASH_TABLE_NULL;
+}
+
+int
+hg_hash_table_remove(hg_hash_table_t *hash_table, hg_hash_table_key_t key)
+{
+    /* Slot this key hashes to */
+    const unsigned int slot =
+        hash_table->hash_func(key) % hash_table->table_size;
+    hg_hash_table_entry_t **link;
+
+    /* `link` always addresses the pointer that leads to the entry being
+     * inspected: first the slot itself, then each entry's `next` field.
+     * Overwriting *link is therefore enough to unlink an entry. */
+
+    for (link = &hash_table->table[slot]; *link != NULL;
+         link = &((*link)->next)) {
+        hg_hash_table_entry_t *victim = *link;
+
+        if (hash_table->equal_func(key, victim->key) == 0)
+            continue;
+
+        /* Found it: unlink, release, and update the entry count */
+
+        *link = victim->next;
+        hash_table_free_entry(hash_table, victim);
+        --hash_table->entries;
+
+        return 1;
+    }
+
+    /* Key not present */
+
+    return 0;
+}
+
+unsigned int
+hg_hash_table_num_entries(hg_hash_table_t *hash_table)
+{
+    /* The count is maintained incrementally by insert and remove */
+    const unsigned int count = hash_table->entries;
+
+    return count;
+}
+
+void
+hg_hash_table_iterate(
+    hg_hash_table_t *hash_table, hg_hash_table_iter_t *iterator)
+{
+    unsigned int slot = 0;
+
+    iterator->hash_table = hash_table;
+
+    /* Assume the table is empty until a non-empty chain is found */
+
+    iterator->next_entry = NULL;
+
+    /* Skip leading empty slots */
+
+    while (slot < hash_table->table_size && hash_table->table[slot] == NULL)
+        ++slot;
+
+    /* Position the iterator on the head of the first non-empty chain;
+     * next_chain is left untouched when there is none */
+
+    if (slot < hash_table->table_size) {
+        iterator->next_entry = hash_table->table[slot];
+        iterator->next_chain = slot;
+    }
+}
+
+int
+hg_hash_table_iter_has_more(hg_hash_table_iter_t *iterator)
+{
+    /* The iterator is exhausted once no pending entry is recorded */
+    return (iterator->next_entry == NULL) ? 0 : 1;
+}
+
+hg_hash_table_value_t
+hg_hash_table_iter_next(hg_hash_table_iter_t *iterator)
+{
+    hg_hash_table_t *table = iterator->hash_table;
+    hg_hash_table_entry_t *entry = iterator->next_entry;
+    hg_hash_table_value_t value;
+    unsigned int slot;
+
+    /* Nothing left to hand out */
+
+    if (entry == NULL)
+        return HG_HASH_TABLE_NULL;
+
+    /* The value to return is the one of the pending entry */
+
+    value = entry->value;
+
+    /* Easy case: the current chain continues */
+
+    if (entry->next != NULL) {
+        iterator->next_entry = entry->next;
+
+        return value;
+    }
+
+    /* Chain exhausted: scan forward for the next non-empty slot */
+
+    slot = iterator->next_chain + 1;
+
+    while (slot < table->table_size && table->table[slot] == NULL)
+        ++slot;
+
+    /* Record the head of that chain, or NULL if the scan ran off the end
+     * of the table, together with the slot where the scan stopped */
+
+    iterator->next_entry =
+        (slot < table->table_size) ? table->table[slot] : NULL;
+    iterator->next_chain = slot;
+
+    return value;
+}
diff --git a/src/mercury/mercury_hash_table.h b/src/mercury/mercury_hash_table.h
new file mode 100644
index 0000000..619857b
--- /dev/null
+++ b/src/mercury/mercury_hash_table.h
@@ -0,0 +1,252 @@
+/*
+
+Copyright (c) 2005-2008, Simon Howard
+
+Permission to use, copy, modify, and/or distribute this software
+for any purpose with or without fee is hereby granted, provided
+that the above copyright notice and this permission notice appear
+in all copies.
+
+THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
+WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
+AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
+CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+ */
+
+/**
+ * \file mercury_hash_table.h
+ *
+ * \brief Hash table.
+ *
+ * A hash table stores a set of values which can be addressed by a
+ * key. Given the key, the corresponding value can be looked up
+ * quickly.
+ *
+ * To create a hash table, use \ref hg_hash_table_new. To destroy a
+ * hash table, use \ref hg_hash_table_free.
+ *
+ * To insert a value into a hash table, use \ref hg_hash_table_insert.
+ *
+ * To remove a value from a hash table, use \ref hg_hash_table_remove.
+ *
+ * To look up a value by its key, use \ref hg_hash_table_lookup.
+ *
+ * To iterate over all values in a hash table, use
+ * \ref hg_hash_table_iterate to initialize a \ref hg_hash_table_iter
+ * structure. Each value can then be read in turn using
+ * \ref hg_hash_table_iter_next and \ref hg_hash_table_iter_has_more.
+ */
+
+#ifndef HG_HASH_TABLE_H
+#define HG_HASH_TABLE_H
+
+#include "mercury_util_config.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * A hash table structure.
+ */
+
+typedef struct hg_hash_table hg_hash_table_t;
+
+/**
+ * Structure used to iterate over a hash table.
+ */
+
+typedef struct hg_hash_table_iter hg_hash_table_iter_t;
+
+/**
+ * Internal structure representing an entry in a hash table.
+ */
+
+typedef struct hg_hash_table_entry hg_hash_table_entry_t;
+
+/**
+ * A key to look up a value in a \ref hg_hash_table_t.
+ */
+
+typedef void *hg_hash_table_key_t;
+
+/**
+ * A value stored in a \ref hg_hash_table_t.
+ */
+
+typedef void *hg_hash_table_value_t;
+
+/**
+ * Definition of a \ref hg_hash_table_iter.
+ */
+
+struct hg_hash_table_iter {
+ hg_hash_table_t *hash_table; /* table being iterated over */
+ hg_hash_table_entry_t *next_entry; /* entry returned by the next call to hg_hash_table_iter_next(), NULL when exhausted */
+ unsigned int next_chain; /* index of the table slot that next_entry belongs to */
+};
+
+/**
+ * A null \ref hg_hash_table_value_t.
+ */
+
+#define HG_HASH_TABLE_NULL ((void *) 0)
+
+/**
+ * Hash function used to generate hash values for keys used in a hash
+ * table.
+ *
+ * \param value The value to generate a hash value for.
+ * \return The hash value.
+ */
+
+typedef unsigned int (*hg_hash_table_hash_func_t)(hg_hash_table_key_t value);
+
+/**
+ * Function used to compare two keys for equality.
+ *
+ * \return Non-zero if the two keys are equal, zero if the keys are
+ * not equal.
+ */
+
+typedef int (*hg_hash_table_equal_func_t)(
+ hg_hash_table_key_t value1, hg_hash_table_key_t value2);
+
+/**
+ * Type of function used to free keys when entries are removed from a
+ * hash table.
+ */
+
+typedef void (*hg_hash_table_key_free_func_t)(hg_hash_table_key_t value);
+
+/**
+ * Type of function used to free values when entries are removed from a
+ * hash table.
+ */
+
+typedef void (*hg_hash_table_value_free_func_t)(hg_hash_table_value_t value);
+
+/**
+ * Create a new hash table.
+ *
+ * \param hash_func Function used to generate hash keys for the
+ * keys used in the table.
+ * \param equal_func Function used to test keys used in the table
+ * for equality.
+ * \return A new hash table structure, or NULL if it
+ * was not possible to allocate the new hash
+ * table.
+ */
+HG_UTIL_PUBLIC hg_hash_table_t *
+hg_hash_table_new(
+ hg_hash_table_hash_func_t hash_func, hg_hash_table_equal_func_t equal_func);
+
+/**
+ * Destroy a hash table.
+ *
+ * \param hash_table The hash table to destroy.
+ */
+HG_UTIL_PUBLIC void
+hg_hash_table_free(hg_hash_table_t *hash_table);
+
+/**
+ * Register functions used to free the key and value when an entry is
+ * removed from a hash table.
+ *
+ * \param hash_table The hash table.
+ * \param key_free_func Function used to free keys.
+ * \param value_free_func Function used to free values.
+ */
+HG_UTIL_PUBLIC void
+hg_hash_table_register_free_functions(hg_hash_table_t *hash_table,
+ hg_hash_table_key_free_func_t key_free_func,
+ hg_hash_table_value_free_func_t value_free_func);
+
+/**
+ * Insert a value into a hash table, overwriting any existing entry
+ * using the same key.
+ *
+ * \param hash_table The hash table.
+ * \param key The key for the new value.
+ * \param value The value to insert.
+ * \return Non-zero if the value was added successfully,
+ * or zero if it was not possible to allocate
+ * memory for the new entry.
+ */
+HG_UTIL_PUBLIC int
+hg_hash_table_insert(hg_hash_table_t *hash_table, hg_hash_table_key_t key,
+ hg_hash_table_value_t value);
+
+/**
+ * Look up a value in a hash table by key.
+ *
+ * \param hash_table The hash table.
+ * \param key The key of the value to look up.
+ * \return The value, or \ref HG_HASH_TABLE_NULL if there
+ * is no value with that key in the hash table.
+ */
+HG_UTIL_PUBLIC hg_hash_table_value_t
+hg_hash_table_lookup(hg_hash_table_t *hash_table, hg_hash_table_key_t key);
+
+/**
+ * Remove a value from a hash table.
+ *
+ * \param hash_table The hash table.
+ * \param key The key of the value to remove.
+ * \return Non-zero if a key was removed, or zero if the
+ * specified key was not found in the hash table.
+ */
+HG_UTIL_PUBLIC int
+hg_hash_table_remove(hg_hash_table_t *hash_table, hg_hash_table_key_t key);
+
+/**
+ * Retrieve the number of entries in a hash table.
+ *
+ * \param hash_table The hash table.
+ * \return The number of entries in the hash table.
+ */
+HG_UTIL_PUBLIC unsigned int
+hg_hash_table_num_entries(hg_hash_table_t *hash_table);
+
+/**
+ * Initialise a \ref hg_hash_table_iter_t to iterate over a hash table.
+ *
+ * \param hash_table The hash table.
+ * \param iter Pointer to an iterator structure to
+ * initialise.
+ */
+HG_UTIL_PUBLIC void
+hg_hash_table_iterate(hg_hash_table_t *hash_table, hg_hash_table_iter_t *iter);
+
+/**
+ * Determine if there are more keys in the hash table to iterate over.
+ *
+ * \param iterator The hash table iterator.
+ * \return Zero if there are no more values to iterate
+ * over, non-zero if there are more values to
+ * iterate over.
+ */
+HG_UTIL_PUBLIC int
+hg_hash_table_iter_has_more(hg_hash_table_iter_t *iterator);
+
+/**
+ * Using a hash table iterator, retrieve the next value.
+ *
+ * \param iterator The hash table iterator.
+ * \return The next value from the hash table, or
+ * \ref HG_HASH_TABLE_NULL if there are no more
+ * values to iterate over.
+ */
+HG_UTIL_PUBLIC hg_hash_table_value_t
+hg_hash_table_iter_next(hg_hash_table_iter_t *iterator);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* HG_HASH_TABLE_H */
diff --git a/src/mercury/mercury_list.h b/src/mercury/mercury_list.h
new file mode 100644
index 0000000..5a29e4a
--- /dev/null
+++ b/src/mercury/mercury_list.h
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+/* Code below is derived from sys/queue.h which follows the below notice:
+ *
+ * Copyright (c) 1991, 1993
+ * The Regents of the University of California. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. Neither the name of the University nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * @(#)queue.h 8.5 (Berkeley) 8/20/94
+ */
+
+#ifndef MERCURY_LIST_H
+#define MERCURY_LIST_H
+/*
+ * Static initializer for a list head: expands to { NULL }, i.e. an empty
+ * list. The `name` argument is not used by the expansion.
+ */
+#define HG_LIST_HEAD_INITIALIZER(name) \
+ { \
+ NULL \
+ }
+/*
+ * Define a variable `var_name` of type `struct struct_head_name` and
+ * initialize it as an empty list.
+ */
+#define HG_LIST_HEAD_INIT(struct_head_name, var_name) \
+ struct struct_head_name var_name = HG_LIST_HEAD_INITIALIZER(var_name)
+/*
+ * Declare a named list head type `struct struct_head_name` whose single
+ * member `head` points to the first `struct struct_entry_name` element.
+ */
+#define HG_LIST_HEAD_DECL(struct_head_name, struct_entry_name) \
+ struct struct_head_name { \
+ struct struct_entry_name *head; \
+ }
+/*
+ * Anonymous list head type: same layout as HG_LIST_HEAD_DECL but without a
+ * struct tag.
+ */
+#define HG_LIST_HEAD(struct_entry_name) \
+ struct { \
+ struct struct_entry_name *head; \
+ }
+/*
+ * Link field to embed in each element type. `next` points to the following
+ * element (NULL at the tail); `prev` holds the address of the pointer that
+ * points to this element, i.e. the previous element's `next` field or the
+ * list head's `head` field.
+ */
+#define HG_LIST_ENTRY(struct_entry_name) \
+ struct { \
+ struct struct_entry_name *next; \
+ struct struct_entry_name **prev; \
+ }
+/*
+ * Reset the list head pointed to by `head_ptr` to the empty list.
+ */
+#define HG_LIST_INIT(head_ptr) \
+ do { \
+ (head_ptr)->head = NULL; \
+ } while (/*CONSTCOND*/ 0)
+/*
+ * Evaluate to non-zero when the list has no elements.
+ */
+#define HG_LIST_IS_EMPTY(head_ptr) ((head_ptr)->head == NULL)
+/*
+ * Evaluate to the first element of the list, or NULL if the list is empty.
+ */
+#define HG_LIST_FIRST(head_ptr) ((head_ptr)->head)
+/*
+ * Evaluate to the element following `entry_ptr`, or NULL at the tail.
+ * `entry_field_name` is the name of the HG_LIST_ENTRY member.
+ */
+#define HG_LIST_NEXT(entry_ptr, entry_field_name) \
+ ((entry_ptr)->entry_field_name.next)
+/*
+ * Insert `entry_ptr` immediately after `list_entry_ptr`. If
+ * `list_entry_ptr` had a successor, that successor's `prev` is redirected
+ * to the new element's `next` field. Arguments are evaluated more than
+ * once, so they must be free of side effects.
+ */
+#define HG_LIST_INSERT_AFTER(list_entry_ptr, entry_ptr, entry_field_name) \
+ do { \
+ if (((entry_ptr)->entry_field_name.next = \
+ (list_entry_ptr)->entry_field_name.next) != NULL) \
+ (list_entry_ptr)->entry_field_name.next->entry_field_name.prev = \
+ &(entry_ptr)->entry_field_name.next; \
+ (list_entry_ptr)->entry_field_name.next = (entry_ptr); \
+ (entry_ptr)->entry_field_name.prev = \
+ &(list_entry_ptr)->entry_field_name.next; \
+ } while (/*CONSTCOND*/ 0)
+/*
+ * Insert `entry_ptr` immediately before `list_entry_ptr`. The pointer that
+ * used to lead to `list_entry_ptr` (reached through its `prev` field) is
+ * rewritten to lead to the new element, so `list_entry_ptr` must already be
+ * linked into a list. Arguments are evaluated more than once.
+ */
+#define HG_LIST_INSERT_BEFORE(list_entry_ptr, entry_ptr, entry_field_name) \
+ do { \
+ (entry_ptr)->entry_field_name.prev = \
+ (list_entry_ptr)->entry_field_name.prev; \
+ (entry_ptr)->entry_field_name.next = (list_entry_ptr); \
+ *(list_entry_ptr)->entry_field_name.prev = (entry_ptr); \
+ (list_entry_ptr)->entry_field_name.prev = \
+ &(entry_ptr)->entry_field_name.next; \
+ } while (/*CONSTCOND*/ 0)
+/*
+ * Insert `entry_ptr` at the front of the list. The previous first element,
+ * if any, gets its `prev` redirected to the new element's `next` field, and
+ * the new element's `prev` points at the head's `head` field. Arguments are
+ * evaluated more than once.
+ */
+#define HG_LIST_INSERT_HEAD(head_ptr, entry_ptr, entry_field_name) \
+ do { \
+ if (((entry_ptr)->entry_field_name.next = (head_ptr)->head) != NULL) \
+ (head_ptr)->head->entry_field_name.prev = \
+ &(entry_ptr)->entry_field_name.next; \
+ (head_ptr)->head = (entry_ptr); \
+ (entry_ptr)->entry_field_name.prev = &(head_ptr)->head; \
+ } while (/*CONSTCOND*/ 0)
+
+/*
+ * Unlink `entry_ptr` from the list it is on: its successor (if any)
+ * inherits its `prev`, and the pointer that led to it is rewritten to lead
+ * to its successor. The list head is not needed. The removed element's own
+ * `next`/`prev` fields are left unchanged.
+ *
+ * TODO would be nice to not have any condition
+ */
+#define HG_LIST_REMOVE(entry_ptr, entry_field_name) \
+ do { \
+ if ((entry_ptr)->entry_field_name.next != NULL) \
+ (entry_ptr)->entry_field_name.next->entry_field_name.prev = \
+ (entry_ptr)->entry_field_name.prev; \
+ *(entry_ptr)->entry_field_name.prev = \
+ (entry_ptr)->entry_field_name.next; \
+ } while (/*CONSTCOND*/ 0)
+/*
+ * Loop header that walks the list from head to tail, assigning each element
+ * to `var` in turn. The successor is read from `var` after the loop body
+ * runs, so the body must not invalidate `var`'s link field.
+ */
+#define HG_LIST_FOREACH(var, head_ptr, entry_field_name) \
+ for ((var) = ((head_ptr)->head); (var); \
+ (var) = ((var)->entry_field_name.next))
+
+#endif /* MERCURY_LIST_H */
diff --git a/src/mercury/mercury_log.c b/src/mercury/mercury_log.c
new file mode 100644
index 0000000..84c468a
--- /dev/null
+++ b/src/mercury/mercury_log.c
@@ -0,0 +1,128 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#include "mercury_log.h"
+
+#include <stdarg.h>
+
+/****************/
+/* Local Macros */
+/****************/
+
+#define HG_LOG_MAX_BUF 256
+
+#ifdef HG_UTIL_HAS_LOG_COLOR
+# define HG_LOG_ESC "\033"
+# define HG_LOG_RESET HG_LOG_ESC "[0m"
+# define HG_LOG_REG HG_LOG_ESC "[0;"
+# define HG_LOG_BOLD HG_LOG_ESC "[1;"
+# define HG_LOG_RED "31m"
+# define HG_LOG_GREEN "32m"
+# define HG_LOG_YELLOW "33m"
+# define HG_LOG_BLUE "34m"
+# define HG_LOG_MAGENTA "35m"
+# define HG_LOG_CYAN "36m"
+#endif
+
+/*******************/
+/* Local Variables */
+/*******************/
+
+static int (*hg_log_func_g)(FILE *stream, const char *format, ...) = fprintf; /* output function called by hg_log_write() */
+static FILE *hg_log_stream_debug_g = NULL; /* NULL: hg_log_write() uses stdout */
+static FILE *hg_log_stream_warning_g = NULL; /* NULL: hg_log_write() uses stdout */
+static FILE *hg_log_stream_error_g = NULL; /* NULL: hg_log_write() uses stderr */
+
+/*---------------------------------------------------------------------------*/
+void
+hg_log_set_func(int (*log_func)(FILE *stream, const char *format, ...))
+{
+    /* hg_log_write() calls the stored pointer unconditionally, so a NULL
+     * callback must never be stored. As with the stream setters, NULL
+     * means "use the default", which is fprintf. */
+    hg_log_func_g = (log_func != NULL) ? log_func : fprintf;
+}
+
+/*---------------------------------------------------------------------------
+ * Set the stream that hg_log_write() uses for HG_LOG_TYPE_DEBUG messages.
+ * Passing NULL makes hg_log_write() fall back to stdout.
+ *---------------------------------------------------------------------------*/
+void
+hg_log_set_stream_debug(FILE *stream)
+{
+ hg_log_stream_debug_g = stream;
+}
+
+/*---------------------------------------------------------------------------
+ * Set the stream that hg_log_write() uses for HG_LOG_TYPE_WARNING messages.
+ * Passing NULL makes hg_log_write() fall back to stdout.
+ *---------------------------------------------------------------------------*/
+void
+hg_log_set_stream_warning(FILE *stream)
+{
+ hg_log_stream_warning_g = stream;
+}
+
+/*---------------------------------------------------------------------------
+ * Set the stream that hg_log_write() uses for HG_LOG_TYPE_ERROR messages.
+ * Passing NULL makes hg_log_write() fall back to stderr.
+ *---------------------------------------------------------------------------*/
+void
+hg_log_set_stream_error(FILE *stream)
+{
+ hg_log_stream_error_g = stream;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Format one log message and hand it to the configured logging function.
+ *
+ * log_type selects the destination stream and the label printed with the
+ * message (HG_LOG_TYPE_DEBUG, HG_LOG_TYPE_WARNING or HG_LOG_TYPE_ERROR);
+ * any other value is ignored and nothing is printed. module, file, line
+ * and func identify the call site; format and the variadic arguments are
+ * printf-style. The formatted message is truncated to HG_LOG_MAX_BUF - 1
+ * characters.
+ */
+void
+hg_log_write(unsigned int log_type, const char *module, const char *file,
+    unsigned int line, const char *func, const char *format, ...)
+{
+    char buf[HG_LOG_MAX_BUF];
+    FILE *stream = NULL;
+    const char *msg_type = NULL;
+#ifdef HG_UTIL_HAS_LOG_COLOR
+    const char *color = "";
+#endif
+    va_list ap;
+
+    /* Pick the stream (user-configured, or the built-in default), the
+     * label and, when enabled, the color for this message type */
+    switch (log_type) {
+        case HG_LOG_TYPE_DEBUG:
+#ifdef HG_UTIL_HAS_LOG_COLOR
+            color = HG_LOG_BLUE;
+#endif
+            stream = hg_log_stream_debug_g ? hg_log_stream_debug_g : stdout;
+            msg_type = "Debug";
+            break;
+        case HG_LOG_TYPE_WARNING:
+#ifdef HG_UTIL_HAS_LOG_COLOR
+            color = HG_LOG_MAGENTA;
+#endif
+            stream = hg_log_stream_warning_g ? hg_log_stream_warning_g : stdout;
+            msg_type = "Warning";
+            break;
+        case HG_LOG_TYPE_ERROR:
+#ifdef HG_UTIL_HAS_LOG_COLOR
+            color = HG_LOG_RED;
+#endif
+            stream = hg_log_stream_error_g ? hg_log_stream_error_g : stderr;
+            msg_type = "Error";
+            break;
+        default:
+            return;
+    }
+
+    /* Expand the caller's message; vsnprintf() truncates and always
+     * NUL-terminates within the buffer */
+    va_start(ap, format);
+    vsnprintf(buf, sizeof(buf), format, ap);
+    va_end(ap);
+
+    /* Print using logging function. `line` is unsigned, so it is printed
+     * with %u to match its type. */
+#ifdef HG_UTIL_HAS_LOG_COLOR
+    hg_log_func_g(stream,
+        "# %s%s[%s -- %s%s%s%s%s -- %s:%u]%s\n"
+        "## %s%s%s()%s: %s\n",
+        HG_LOG_REG, color, module, HG_LOG_BOLD, color, msg_type, HG_LOG_REG,
+        color, file, line, HG_LOG_RESET, HG_LOG_REG, HG_LOG_YELLOW, func,
+        HG_LOG_RESET, buf);
+#else
+    hg_log_func_g(stream,
+        "# %s -- %s -- %s:%u\n"
+        " # %s(): %s\n",
+        module, msg_type, file, line, func, buf);
+#endif
+}
diff --git a/src/mercury/mercury_log.h b/src/mercury/mercury_log.h
new file mode 100644
index 0000000..9ee48f2
--- /dev/null
+++ b/src/mercury/mercury_log.h
@@ -0,0 +1,104 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_LOG_H
+#define MERCURY_LOG_H
+
+#include "mercury_util_config.h"
+
+#include <stdio.h>
+
+#define HG_LOG_TYPE_NONE 0
+#define HG_LOG_TYPE_DEBUG 0x01
+#define HG_LOG_TYPE_WARNING 0x02
+#define HG_LOG_TYPE_ERROR 0x04
+
+/* For compatibility */
+#if defined(__STDC_VERSION__) && (__STDC_VERSION__ < 199901L)
+# if defined(__GNUC__) && (__GNUC__ >= 2)
+# define __func__ __FUNCTION__
+# else
+# define __func__ "<unknown>"
+# endif
+#elif defined(_WIN32)
+# define __func__ __FUNCTION__
+#endif
+
+/*
+ * Convenience wrappers around hg_log_write(): each one fixes the log type
+ * and supplies the call site's __FILE__, __LINE__ and __func__. The first
+ * argument is the module name; the remaining arguments are the printf-style
+ * format string followed by its arguments.
+ */
+#define HG_LOG_WRITE_ERROR(HG_LOG_MODULE_NAME, ...) \
+ do { \
+ hg_log_write(HG_LOG_TYPE_ERROR, HG_LOG_MODULE_NAME, __FILE__, \
+ __LINE__, __func__, __VA_ARGS__); \
+ } while (0)
+#define HG_LOG_WRITE_DEBUG(HG_LOG_MODULE_NAME, ...) \
+ do { \
+ hg_log_write(HG_LOG_TYPE_DEBUG, HG_LOG_MODULE_NAME, __FILE__, \
+ __LINE__, __func__, __VA_ARGS__); \
+ } while (0)
+#define HG_LOG_WRITE_WARNING(HG_LOG_MODULE_NAME, ...) \
+ do { \
+ hg_log_write(HG_LOG_TYPE_WARNING, HG_LOG_MODULE_NAME, __FILE__, \
+ __LINE__, __func__, __VA_ARGS__); \
+ } while (0)
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Set the logging function.
+ *
+ * \param log_func [IN] pointer to function
+ */
+HG_UTIL_PUBLIC void
+hg_log_set_func(int (*log_func)(FILE *stream, const char *format, ...));
+
+/**
+ * Set the stream for debug output.
+ *
+ * \param stream [IN/OUT] pointer to stream
+ */
+HG_UTIL_PUBLIC void
+hg_log_set_stream_debug(FILE *stream);
+
+/**
+ * Set the stream for warning output.
+ *
+ * \param stream [IN/OUT] pointer to stream
+ */
+HG_UTIL_PUBLIC void
+hg_log_set_stream_warning(FILE *stream);
+
+/**
+ * Set the stream for error output.
+ *
+ * \param stream [IN/OUT] pointer to stream
+ */
+HG_UTIL_PUBLIC void
+hg_log_set_stream_error(FILE *stream);
+
+/**
+ * Write log.
+ *
+ * \param log_type [IN] log type (HG_LOG_TYPE_DEBUG, etc)
+ * \param module [IN] module name
+ * \param file [IN] file name
+ * \param line [IN] line number
+ * \param func [IN] function name
+ * \param format [IN] string format
+ */
+HG_UTIL_PUBLIC void
+hg_log_write(unsigned int log_type, const char *module, const char *file,
+ unsigned int line, const char *func, const char *format, ...);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_LOG_H */
diff --git a/src/mercury/mercury_mem.c b/src/mercury/mercury_mem.c
new file mode 100644
index 0000000..59a0acb
--- /dev/null
+++ b/src/mercury/mercury_mem.c
@@ -0,0 +1,177 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#include "mercury_mem.h"
+
+#include "mercury_util_error.h"
+
+#ifdef _WIN32
+# include <windows.h>
+#else
+# include <errno.h>
+# include <fcntl.h> /* For O_* constants */
+# include <string.h>
+# include <sys/mman.h>
+# include <sys/stat.h> /* For mode constants */
+# include <sys/types.h>
+# include <unistd.h>
+#endif
+#include <stdlib.h>
+
+/*---------------------------------------------------------------------------*/
+/* Query the operating system for its memory page size, in bytes. On POSIX
+ * this is whatever sysconf(_SC_PAGE_SIZE) returns. */
+long
+hg_mem_get_page_size(void)
+{
+#ifdef _WIN32
+    SYSTEM_INFO sys_info;
+
+    GetSystemInfo(&sys_info);
+
+    return (long) sys_info.dwPageSize;
+#else
+    return sysconf(_SC_PAGE_SIZE);
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Allocate size bytes aligned on an alignment boundary using the platform's
+ * aligned allocator; returns NULL when the allocation fails. */
+void *
+hg_mem_aligned_alloc(size_t alignment, size_t size)
+{
+#if defined(_WIN32)
+    return _aligned_malloc(size, alignment);
+#elif defined(_ISOC11_SOURCE)
+    return aligned_alloc(alignment, size);
+#else
+    void *block = NULL;
+
+    if (posix_memalign(&block, alignment, size) != 0)
+        return NULL;
+
+    return block;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Release a block with the deallocator matching the platform's aligned
+ * allocator: _aligned_free() on Windows, free() elsewhere. */
+void
+hg_mem_aligned_free(void *mem_ptr)
+{
+#if defined(_WIN32)
+    _aligned_free(mem_ptr);
+#else
+    free(mem_ptr);
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Create or open the shared-memory object called name and map size bytes of
+ * it read/write.
+ *
+ * On POSIX an object whose current size is 0 is grown to size with
+ * ftruncate(); an existing object smaller than size is rejected. The
+ * descriptor/handle is closed before returning because the mapping does not
+ * need it.
+ *
+ * Returns the mapped address, or NULL on failure. On failure nothing is left
+ * open or mapped: the handle is closed exactly once and a mapping that was
+ * already established is torn down again.
+ */
+void *
+hg_mem_shm_map(const char *name, size_t size, hg_util_bool_t create)
+{
+    void *mem_ptr = NULL;
+#ifdef _WIN32
+    HANDLE fd = NULL; /* NULL means "no handle to close" */
+    LARGE_INTEGER large = {.QuadPart = size};
+    DWORD access = FILE_MAP_READ | FILE_MAP_WRITE;
+    BOOL rc;
+
+    if (create) {
+        fd = CreateFileMappingA(INVALID_HANDLE_VALUE, 0, PAGE_READWRITE,
+            large.HighPart, large.LowPart, name);
+        HG_UTIL_CHECK_ERROR_NORET(!fd, error, "CreateFileMappingA() failed");
+    } else {
+        fd = OpenFileMappingA(access, FALSE, name);
+        HG_UTIL_CHECK_ERROR_NORET(!fd, error, "OpenFileMappingA() failed");
+    }
+
+    mem_ptr = MapViewOfFile(fd, access, 0, 0, size);
+    HG_UTIL_CHECK_ERROR_NORET(!mem_ptr, error, "MapViewOfFile() failed");
+
+    /* The handle can be closed without affecting the memory mapping */
+    rc = CloseHandle(fd);
+    fd = NULL; /* do not close it a second time on the error path */
+    HG_UTIL_CHECK_ERROR_NORET(!rc, error, "CloseHandle() failed");
+#else
+    int fd = -1; /* -1 means "no descriptor to close"; 0 is a valid fd */
+    int flags = O_RDWR | (create ? O_CREAT : 0);
+    struct stat shm_stat;
+    int rc;
+
+    fd = shm_open(name, flags, S_IRUSR | S_IWUSR);
+    HG_UTIL_CHECK_ERROR_NORET(
+        fd < 0, error, "shm_open() failed (%s)", strerror(errno));
+
+    rc = fstat(fd, &shm_stat);
+    HG_UTIL_CHECK_ERROR_NORET(
+        rc != 0, error, "fstat() failed (%s)", strerror(errno));
+
+    if (shm_stat.st_size == 0) {
+        /* Empty object: give it the requested size */
+        rc = ftruncate(fd, (off_t) size);
+        HG_UTIL_CHECK_ERROR_NORET(
+            rc != 0, error, "ftruncate() failed (%s)", strerror(errno));
+    } else {
+        HG_UTIL_CHECK_ERROR_NORET(
+            shm_stat.st_size < (off_t) size, error, "shm file size too small");
+    }
+
+    mem_ptr = mmap(NULL, size, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
+    HG_UTIL_CHECK_ERROR_NORET(
+        mem_ptr == MAP_FAILED, error, "mmap() failed (%s)", strerror(errno));
+
+    /* The file descriptor can be closed without affecting the memory mapping */
+    rc = close(fd);
+    fd = -1; /* do not close it a second time on the error path */
+    HG_UTIL_CHECK_ERROR_NORET(
+        rc != 0, error, "close() failed (%s)", strerror(errno));
+#endif
+
+    return mem_ptr;
+
+error:
+#ifdef _WIN32
+    /* Undo a mapping that succeeded before the failure */
+    if (mem_ptr)
+        UnmapViewOfFile(mem_ptr);
+    if (fd)
+        CloseHandle(fd);
+#else
+    /* Undo a mapping that succeeded before the failure */
+    if (mem_ptr != NULL && mem_ptr != MAP_FAILED)
+        munmap(mem_ptr, size);
+    if (fd >= 0)
+        close(fd);
+#endif
+
+    return NULL;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Undo hg_mem_shm_map().
+ *
+ * Windows: unmaps the view when mem_ptr is non-NULL; name and size are not
+ * used. Elsewhere: unmaps size bytes at mem_ptr (skipped when mem_ptr is
+ * NULL or MAP_FAILED) and then, when name is non-NULL, removes the
+ * shared-memory object with shm_unlink().
+ *
+ * Returns HG_UTIL_SUCCESS, or HG_UTIL_FAIL when one of the system calls
+ * reports an error.
+ */
+int
+hg_mem_shm_unmap(const char *name, void *mem_ptr, size_t size)
+{
+ int ret = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+ if (mem_ptr) {
+ BOOL rc = UnmapViewOfFile(mem_ptr);
+ HG_UTIL_CHECK_ERROR(
+ !rc, done, ret, HG_UTIL_FAIL, "UnmapViewOfFile() failed");
+ }
+#else
+ if (mem_ptr && mem_ptr != MAP_FAILED) {
+ int rc = munmap(mem_ptr, size);
+ /* NOTE(review): the macro is given the "done" label, so a munmap()
+ * failure presumably skips the shm_unlink() below - confirm against
+ * mercury_util_error.h. */
+ HG_UTIL_CHECK_ERROR(rc != 0, done, ret, HG_UTIL_FAIL,
+ "munmap() failed (%s)", strerror(errno));
+ }
+
+ if (name) {
+ int rc = shm_unlink(name);
+ HG_UTIL_CHECK_ERROR(rc != 0, done, ret, HG_UTIL_FAIL,
+ "shm_unlink() failed (%s)", strerror(errno));
+ }
+#endif
+
+done:
+ return ret;
+}
diff --git a/src/mercury/mercury_mem.h b/src/mercury/mercury_mem.h
new file mode 100644
index 0000000..c1ff53d
--- /dev/null
+++ b/src/mercury/mercury_mem.h
@@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_MEM_H
+#define MERCURY_MEM_H
+
+#include "mercury_util_config.h"
+
+/*************************************/
+/* Public Type and Struct Definition */
+/*************************************/
+
+/*****************/
+/* Public Macros */
+/*****************/
+
+#define HG_MEM_CACHE_LINE_SIZE 64
+#define HG_MEM_PAGE_SIZE 4096
+
+/*********************/
+/* Public Prototypes */
+/*********************/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Get system default page size.
+ *
+ * \return page size on success or negative on failure
+ */
+HG_UTIL_PUBLIC long
+hg_mem_get_page_size(void);
+
+/**
+ * Allocate size bytes and return a pointer to the allocated memory.
+ * The memory address will be a multiple of alignment, which must be a power of
+ * two, and size should be a multiple of alignment.
+ *
+ * \param alignment [IN] alignment size
+ * \param size [IN] total requested size
+ *
+ * \return a pointer to the allocated memory, or NULL in case of failure
+ */
+HG_UTIL_PUBLIC void *
+hg_mem_aligned_alloc(size_t alignment, size_t size);
+
+/**
+ * Free memory allocated from hg_mem_aligned_alloc().
+ *
+ * \param mem_ptr [IN] pointer to allocated memory
+ */
+HG_UTIL_PUBLIC void
+hg_mem_aligned_free(void *mem_ptr);
+
+/**
+ * Create/open a shared-memory mapped file of size \size with name \name.
+ *
+ * \param name [IN] name of mapped file
+ * \param size [IN] total requested size
+ * \param create [IN] create file if not existing
+ *
+ * \return a pointer to the mapped memory region, or NULL in case of failure
+ */
+HG_UTIL_PUBLIC void *
+hg_mem_shm_map(const char *name, size_t size, hg_util_bool_t create);
+
+/**
+ * Unmap a previously mapped region and close the file.
+ *
+ * \param name [IN] name of mapped file
+ * \param mem_ptr [IN] pointer to mapped memory region
+ * \param size [IN] size range of the mapped region
+ *
+ * \return non-negative on success, or negative in case of failure
+ */
+HG_UTIL_PUBLIC int
+hg_mem_shm_unmap(const char *name, void *mem_ptr, size_t size);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_MEM_H */
diff --git a/src/mercury/mercury_poll.c b/src/mercury/mercury_poll.c
new file mode 100644
index 0000000..57f7e3e
--- /dev/null
+++ b/src/mercury/mercury_poll.c
@@ -0,0 +1,531 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#include "mercury_poll.h"
+#include "mercury_atomic.h"
+#include "mercury_event.h"
+#include "mercury_list.h"
+#include "mercury_thread_spin.h"
+#include "mercury_util_error.h"
+
+#include <stdlib.h>
+
+#if defined(_WIN32)
+/* TODO */
+#else
+# include <errno.h>
+# include <string.h>
+# include <unistd.h>
+# if defined(HG_UTIL_HAS_SYSEPOLL_H)
+# include <sys/epoll.h>
+# elif defined(HG_UTIL_HAS_SYSEVENT_H)
+# include <sys/event.h>
+# include <sys/time.h>
+# else
+# include <poll.h>
+# endif
+#endif /* defined(_WIN32) */
+
+/****************/
+/* Local Macros */
+/****************/
+
+#define HG_POLL_MAX_EVENTS 1024
+
+#ifndef MIN
+# define MIN(a, b) (((a) < (b)) ? (a) : (b))
+#endif
+
+/************************************/
+/* Local Type and Struct Definition */
+/************************************/
+
+/* One registration in a poll set, created by hg_poll_add(). */
+struct hg_poll_data {
+ /* Backend-specific record of the registered descriptor */
+#if defined(HG_UTIL_HAS_SYSEPOLL_H)
+ int fd;
+#elif defined(HG_UTIL_HAS_SYSEVENT_H)
+ struct kevent kev;
+#else
+ struct pollfd pollfd;
+#endif
+ /* Callback run by hg_poll_wait() for this entry (may be NULL) */
+ hg_poll_cb_t poll_cb;
+ /* Opaque argument handed back to poll_cb */
+ void *poll_arg;
+ /* Linkage in hg_poll_set::poll_data_list */
+ HG_LIST_ENTRY(hg_poll_data) entry;
+};
+
+/* A poll set: a backend descriptor plus the list of registered entries. */
+struct hg_poll_set {
+ /* epoll or kqueue descriptor, or the hg_event_create() descriptor when
+ * the poll() fallback is used */
+ int fd;
+ /* Number of registered entries (incremented by hg_poll_add(),
+ * decremented by hg_poll_remove()) */
+ hg_atomic_int32_t nfds;
+ /* Optional callback consulted by hg_poll_wait() before blocking */
+ hg_poll_try_wait_cb_t try_wait_cb;
+ /* Argument passed to try_wait_cb */
+ void *try_wait_arg;
+ /* Registered entries */
+ HG_LIST_HEAD(hg_poll_data) poll_data_list;
+ /* Spin lock taken around accesses to poll_data_list */
+ hg_thread_spin_t poll_data_list_lock;
+};
+
+/********************/
+/* Local Prototypes */
+/********************/
+
+/*******************/
+/* Local Variables */
+/*******************/
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Allocate and initialize an empty poll set.
+ *
+ * The backend descriptor is an epoll instance, a kqueue, or an hg_event
+ * descriptor depending on what the build detected. Returns NULL when the
+ * allocation or the backend descriptor creation fails.
+ */
+hg_poll_set_t *
+hg_poll_create(void)
+{
+    struct hg_poll_set *hg_poll_set = NULL;
+
+    hg_poll_set = malloc(sizeof(*hg_poll_set));
+    /* No "%s" here: the message takes no argument */
+    HG_UTIL_CHECK_ERROR_NORET(hg_poll_set == NULL, error, "malloc() failed");
+#if defined(_WIN32)
+    /* TODO */
+#else
+    HG_LIST_INIT(&hg_poll_set->poll_data_list);
+    hg_thread_spin_init(&hg_poll_set->poll_data_list_lock);
+    hg_atomic_init32(&hg_poll_set->nfds, 0);
+    hg_poll_set->try_wait_cb = NULL;
+    hg_poll_set->try_wait_arg = NULL;
+
+#    if defined(HG_UTIL_HAS_SYSEPOLL_H)
+    hg_poll_set->fd = epoll_create1(0);
+    HG_UTIL_CHECK_ERROR_NORET(hg_poll_set->fd == -1, error,
+        "epoll_create1() failed (%s)", strerror(errno));
+#    elif defined(HG_UTIL_HAS_SYSEVENT_H)
+    hg_poll_set->fd = kqueue();
+    HG_UTIL_CHECK_ERROR_NORET(
+        hg_poll_set->fd == -1, error, "kqueue() failed (%s)", strerror(errno));
+#    else
+    hg_poll_set->fd = hg_event_create();
+    HG_UTIL_CHECK_ERROR_NORET(hg_poll_set->fd == -1, error,
+        "hg_event_create() failed (%s)", strerror(errno));
+#    endif
+#endif /* defined(_WIN32) */
+
+    return hg_poll_set;
+
+error:
+    /* Only reached with a non-NULL set after the spin lock was initialized */
+    if (hg_poll_set) {
+        hg_thread_spin_destroy(&hg_poll_set->poll_data_list_lock);
+        free(hg_poll_set);
+    }
+    return NULL;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Destroy a poll set created by hg_poll_create().
+ *
+ * A NULL poll_set is accepted and reported as success. A set that still has
+ * registered entries (nfds != 0) is refused with HG_UTIL_FAIL.
+ * NOTE(review): every failure check is given the "done" label, so on failure
+ * the set presumably is not freed - confirm against mercury_util_error.h.
+ */
+int
+hg_poll_destroy(hg_poll_set_t *poll_set)
+{
+ int ret = HG_UTIL_SUCCESS;
+ int rc;
+
+ if (!poll_set)
+ goto done;
+
+#if defined(_WIN32)
+ /* TODO */
+#else
+ HG_UTIL_CHECK_ERROR(hg_atomic_get32(&poll_set->nfds), done, ret,
+ HG_UTIL_FAIL, "Poll set non empty");
+
+# if defined(HG_UTIL_HAS_SYSEPOLL_H) || defined(HG_UTIL_HAS_SYSEVENT_H)
+ /* Close poll descriptor */
+ rc = close(poll_set->fd);
+ HG_UTIL_CHECK_ERROR(rc == -1, done, ret, HG_UTIL_FAIL,
+ "close() failed (%s)", strerror(errno));
+# else
+ /* poll() fallback: the descriptor came from hg_event_create() */
+ rc = hg_event_destroy(poll_set->fd);
+ HG_UTIL_CHECK_ERROR(rc == HG_UTIL_FAIL, done, ret, HG_UTIL_FAIL,
+ "hg_event_destroy() failed (%s)", strerror(errno));
+# endif
+
+ hg_thread_spin_destroy(&poll_set->poll_data_list_lock);
+#endif /* defined(_WIN32) */
+
+ free(poll_set);
+
+done:
+ return ret;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Return the backend descriptor stored in the poll set, or -1 when poll_set
+ * is NULL. On Windows the lookup is not implemented and -1 is always
+ * returned.
+ */
+int
+hg_poll_get_fd(hg_poll_set_t *poll_set)
+{
+ int fd = -1;
+
+ HG_UTIL_CHECK_ERROR_NORET(!poll_set, done, "NULL poll set");
+
+#if defined(_WIN32)
+ /* TODO */
+#else
+ fd = poll_set->fd;
+#endif
+
+done:
+ return fd;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Store the callback (and its argument) that hg_poll_wait() consults before
+ * entering the blocking system call. Passing a NULL try_wait_cb clears it.
+ *
+ * Returns HG_UTIL_FAIL when poll_set is NULL, HG_UTIL_SUCCESS otherwise.
+ */
+int
+hg_poll_set_try_wait(
+ hg_poll_set_t *poll_set, hg_poll_try_wait_cb_t try_wait_cb, void *arg)
+{
+ int ret = HG_UTIL_SUCCESS;
+
+ HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set");
+
+ poll_set->try_wait_cb = try_wait_cb;
+ poll_set->try_wait_arg = arg;
+
+done:
+ return ret;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Register a descriptor and its callback with the poll set.
+ *
+ * flags must be exactly HG_POLLIN or HG_POLLOUT when fd > 0. When fd <= 0
+ * no descriptor is registered with the backend, but the entry is still
+ * added to the list so that its callback runs on the non-blocking path of
+ * hg_poll_wait().
+ *
+ * Returns HG_UTIL_SUCCESS, or HG_UTIL_FAIL on a NULL poll set, allocation
+ * failure, invalid flag or backend registration failure. Nothing is added
+ * on failure.
+ */
+int
+hg_poll_add(hg_poll_set_t *poll_set, int fd, unsigned int flags,
+    hg_poll_cb_t poll_cb, void *poll_arg)
+{
+    struct hg_poll_data *hg_poll_data = NULL;
+    int ret = HG_UTIL_SUCCESS;
+
+    HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set");
+
+    /* Allocate zeroed poll data that can hold user data and callback */
+    hg_poll_data = calloc(1, sizeof(*hg_poll_data));
+    /* No "%s" here: the message takes no argument */
+    HG_UTIL_CHECK_ERROR(
+        !hg_poll_data, done, ret, HG_UTIL_FAIL, "malloc() failed");
+    hg_poll_data->poll_cb = poll_cb;
+    hg_poll_data->poll_arg = poll_arg;
+
+    if (fd > 0) {
+#if defined(_WIN32)
+        /* TODO */
+#elif defined(HG_UTIL_HAS_SYSEPOLL_H)
+        struct epoll_event ev;
+        uint32_t poll_flags;
+        int rc;
+
+        /* Translate flags */
+        switch (flags) {
+            case HG_POLLIN:
+                poll_flags = EPOLLIN;
+                break;
+            case HG_POLLOUT:
+                poll_flags = EPOLLOUT;
+                break;
+            default:
+                HG_UTIL_GOTO_ERROR(error, ret, HG_UTIL_FAIL, "Invalid flag");
+        }
+
+        hg_poll_data->fd = fd;
+        ev.events = poll_flags;
+        /* hg_poll_wait() gets the entry back through data.ptr */
+        ev.data.ptr = hg_poll_data;
+
+        rc = epoll_ctl(poll_set->fd, EPOLL_CTL_ADD, fd, &ev);
+        HG_UTIL_CHECK_ERROR(rc != 0, error, ret, HG_UTIL_FAIL,
+            "epoll_ctl() failed (%s)", strerror(errno));
+#elif defined(HG_UTIL_HAS_SYSEVENT_H)
+        struct timespec timeout = {0, 0};
+        int16_t poll_flags;
+        int rc;
+
+        /* Translate flags */
+        switch (flags) {
+            case HG_POLLIN:
+                poll_flags = EVFILT_READ;
+                break;
+            case HG_POLLOUT:
+                poll_flags = EVFILT_WRITE;
+                break;
+            default:
+                HG_UTIL_GOTO_ERROR(error, ret, HG_UTIL_FAIL, "Invalid flag");
+        }
+
+        /* hg_poll_wait() gets the entry back through udata */
+        EV_SET(&hg_poll_data->kev, (uintptr_t) fd, poll_flags, EV_ADD, 0, 0,
+            hg_poll_data);
+
+        rc = kevent(poll_set->fd, &hg_poll_data->kev, 1, NULL, 0, &timeout);
+        HG_UTIL_CHECK_ERROR(rc == -1, error, ret, HG_UTIL_FAIL,
+            "kevent() failed (%s)", strerror(errno));
+#else
+        short int poll_flags;
+
+        /* Translate flags */
+        switch (flags) {
+            case HG_POLLIN:
+                poll_flags = POLLIN;
+                break;
+            case HG_POLLOUT:
+                poll_flags = POLLOUT;
+                break;
+            default:
+                HG_UTIL_GOTO_ERROR(error, ret, HG_UTIL_FAIL, "Invalid flag");
+        }
+
+        hg_poll_data->pollfd.fd = fd;
+        hg_poll_data->pollfd.events = poll_flags;
+        hg_poll_data->pollfd.revents = 0;
+#endif /* defined(_WIN32) */
+    }
+    hg_atomic_incr32(&poll_set->nfds);
+
+    hg_thread_spin_lock(&poll_set->poll_data_list_lock);
+    HG_LIST_INSERT_HEAD(&poll_set->poll_data_list, hg_poll_data, entry);
+    hg_thread_spin_unlock(&poll_set->poll_data_list_lock);
+
+done:
+    return ret;
+
+error:
+    /* The entry was never published, so it can simply be released */
+    free(hg_poll_data);
+
+    return HG_UTIL_FAIL;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Remove the first entry registered for fd from the poll set.
+ *
+ * The descriptor is deregistered from the backend before the entry is
+ * unlinked and freed, so a backend failure leaves the poll set exactly as it
+ * was (the entry stays listed and counted) instead of leaking it.
+ *
+ * Returns HG_UTIL_SUCCESS, or HG_UTIL_FAIL on a NULL poll set, a backend
+ * failure, or when no entry matches fd.
+ */
+int
+hg_poll_remove(hg_poll_set_t *poll_set, int fd)
+{
+    struct hg_poll_data *hg_poll_data;
+    hg_util_bool_t found = HG_UTIL_FALSE;
+    int ret = HG_UTIL_SUCCESS;
+
+    HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set");
+
+    hg_thread_spin_lock(&poll_set->poll_data_list_lock);
+    HG_LIST_FOREACH (hg_poll_data, &poll_set->poll_data_list, entry) {
+#if defined(_WIN32)
+        /* TODO */
+#elif defined(HG_UTIL_HAS_SYSEPOLL_H)
+        if (hg_poll_data->fd == fd) {
+            if (fd > 0) {
+                int rc = epoll_ctl(poll_set->fd, EPOLL_CTL_DEL, fd, NULL);
+                HG_UTIL_CHECK_ERROR(rc != 0, error, ret, HG_UTIL_FAIL,
+                    "epoll_ctl() failed (%s)", strerror(errno));
+            }
+            HG_LIST_REMOVE(hg_poll_data, entry);
+            free(hg_poll_data);
+            found = HG_UTIL_TRUE;
+            break;
+        }
+#elif defined(HG_UTIL_HAS_SYSEVENT_H)
+        /* Events which are attached to file descriptors are automatically
+         * deleted on the last close of the descriptor. */
+        if ((int) hg_poll_data->kev.ident == fd) {
+            if (fd > 0) {
+                struct timespec timeout = {0, 0};
+                struct kevent kev_del;
+                int rc;
+
+                /* A kevent is identified by (ident, filter): delete with
+                 * the filter the entry was added with, using a scratch
+                 * kevent so the entry is untouched if the call fails. */
+                EV_SET(&kev_del, (uintptr_t) fd, hg_poll_data->kev.filter,
+                    EV_DELETE, 0, 0, NULL);
+                rc = kevent(poll_set->fd, &kev_del, 1, NULL, 0, &timeout);
+                HG_UTIL_CHECK_ERROR(rc == -1, error, ret, HG_UTIL_FAIL,
+                    "kevent() failed (%s)", strerror(errno));
+            }
+            HG_LIST_REMOVE(hg_poll_data, entry);
+            free(hg_poll_data);
+            found = HG_UTIL_TRUE;
+            break;
+        }
+#else
+        if (hg_poll_data->pollfd.fd == fd) {
+            HG_LIST_REMOVE(hg_poll_data, entry);
+            free(hg_poll_data);
+            found = HG_UTIL_TRUE;
+            break;
+        }
+#endif
+    }
+    hg_thread_spin_unlock(&poll_set->poll_data_list_lock);
+
+    HG_UTIL_CHECK_ERROR(
+        !found, done, ret, HG_UTIL_FAIL, "Could not find fd in poll_set");
+    hg_atomic_decr32(&poll_set->nfds);
+
+done:
+    return ret;
+
+#if defined(HG_UTIL_HAS_SYSEPOLL_H) || defined(HG_UTIL_HAS_SYSEVENT_H)
+error:
+    /* Backend failure inside the loop: the list lock is still held */
+    hg_thread_spin_unlock(&poll_set->poll_data_list_lock);
+
+    return ret;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Wait for events on the poll set and run the callbacks of ready entries.
+ *
+ * Blocking path (timeout != 0 and the try-wait callback, if any, returns
+ * true): enters epoll_wait()/kevent()/poll() for up to timeout milliseconds
+ * and runs the callback of every entry that reported an event. At most
+ * MIN(max_events, HG_POLL_MAX_EVENTS) events are handled per call. An
+ * interrupted wait (EINTR) is reported as success with zero events.
+ *
+ * Non-blocking path (otherwise): runs the callback of every registered
+ * entry, up to the same limit, without entering a system call.
+ *
+ * Each callback receives the events[] slot matching the position of its
+ * event; entries without a callback consume a slot that is left untouched.
+ * On success *actual_events (when non-NULL) receives the number of slots
+ * consumed. Returns HG_UTIL_FAIL on a NULL poll set, a system call failure,
+ * or when a callback fails.
+ */
+int
+hg_poll_wait(hg_poll_set_t *poll_set, unsigned int timeout,
+    unsigned int max_events, struct hg_poll_event *events,
+    unsigned int *actual_events)
+{
+    int max_poll_events = (int) MIN(max_events, HG_POLL_MAX_EVENTS);
+    int nfds = 0, i;
+    int ret = HG_UTIL_SUCCESS;
+
+    HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set");
+
+    if (timeout && (!poll_set->try_wait_cb ||
+                       poll_set->try_wait_cb(poll_set->try_wait_arg))) {
+#if defined(_WIN32)
+
+#elif defined(HG_UTIL_HAS_SYSEPOLL_H)
+        struct epoll_event poll_events[HG_POLL_MAX_EVENTS];
+
+        nfds = epoll_wait(
+            poll_set->fd, poll_events, max_poll_events, (int) timeout);
+        HG_UTIL_CHECK_ERROR(nfds == -1 && errno != EINTR, done, ret,
+            HG_UTIL_FAIL, "epoll_wait() failed (%s)", strerror(errno));
+        /* Interrupted: no event, and never report -1 as an event count */
+        if (nfds < 0)
+            nfds = 0;
+
+        for (i = 0; i < nfds; ++i) {
+            struct hg_poll_data *hg_poll_data =
+                (struct hg_poll_data *) poll_events[i].data.ptr;
+            int error = 0, rc;
+
+            HG_UTIL_CHECK_ERROR(hg_poll_data == NULL, done, ret, HG_UTIL_FAIL,
+                "NULL poll data");
+
+            /* Don't change the if/else order */
+            if (poll_events[i].events & EPOLLERR)
+                error = EPOLLERR;
+            else if (poll_events[i].events & EPOLLHUP)
+                error = EPOLLHUP;
+            else if (poll_events[i].events & EPOLLRDHUP)
+                error = EPOLLRDHUP;
+
+            HG_UTIL_CHECK_ERROR(!(poll_events[i].events & (EPOLLIN | EPOLLOUT)),
+                done, ret, HG_UTIL_FAIL, "Unsupported events");
+
+            if (!hg_poll_data->poll_cb)
+                continue;
+
+            rc = hg_poll_data->poll_cb(
+                hg_poll_data->poll_arg, error, &events[i]);
+            HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
+                "poll cb failed");
+        }
+#elif defined(HG_UTIL_HAS_SYSEVENT_H)
+        struct kevent poll_events[HG_POLL_MAX_EVENTS];
+        struct timespec timeout_spec;
+        ldiv_t ld;
+
+        /* Get sec / nsec */
+        ld = ldiv(timeout, 1000L);
+        timeout_spec.tv_sec = ld.quot;
+        timeout_spec.tv_nsec = ld.rem * 1000000L;
+
+        /* Use the capped count: poll_events only has HG_POLL_MAX_EVENTS
+         * entries */
+        nfds = kevent(poll_set->fd, NULL, 0, poll_events, max_poll_events,
+            &timeout_spec);
+        HG_UTIL_CHECK_ERROR(nfds == -1 && errno != EINTR, done, ret,
+            HG_UTIL_FAIL, "kevent() failed (%s)", strerror(errno));
+        /* Interrupted: no event, and never report -1 as an event count */
+        if (nfds < 0)
+            nfds = 0;
+
+        for (i = 0; i < nfds; ++i) {
+            struct hg_poll_data *hg_poll_data =
+                (struct hg_poll_data *) poll_events[i].udata;
+            int rc;
+
+            HG_UTIL_CHECK_ERROR(hg_poll_data == NULL, done, ret, HG_UTIL_FAIL,
+                "NULL poll data");
+
+            if (!hg_poll_data->poll_cb)
+                continue;
+
+            rc = hg_poll_data->poll_cb(hg_poll_data->poll_arg, 0, &events[i]);
+            HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
+                "poll cb failed");
+        }
+#else
+        struct pollfd poll_events[HG_POLL_MAX_EVENTS] = {0};
+        struct hg_poll_data *poll_data_events[HG_POLL_MAX_EVENTS] = {NULL};
+        struct hg_poll_data *hg_poll_data = NULL;
+        int nevents = 0;
+        int nready;
+
+        /* Snapshot the registered descriptors under the list lock */
+        hg_thread_spin_lock(&poll_set->poll_data_list_lock);
+        for (hg_poll_data = HG_LIST_FIRST(&poll_set->poll_data_list);
+             hg_poll_data && (nevents < max_poll_events);
+             hg_poll_data = HG_LIST_NEXT(hg_poll_data, entry), nevents++) {
+            poll_events[nevents] = hg_poll_data->pollfd;
+            poll_data_events[nevents] = hg_poll_data;
+        }
+        hg_thread_spin_unlock(&poll_set->poll_data_list_lock);
+
+        nready = poll(poll_events, (nfds_t) nevents, (int) timeout);
+        HG_UTIL_CHECK_ERROR(nready == -1 && errno != EINTR, done, ret,
+            HG_UTIL_FAIL, "poll() failed (%s)", strerror(errno));
+
+        /* poll() returns how many descriptors are ready, not where they
+         * are: every polled slot must be inspected. nfds counts the
+         * requested events found and indexes the caller's events[]. */
+        for (i = 0; nready > 0 && i < nevents; ++i) {
+            struct hg_poll_event *event;
+            int rc;
+
+            if (!(poll_events[i].revents & poll_events[i].events))
+                continue;
+
+            event = &events[nfds++];
+
+            /* TODO check POLLHUP | POLLERR | POLLNVAL */
+            if (!poll_data_events[i]->poll_cb)
+                continue;
+
+            rc = poll_data_events[i]->poll_cb(
+                poll_data_events[i]->poll_arg, 0, event);
+            HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
+                "poll cb failed");
+        }
+
+        if (nready > 0) {
+            /* TODO should figure where to call hg_event_get() */
+            int rc = hg_event_set(poll_set->fd);
+            HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
+                "hg_event_set() failed (%s)", strerror(errno));
+        }
+#endif
+    } else {
+#ifdef _WIN32
+
+#else
+        struct hg_poll_data *poll_data_events[HG_POLL_MAX_EVENTS] = {NULL};
+        struct hg_poll_data *hg_poll_data;
+        int nevents = 0;
+
+        /* Snapshot the registered entries under the list lock */
+        hg_thread_spin_lock(&poll_set->poll_data_list_lock);
+        for (hg_poll_data = HG_LIST_FIRST(&poll_set->poll_data_list);
+             hg_poll_data && (nevents < max_poll_events);
+             hg_poll_data = HG_LIST_NEXT(hg_poll_data, entry), nevents++)
+            poll_data_events[nevents] = hg_poll_data;
+        hg_thread_spin_unlock(&poll_set->poll_data_list_lock);
+
+        /* No system call: give every entry a chance to make progress */
+        nfds = nevents;
+        for (i = 0; i < nfds; ++i) {
+            int rc;
+
+            if (!poll_data_events[i]->poll_cb)
+                continue;
+
+            rc = poll_data_events[i]->poll_cb(
+                poll_data_events[i]->poll_arg, 0, &events[i]);
+            HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
+                "poll cb failed");
+        }
+#endif
+    }
+
+    if (actual_events)
+        *actual_events = (unsigned int) nfds;
+
+done:
+    return ret;
+}
diff --git a/src/mercury/mercury_poll.h b/src/mercury/mercury_poll.h
new file mode 100644
index 0000000..8922f37
--- /dev/null
+++ b/src/mercury/mercury_poll.h
@@ -0,0 +1,164 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_POLL_H
+#define MERCURY_POLL_H
+
+#include "mercury_util_config.h"
+
+/**
+ * Purpose: define an interface that either polls or allows busy wait
+ * without entering system calls.
+ */
+
+/*************************************/
+/* Public Type and Struct Definition */
+/*************************************/
+
+typedef struct hg_poll_set hg_poll_set_t;
+
+struct hg_poll_event {
+ hg_util_bool_t progressed; /* Indicates progress */
+ void *ptr; /* Pointer to user data */
+};
+
+/**
+ * Callback that can be used to signal when it is safe to block on the
+ * poll set or if blocking could hang the application.
+ *
+ * \param arg [IN] function argument
+ *
+ * \return HG_UTIL_TRUE if it is safe to block or HG_UTIL_FALSE otherwise
+ */
+typedef hg_util_bool_t (*hg_poll_try_wait_cb_t)(void *arg);
+
+/**
+ * Polling callback, arg can be used to pass user arguments, event can be used
+ * to return user arguments back to hg_poll_wait.
+ *
+ * \param arg [IN] pointer to user data
+ * \param error [IN] any error event has occurred
+ * \param event [OUT] event data output
+ *
+ * \return Non-negative on success or negative on failure
+ */
+typedef int (*hg_poll_cb_t)(void *arg, int error, struct hg_poll_event *event);
+
+/*****************/
+/* Public Macros */
+/*****************/
+
+/**
+ * Polling events.
+ */
+#define HG_POLLIN 0x001 /* Ready to read. */
+#define HG_POLLOUT 0x004 /* Ready to write. */
+
+/*********************/
+/* Public Prototypes */
+/*********************/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Create a new poll set.
+ *
+ * \return Pointer to poll set or NULL in case of failure
+ */
+HG_UTIL_PUBLIC hg_poll_set_t *
+hg_poll_create(void);
+
+/**
+ * Destroy a poll set.
+ *
+ * \param poll_set [IN] pointer to poll set
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_poll_destroy(hg_poll_set_t *poll_set);
+
+/**
+ * Get a file descriptor from an existing poll set.
+ *
+ * \param poll_set [IN] pointer to poll set
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_poll_get_fd(hg_poll_set_t *poll_set);
+
+/**
+ * Set a callback that can be used to signal when it is safe to block on the
+ * poll set or if blocking could hang the application, in which case behavior
+ * is the same as passing a timeout of 0.
+ *
+ * \param poll_set [IN] pointer to poll set
+ * \param try_wait_cb [IN] function pointer
+ * \param try_wait_arg [IN] function pointer argument
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_poll_set_try_wait(hg_poll_set_t *poll_set, hg_poll_try_wait_cb_t try_wait_cb,
+ void *try_wait_arg);
+
+/**
+ * Add file descriptor to poll set.
+ *
+ * \param poll_set [IN] pointer to poll set
+ * \param fd [IN] file descriptor
+ * \param flags [IN] polling flags (HG_POLLIN, etc)
+ * \param poll_cb [IN] function pointer
+ * \param poll_cb_arg [IN] function pointer argument
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_poll_add(hg_poll_set_t *poll_set, int fd, unsigned int flags,
+ hg_poll_cb_t poll_cb, void *poll_cb_arg);
+
+/**
+ * Remove file descriptor from poll set.
+ *
+ * \param poll_set [IN] pointer to poll set
+ * \param fd [IN] file descriptor
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_poll_remove(hg_poll_set_t *poll_set, int fd);
+
+/**
+ * Wait on a poll set for timeout ms, progressed indicating whether progress has
+ * been made after that call returns. If timeout is 0, progress is performed
+ * on all the registered polling callbacks and hg_poll_wait() exits as soon as
+ * progress is made. If timeout is non 0, the system dependent polling function
+ * call is entered and progress is performed on the list of file descriptors
+ * for which an event has occurred.
+ *
+ * \param poll_set [IN] pointer to poll set
+ * \param timeout [IN] timeout (in milliseconds)
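+ * \param max_events [IN] maximum number of events to handle
+ * \param events [OUT] array of events passed to the polling callbacks
+ * \param actual_events [OUT] number of events reported in \events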
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_poll_wait(hg_poll_set_t *poll_set, unsigned int timeout,
+ unsigned int max_events, struct hg_poll_event events[],
+ unsigned int *actual_events);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_POLL_H */
diff --git a/src/mercury/mercury_queue.h b/src/mercury/mercury_queue.h
new file mode 100644
index 0000000..2133383
--- /dev/null
+++ b/src/mercury/mercury_queue.h
@@ -0,0 +1,123 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+/* Code below is derived from sys/queue.h which follows the below notice:
+ *
+ * Copyright (c) 1991, 1993
+ * The Regents of the University of California. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. Neither the name of the University nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * @(#)queue.h 8.5 (Berkeley) 8/20/94
+ */
+
+#ifndef MERCURY_QUEUE_H
+#define MERCURY_QUEUE_H
+
+/*
+ * Static initializer for a queue head variable called name: head is NULL and
+ * tail points back at the head pointer itself, which is the empty-queue state.
+ */
+#define HG_QUEUE_HEAD_INITIALIZER(name) \
+ { \
+ NULL, &(name).head \
+ }
+
+/* Define variable var_name of type struct struct_head_name as an empty queue. */
+#define HG_QUEUE_HEAD_INIT(struct_head_name, var_name) \
+ struct struct_head_name var_name = HG_QUEUE_HEAD_INITIALIZER(var_name)
+
+/*
+ * Declare a named queue head type (struct struct_head_name) for entries of
+ * type struct struct_entry_name. tail holds the address of the pointer that
+ * the next pushed entry must be stored through.
+ */
+#define HG_QUEUE_HEAD_DECL(struct_head_name, struct_entry_name) \
+ struct struct_head_name { \
+ struct struct_entry_name *head; \
+ struct struct_entry_name **tail; \
+ }
+
+/* Anonymous variant of HG_QUEUE_HEAD_DECL (same members, no struct tag). */
+#define HG_QUEUE_HEAD(struct_entry_name) \
+ struct { \
+ struct struct_entry_name *head; \
+ struct struct_entry_name **tail; \
+ }
+
+/* Link field to embed in an entry structure (single forward pointer). */
+#define HG_QUEUE_ENTRY(struct_entry_name) \
+ struct { \
+ struct struct_entry_name *next; \
+ }
+
+/* Run-time equivalent of HG_QUEUE_HEAD_INITIALIZER: reset to an empty queue. */
+#define HG_QUEUE_INIT(head_ptr) \
+ do { \
+ (head_ptr)->head = NULL; \
+ (head_ptr)->tail = &(head_ptr)->head; \
+ } while (/*CONSTCOND*/ 0)
+
+/* Evaluates to non-zero when the queue has no entry. */
+#define HG_QUEUE_IS_EMPTY(head_ptr) ((head_ptr)->head == NULL)
+
+/* First entry of the queue, or NULL when the queue is empty. */
+#define HG_QUEUE_FIRST(head_ptr) ((head_ptr)->head)
+
+/* Entry following entry_ptr, or NULL when entry_ptr is the last one. */
+#define HG_QUEUE_NEXT(entry_ptr, entry_field_name) \
+ ((entry_ptr)->entry_field_name.next)
+
+/*
+ * Append entry_ptr: it is stored through the current tail pointer and the
+ * tail is moved to the entry's own next field. No traversal is needed.
+ */
+#define HG_QUEUE_PUSH_TAIL(head_ptr, entry_ptr, entry_field_name) \
+ do { \
+ (entry_ptr)->entry_field_name.next = NULL; \
+ *(head_ptr)->tail = (entry_ptr); \
+ (head_ptr)->tail = &(entry_ptr)->entry_field_name.next; \
+ } while (/*CONSTCOND*/ 0)
+
+/*
+ * Unlink the first entry (no-op on an empty queue). When the queue becomes
+ * empty the tail is reset to point at head. The macro does not yield the
+ * removed entry; read it with HG_QUEUE_FIRST() before popping.
+ */
+/* TODO would be nice to not have any condition */
+#define HG_QUEUE_POP_HEAD(head_ptr, entry_field_name) \
+ do { \
+ if ((head_ptr)->head && \
+ ((head_ptr)->head = (head_ptr)->head->entry_field_name.next) == \
+ NULL) \
+ (head_ptr)->tail = &(head_ptr)->head; \
+ } while (/*CONSTCOND*/ 0)
+
+/* Iterate over every entry, from head to tail, assigning each one to var. */
+#define HG_QUEUE_FOREACH(var, head_ptr, entry_field_name) \
+ for ((var) = ((head_ptr)->head); (var); \
+ (var) = ((var)->entry_field_name.next))
+
+/**
+ * Avoid using the macro below for performance reasons (it walks the queue to
+ * find the predecessor of the entry), or use mercury_list.h instead.
+ *
+ * HG_QUEUE_REMOVE unlinks entry_ptr from the queue; type is the struct tag of
+ * the entries. The search loop has no NULL check, so entry_ptr must be in the
+ * queue.
+ */
+
+#define HG_QUEUE_REMOVE(head_ptr, entry_ptr, type, entry_field_name) \
+ do { \
+ if ((head_ptr)->head == (entry_ptr)) { \
+ HG_QUEUE_POP_HEAD((head_ptr), entry_field_name); \
+ } else { \
+ struct type *curelm = (head_ptr)->head; \
+ while (curelm->entry_field_name.next != (entry_ptr)) \
+ curelm = curelm->entry_field_name.next; \
+ if ((curelm->entry_field_name.next = \
+ curelm->entry_field_name.next->entry_field_name \
+ .next) == NULL) \
+ (head_ptr)->tail = &(curelm)->entry_field_name.next; \
+ } \
+ } while (/*CONSTCOND*/ 0)
+
+#endif /* MERCURY_QUEUE_H */
diff --git a/src/mercury/mercury_request.c b/src/mercury/mercury_request.c
new file mode 100644
index 0000000..ae4fb2a
--- /dev/null
+++ b/src/mercury/mercury_request.c
@@ -0,0 +1,224 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#include "mercury_request.h"
+#include "mercury_thread_condition.h"
+#include "mercury_thread_mutex.h"
+#include "mercury_time.h"
+#include "mercury_util_error.h"
+
+#include <stdlib.h>
+
+/****************/
+/* Local Macros */
+/****************/
+
+/************************************/
+/* Local Type and Struct Definition */
+/************************************/
+
+/* State shared by all the requests created from one request class. */
+struct hg_request_class {
+ /* Callback invoked by hg_request_wait() to make progress */
+ hg_request_progress_func_t progress_func;
+ /* Callback invoked by hg_request_check() until nothing is triggered */
+ hg_request_trigger_func_t trigger_func;
+ /* User pointer handed to both callbacks, returned by hg_request_finalize() */
+ void *arg;
+ /* Set while a thread is inside progress_func; changed under progress_mutex */
+ hg_util_bool_t progressing;
+ hg_thread_mutex_t progress_mutex;
+ /* Broadcast each time progressing is cleared */
+ hg_thread_cond_t progress_cond;
+};
+
+/********************/
+/* Local Prototypes */
+/********************/
+
+/*******************/
+/* Local Variables */
+/*******************/
+
+/*---------------------------------------------------------------------------*/
+/**
+ * Run the class trigger callback until it fails or reports that nothing was
+ * triggered, then test the request completion flag.
+ *
+ * \param request [IN/OUT] pointer to request
+ *
+ * \return HG_UTIL_TRUE if the compare-and-swap on the completion flag
+ * succeeded, HG_UTIL_FALSE otherwise
+ */
+static HG_UTIL_INLINE hg_util_bool_t
+hg_request_check(hg_request_t *request)
+{
+    unsigned int triggered = 0;
+    int rc;
+
+    /* Non-blocking trigger (timeout of 0), repeated while callbacks fire */
+    for (;;) {
+        rc = request->request_class->trigger_func(
+            0, &triggered, request->request_class->arg);
+        if (rc != HG_UTIL_SUCCESS || !triggered)
+            break;
+    }
+
+    /* Presumably swaps completed from HG_UTIL_TRUE back to HG_UTIL_FALSE so
+     * that a completion is reported only once -- confirm against
+     * hg_atomic_cas32() in mercury_atomic.h */
+    if (!hg_atomic_cas32(&request->completed, HG_UTIL_TRUE, HG_UTIL_FALSE))
+        return HG_UTIL_FALSE;
+
+    return HG_UTIL_TRUE;
+}
+
+/*---------------------------------------------------------------------------*/
+/**
+ * Allocate and initialize a request class.
+ *
+ * \param progress_func [IN] progress function
+ * \param trigger_func [IN] trigger function
+ * \param arg [IN] pointer to data passed to the callbacks
+ *
+ * \return Pointer to request class, or NULL if the allocation or the
+ * initialization of the mutex / condition variable failed
+ */
+hg_request_class_t *
+hg_request_init(hg_request_progress_func_t progress_func,
+    hg_request_trigger_func_t trigger_func, void *arg)
+{
+    struct hg_request_class *hg_request_class = NULL;
+
+    hg_request_class =
+        (struct hg_request_class *) malloc(sizeof(struct hg_request_class));
+    HG_UTIL_CHECK_ERROR_NORET(
+        hg_request_class == NULL, done, "Could not allocate hg_request_class");
+
+    hg_request_class->progress_func = progress_func;
+    hg_request_class->trigger_func = trigger_func;
+    hg_request_class->arg = arg;
+    hg_request_class->progressing = HG_UTIL_FALSE;
+
+    /* Do not hand out a class whose synchronization objects are unusable */
+    HG_UTIL_CHECK_ERROR_NORET(
+        hg_thread_mutex_init(&hg_request_class->progress_mutex) !=
+            HG_UTIL_SUCCESS,
+        error_free, "Could not initialize progress mutex");
+    HG_UTIL_CHECK_ERROR_NORET(
+        hg_thread_cond_init(&hg_request_class->progress_cond) !=
+            HG_UTIL_SUCCESS,
+        error_mutex, "Could not initialize progress condition");
+
+done:
+    return hg_request_class;
+
+error_mutex:
+    hg_thread_mutex_destroy(&hg_request_class->progress_mutex);
+error_free:
+    free(hg_request_class);
+    return NULL;
+}
+
+/*---------------------------------------------------------------------------*/
+/**
+ * Release a request class. A NULL class is accepted and ignored.
+ *
+ * \param request_class [IN] pointer to request class
+ * \param arg [OUT] if non-NULL, receives the user pointer stored in the class
+ *
+ * \return HG_UTIL_SUCCESS
+ */
+int
+hg_request_finalize(hg_request_class_t *request_class, void **arg)
+{
+    if (request_class != NULL) {
+        /* Give the user pointer back before the class goes away */
+        if (arg != NULL)
+            *arg = request_class->arg;
+
+        hg_thread_mutex_destroy(&request_class->progress_mutex);
+        hg_thread_cond_destroy(&request_class->progress_cond);
+        free(request_class);
+    }
+
+    return HG_UTIL_SUCCESS;
+}
+
+/*---------------------------------------------------------------------------*/
+/**
+ * Allocate a request bound to request_class, with no user data attached and
+ * the completion flag cleared.
+ *
+ * \param request_class [IN] pointer to request class
+ *
+ * \return Pointer to request or NULL if the allocation failed
+ */
+hg_request_t *
+hg_request_create(hg_request_class_t *request_class)
+{
+    struct hg_request *hg_request =
+        (struct hg_request *) malloc(sizeof(struct hg_request));
+
+    HG_UTIL_CHECK_ERROR_NORET(
+        hg_request == NULL, done, "Could not allocate hg_request");
+
+    hg_request->request_class = request_class;
+    hg_request->data = NULL;
+    hg_atomic_set32(&hg_request->completed, HG_UTIL_FALSE);
+
+done:
+    return hg_request;
+}
+
+/*---------------------------------------------------------------------------*/
+/**
+ * Free a request. The request class it refers to is left untouched.
+ *
+ * \param request [IN/OUT] pointer to request
+ *
+ * \return HG_UTIL_SUCCESS
+ */
+int
+hg_request_destroy(hg_request_t *request)
+{
+    free(request);
+
+    return HG_UTIL_SUCCESS;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * lock(progress_mutex)
+ * while (!completed) {
+ * check_request
+ * if (completed) {
+ * unlock(progress_mutex);
+ * return;
+ * }
+ * if (in_progress) {
+ * wait_cond(progress_cond);
+ * continue;
+ * }
+ * in_progress = true;
+ * unlock(progress_mutex);
+ * trigger;
+ * progress;
+ * lock(progress_mutex);
+ * in_progress = false;
+ * signal(progress_cond);
+ * }
+ * unlock(progress_mutex);
+ */
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Wait up to timeout ms for request to complete. Only one thread at a time
+ * calls the class progress function; the other callers wait on progress_cond
+ * until that thread is done and then check their request again.
+ *
+ * flag (optional) receives the completion status. The return value is always
+ * HG_UTIL_SUCCESS: ret is never modified after its initialization.
+ */
+int
+hg_request_wait(hg_request_t *request, unsigned int timeout, unsigned int *flag)
+{
+ double remaining =
+ timeout / 1000.0; /* Convert timeout in ms into seconds */
+ hg_util_bool_t completed = HG_UTIL_FALSE;
+ int ret = HG_UTIL_SUCCESS;
+
+ hg_thread_mutex_lock(&request->request_class->progress_mutex);
+
+ /* progress_mutex is held at the top of every iteration */
+ do {
+ hg_time_t t3, t4;
+
+ /* Trigger pending callbacks and test the completion flag */
+ completed = hg_request_check(request);
+ if (completed)
+ break;
+
+ /* Another thread is inside progress_func: wait for its broadcast */
+ if (request->request_class->progressing) {
+ hg_time_t t1, t2;
+
+ if (remaining <= 0) {
+ /* Timeout occurred so leave */
+ break;
+ }
+
+ hg_time_get_current(&t1);
+ /* Any failure of the timed wait is handled as a timeout */
+ if (hg_thread_cond_timedwait(&request->request_class->progress_cond,
+ &request->request_class->progress_mutex,
+ (unsigned int) (remaining * 1000.0)) != HG_UTIL_SUCCESS) {
+ /* Timeout occurred so leave */
+ break;
+ }
+ hg_time_get_current(&t2);
+ /* Charge the time spent waiting against the budget */
+ remaining -= hg_time_to_double(hg_time_subtract(t2, t1));
+ if (remaining < 0)
+ break;
+ /* Continue as request may have completed in the meantime */
+ /* (in a do/while, continue jumps to the loop condition below) */
+ continue;
+ }
+
+ /* This thread becomes the one making progress */
+ request->request_class->progressing = HG_UTIL_TRUE;
+
+ /* The mutex is not held while the progress callback runs */
+ hg_thread_mutex_unlock(&request->request_class->progress_mutex);
+
+ /* Elapsed time is only measured when a timeout was requested */
+ if (timeout)
+ hg_time_get_current(&t3);
+
+ request->request_class->progress_func(
+ (unsigned int) (remaining * 1000.0), request->request_class->arg);
+
+ if (timeout) {
+ hg_time_get_current(&t4);
+ remaining -= hg_time_to_double(hg_time_subtract(t4, t3));
+ }
+
+ /* Hand the progress role back and wake up every waiting thread */
+ hg_thread_mutex_lock(&request->request_class->progress_mutex);
+ request->request_class->progressing = HG_UTIL_FALSE;
+ hg_thread_cond_broadcast(&request->request_class->progress_cond);
+
+ /* NOTE(review): when remaining runs out here the loop exits without another
+ * hg_request_check(), so a completion produced by the last progress call
+ * is only reported by a later call -- confirm this is intended */
+ } while (!completed && (remaining > 0));
+
+ hg_thread_mutex_unlock(&request->request_class->progress_mutex);
+
+ if (flag)
+ *flag = completed;
+
+ return ret;
+}
diff --git a/src/mercury/mercury_request.h b/src/mercury/mercury_request.h
new file mode 100644
index 0000000..e84bea3
--- /dev/null
+++ b/src/mercury/mercury_request.h
@@ -0,0 +1,242 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_REQUEST_H
+#define MERCURY_REQUEST_H
+
+#include "mercury_util_config.h"
+
+#include "mercury_atomic.h"
+
+/**
+ * Purpose: define a request emulation library on top of the callback model
+ * that uses progress/trigger functions. Note that this library can not be
+ * safely used within RPCs in most cases - calling hg_request_wait causes
+ * deadlock when the caller function was triggered by HG_Trigger
+ * (or HG_Bulk_trigger).
+ */
+
+typedef struct hg_request_class hg_request_class_t; /* Opaque request class */
+typedef struct hg_request hg_request_t; /* Opaque request object */
+
+/* Request object; the definition is visible so that the inline helpers below
+ * can access its members. */
+struct hg_request {
+ /* User pointer, see hg_request_set_data() / hg_request_get_data() */
+ void *data;
+ /* Completion flag, see hg_request_complete() / hg_request_reset() */
+ hg_atomic_int32_t completed;
+ /* Class the request was created from */
+ hg_request_class_t *request_class;
+};
+
+/**
+ * Progress callback, arg can be used to pass extra parameters required by
+ * underlying API.
+ *
+ * \param timeout [IN] timeout (in milliseconds)
+ * \param arg [IN] pointer to data passed to callback
+ *
+ * \return HG_UTIL_SUCCESS if any completion has occurred / error code otherwise
+ */
+typedef int (*hg_request_progress_func_t)(unsigned int timeout, void *arg);
+
+/**
+ * Trigger callback, arg can be used to pass extra parameters required by
+ * underlying API.
+ *
+ * \param timeout [IN] timeout (in milliseconds)
+ * \param flag [OUT] 1 if callback has been triggered, 0 otherwise
+ * \param arg [IN] pointer to data passed to callback
+ *
+ * \return HG_UTIL_SUCCESS or corresponding error code
+ */
+typedef int (*hg_request_trigger_func_t)(
+ unsigned int timeout, unsigned int *flag, void *arg);
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Initialize the request class with the specific progress/trigger functions
+ * that will be called on hg_request_wait().
+ * arg can be used to pass extra parameters required by underlying API.
+ *
+ * \param progress [IN] progress function
+ * \param trigger [IN] trigger function
+ * \param arg [IN] pointer to data passed to callback
+ *
+ * \return Pointer to request class or NULL in case of failure
+ */
+HG_UTIL_PUBLIC hg_request_class_t *
+hg_request_init(hg_request_progress_func_t progress,
+ hg_request_trigger_func_t trigger, void *arg);
+
+/**
+ * Finalize the request class. User args that were passed through
+ * hg_request_init() can be retrieved through the \a arg parameter.
+ *
+ * \param request_class [IN] pointer to request class
+ * \param arg [IN/OUT] pointer to init args
+ */
+HG_UTIL_PUBLIC int
+hg_request_finalize(hg_request_class_t *request_class, void **arg);
+
+/**
+ * Create a new request from a specified request class. The progress function
+ * explicitly makes progress and may insert the completed operation into a
+ * completion queue. The operation gets triggered after a call to the trigger
+ * function.
+ *
+ * \param request_class [IN] pointer to request class
+ *
+ * \return Pointer to request or NULL in case of failure
+ */
+HG_UTIL_PUBLIC hg_request_t *
+hg_request_create(hg_request_class_t *request_class);
+
+/**
+ * Destroy the request, freeing the resources.
+ *
+ * \param request [IN/OUT] pointer to request
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_request_destroy(hg_request_t *request);
+
+/**
+ * Reset an existing request so that it can be safely re-used.
+ *
+ * \param request [IN/OUT] pointer to request
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_request_reset(hg_request_t *request);
+
+/**
+ * Mark the request as completed. (most likely called by a callback triggered
+ * after a call to trigger)
+ *
+ * \param request [IN/OUT] pointer to request
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_request_complete(hg_request_t *request);
+
+/**
+ * Wait timeout ms for the specified request to complete.
+ *
+ * \param request [IN/OUT] pointer to request
+ * \param timeout [IN] timeout (in milliseconds)
+ * \param flag [OUT] 1 if request has completed, 0 otherwise
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_request_wait(
+ hg_request_t *request, unsigned int timeout, unsigned int *flag);
+
+/**
+ * Wait for all the specified requests to complete. The requests are waited on
+ * one after the other and timeout is applied to each of them in turn.
+ *
+ * \param count [IN] number of requests
+ * \param request [IN/OUT] array of requests
+ * \param timeout [IN] timeout (in milliseconds)
+ * \param flag [OUT] 1 if all requests have completed, 0 otherwise
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_request_waitall(int count, hg_request_t *request[], unsigned int timeout,
+ unsigned int *flag);
+
+/**
+ * Attach user data to a specified request.
+ *
+ * \param request [IN/OUT] pointer to request
+ * \param data [IN] pointer to data
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_request_set_data(hg_request_t *request, void *data);
+
+/**
+ * Get user data from a specified request.
+ *
+ * \param request [IN/OUT] pointer to request
+ *
+ * \return Pointer to data or NULL if nothing was attached by user
+ */
+static HG_UTIL_INLINE void *
+hg_request_get_data(hg_request_t *request);
+
+/**
+ * Cancel the request.
+ *
+ * \param request [IN] request object
+ *
+ * \return Non-negative on success or negative on failure
+ *
+HG_UTIL_PUBLIC int
+hg_request_cancel(hg_request_t *request);
+ */
+
+/*---------------------------------------------------------------------------*/
+/* Clear the completion flag of the request; always returns HG_UTIL_SUCCESS. */
+static HG_UTIL_INLINE int
+hg_request_reset(hg_request_t *request)
+{
+ hg_atomic_set32(&request->completed, HG_UTIL_FALSE);
+
+ return HG_UTIL_SUCCESS;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Mark the request as completed by incrementing its completion counter;
+ * always returns HG_UTIL_SUCCESS.
+ *
+ * NOTE(review): hg_request_check() in mercury_request.c compares the counter
+ * against HG_UTIL_TRUE, so this presumably must be called only once between
+ * two resets of the request -- confirm with callers.
+ */
+static HG_UTIL_INLINE int
+hg_request_complete(hg_request_t *request)
+{
+ hg_atomic_incr32(&request->completed);
+
+ return HG_UTIL_SUCCESS;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Wait on each of the count requests in turn, giving every one of them up to
+ * timeout ms.
+ *
+ * flag (optional) is set to 1 only if every request has completed and to 0
+ * as soon as one of them has not. With count <= 0 there is nothing to wait
+ * for and flag is set to 1.
+ *
+ * Returns HG_UTIL_SUCCESS, or the last non-success code returned by
+ * hg_request_wait().
+ */
+static HG_UTIL_INLINE int
+hg_request_waitall(int count, hg_request_t *request[], unsigned int timeout,
+    unsigned int *flag)
+{
+    unsigned int all_completed = 1;
+    int ret = HG_UTIL_SUCCESS;
+    int i;
+
+    for (i = 0; i < count; i++) {
+        /* Use a per-request flag so that a later request that completed
+         * cannot hide an earlier one that timed out */
+        unsigned int completed = 0;
+        int wait_ret = hg_request_wait(request[i], timeout, &completed);
+
+        if (wait_ret != HG_UTIL_SUCCESS)
+            ret = wait_ret;
+        if (!completed)
+            all_completed = 0;
+    }
+
+    if (flag)
+        *flag = all_completed;
+
+    return ret;
+}
+
+/*---------------------------------------------------------------------------*/
+/* Store the user pointer in the request (the pointer is not copied or owned
+ * here); always returns HG_UTIL_SUCCESS. */
+static HG_UTIL_INLINE int
+hg_request_set_data(hg_request_t *request, void *data)
+{
+ request->data = data;
+
+ return HG_UTIL_SUCCESS;
+}
+
+/*---------------------------------------------------------------------------*/
+/* Return the user pointer stored in the request (NULL right after
+ * hg_request_create(), which initializes it to NULL). */
+static HG_UTIL_INLINE void *
+hg_request_get_data(hg_request_t *request)
+{
+ return request->data;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_REQUEST_H */
diff --git a/src/mercury/mercury_thread.c b/src/mercury/mercury_thread.c
new file mode 100644
index 0000000..1c0e976
--- /dev/null
+++ b/src/mercury/mercury_thread.c
@@ -0,0 +1,162 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#include "mercury_thread.h"
+
+/*---------------------------------------------------------------------------*/
+/* Reset the thread object to its "no thread" value: 0 with pthreads, a NULL
+ * handle on Windows. */
+void
+hg_thread_init(hg_thread_t *thread)
+{
+#ifndef _WIN32
+    *thread = 0;
+#else
+    *thread = NULL;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Start a new thread running f(data) with default attributes. Returns
+ * HG_UTIL_SUCCESS, or HG_UTIL_FAIL if the platform call failed. */
+int
+hg_thread_create(hg_thread_t *thread, hg_thread_func_t f, void *data)
+{
+#ifdef _WIN32
+    *thread = CreateThread(NULL, 0, f, data, 0, NULL);
+
+    return (*thread != NULL) ? HG_UTIL_SUCCESS : HG_UTIL_FAIL;
+#else
+    return (pthread_create(thread, NULL, f, data) == 0) ? HG_UTIL_SUCCESS
+                                                        : HG_UTIL_FAIL;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Terminate the calling thread, passing ret to the platform exit call. */
+void
+hg_thread_exit(hg_thread_ret_t ret)
+{
+#ifndef _WIN32
+    pthread_exit(ret);
+#else
+    ExitThread(ret);
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Block until thread has terminated. On Windows the handle is closed
+ * afterwards and the result of the wait is not examined. */
+int
+hg_thread_join(hg_thread_t thread)
+{
+#ifdef _WIN32
+    WaitForSingleObject(thread, INFINITE);
+    CloseHandle(thread);
+
+    return HG_UTIL_SUCCESS;
+#else
+    return (pthread_join(thread, NULL) != 0) ? HG_UTIL_FAIL : HG_UTIL_SUCCESS;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Request cancellation of thread through pthread_cancel(). The Windows branch
+ * only polls the handle with a zero timeout and closes it; no termination
+ * call is issued there. */
+int
+hg_thread_cancel(hg_thread_t thread)
+{
+#ifdef _WIN32
+    WaitForSingleObject(thread, 0);
+    CloseHandle(thread);
+
+    return HG_UTIL_SUCCESS;
+#else
+    return (pthread_cancel(thread) != 0) ? HG_UTIL_FAIL : HG_UTIL_SUCCESS;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Give up the processor so that another thread can run; always returns
+ * HG_UTIL_SUCCESS. */
+int
+hg_thread_yield(void)
+{
+#ifdef _WIN32
+    SwitchToThread();
+#elif defined(__APPLE__)
+    pthread_yield_np();
+#else
+    /* pthread_yield() is a non-standard GNU extension, deprecated in recent
+     * glibc releases; sched_yield() is the POSIX call and is made visible by
+     * <pthread.h>, which mercury_thread.h already includes */
+    sched_yield();
+#endif
+
+    return HG_UTIL_SUCCESS;
+}
+
+/*---------------------------------------------------------------------------*/
+/* Allocate a thread-specific data key (no destructor is registered). Returns
+ * HG_UTIL_FAIL when key is NULL or when the platform call fails. */
+int
+hg_thread_key_create(hg_thread_key_t *key)
+{
+    if (key == NULL)
+        return HG_UTIL_FAIL;
+
+#ifdef _WIN32
+    *key = TlsAlloc();
+
+    return (*key == TLS_OUT_OF_INDEXES) ? HG_UTIL_FAIL : HG_UTIL_SUCCESS;
+#else
+    return (pthread_key_create(key, NULL) != 0) ? HG_UTIL_FAIL
+                                                : HG_UTIL_SUCCESS;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Release a key obtained from hg_thread_key_create(). Returns HG_UTIL_FAIL
+ * when the platform call fails. */
+int
+hg_thread_key_delete(hg_thread_key_t key)
+{
+#ifdef _WIN32
+    return TlsFree(key) ? HG_UTIL_SUCCESS : HG_UTIL_FAIL;
+#else
+    return (pthread_key_delete(key) == 0) ? HG_UTIL_SUCCESS : HG_UTIL_FAIL;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Retrieve the CPU affinity mask of thread into cpu_mask. Not implemented on
+ * Windows and macOS, where HG_UTIL_FAIL is always returned. */
+int
+hg_thread_getaffinity(hg_thread_t thread, hg_cpu_set_t *cpu_mask)
+{
+#if defined(_WIN32) || defined(__APPLE__)
+    /* Unsupported here: silence unused-parameter warnings on both platforms */
+    (void) thread;
+    (void) cpu_mask;
+    return HG_UTIL_FAIL;
+#else
+    if (pthread_getaffinity_np(thread, sizeof(hg_cpu_set_t), cpu_mask))
+        return HG_UTIL_FAIL;
+    return HG_UTIL_SUCCESS;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Apply the CPU affinity mask cpu_mask to thread. Not implemented on macOS,
+ * where HG_UTIL_FAIL is always returned. */
+int
+hg_thread_setaffinity(hg_thread_t thread, const hg_cpu_set_t *cpu_mask)
+{
+#if defined(_WIN32)
+    if (!SetThreadAffinityMask(thread, *cpu_mask))
+        return HG_UTIL_FAIL;
+    /* The success path previously fell off the end of this non-void
+     * function */
+    return HG_UTIL_SUCCESS;
+#elif defined(__APPLE__)
+    (void) thread;
+    (void) cpu_mask;
+    return HG_UTIL_FAIL;
+#else
+    if (pthread_setaffinity_np(thread, sizeof(hg_cpu_set_t), cpu_mask))
+        return HG_UTIL_FAIL;
+    return HG_UTIL_SUCCESS;
+#endif
+}
diff --git a/src/mercury/mercury_thread.h b/src/mercury/mercury_thread.h
new file mode 100644
index 0000000..cc4bbf1
--- /dev/null
+++ b/src/mercury/mercury_thread.h
@@ -0,0 +1,242 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_THREAD_H
+#define MERCURY_THREAD_H
+
+#if !defined(_WIN32) && !defined(_GNU_SOURCE)
+# define _GNU_SOURCE
+#endif
+#include "mercury_util_config.h"
+
+#ifdef _WIN32
+# include <windows.h>
+typedef HANDLE hg_thread_t;
+typedef LPTHREAD_START_ROUTINE hg_thread_func_t;
+typedef DWORD hg_thread_ret_t;
+# define HG_THREAD_RETURN_TYPE hg_thread_ret_t WINAPI
+typedef DWORD hg_thread_key_t;
+typedef DWORD_PTR hg_cpu_set_t;
+#else
+# include <pthread.h>
+typedef pthread_t hg_thread_t;
+typedef void *(*hg_thread_func_t)(void *);
+typedef void *hg_thread_ret_t;
+# define HG_THREAD_RETURN_TYPE hg_thread_ret_t
+typedef pthread_key_t hg_thread_key_t;
+# ifdef __APPLE__
+/* Size definition for CPU sets. */
+# define HG_CPU_SETSIZE 1024
+# define HG_NCPUBITS (8 * sizeof(hg_cpu_mask_t))
+/* Type for array elements in 'cpu_set_t'. */
+typedef hg_util_uint64_t hg_cpu_mask_t;
+typedef struct {
+ hg_cpu_mask_t bits[HG_CPU_SETSIZE / HG_NCPUBITS];
+} hg_cpu_set_t;
+# else
+typedef cpu_set_t hg_cpu_set_t;
+# endif
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Initialize the thread.
+ *
+ * \param thread [IN/OUT] pointer to thread object
+ */
+HG_UTIL_PUBLIC void
+hg_thread_init(hg_thread_t *thread);
+
+/**
+ * Create a new thread for the given function.
+ *
+ * \param thread [IN/OUT] pointer to thread object
+ * \param f [IN] pointer to function
+ * \param data [IN] pointer to data than be passed to function f
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_create(hg_thread_t *thread, hg_thread_func_t f, void *data);
+
+/**
+ * Ends the calling thread.
+ *
+ * \param ret [IN] exit code for the thread
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC void
+hg_thread_exit(hg_thread_ret_t ret);
+
+/**
+ * Wait for thread completion.
+ *
+ * \param thread [IN] thread object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_join(hg_thread_t thread);
+
+/**
+ * Terminate the thread.
+ *
+ * \param thread [IN] thread object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_cancel(hg_thread_t thread);
+
+/**
+ * Yield the processor.
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_yield(void);
+
+/**
+ * Obtain handle of the calling thread.
+ *
+ * \return
+ */
+static HG_UTIL_INLINE hg_thread_t
+hg_thread_self(void);
+
+/**
+ * Compare thread IDs.
+ *
+ * \return Non-zero if equal, zero if not equal
+ */
+static HG_UTIL_INLINE int
+hg_thread_equal(hg_thread_t t1, hg_thread_t t2);
+
+/**
+ * Create a thread-specific data key visible to all threads in the process.
+ *
+ * \param key [OUT] pointer to thread key object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_key_create(hg_thread_key_t *key);
+
+/**
+ * Delete a thread-specific data key previously returned by
+ * hg_thread_key_create().
+ *
+ * \param key [IN] thread key object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_key_delete(hg_thread_key_t key);
+
+/**
+ * Get value from specified key.
+ *
+ * \param key [IN] thread key object
+ *
+ * \return Pointer to data associated to the key
+ */
+static HG_UTIL_INLINE void *
+hg_thread_getspecific(hg_thread_key_t key);
+
+/**
+ * Set value to specified key.
+ *
+ * \param key [IN] thread key object
+ * \param value [IN] pointer to data that will be associated
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_setspecific(hg_thread_key_t key, const void *value);
+
+/**
+ * Get affinity mask.
+ *
+ * \param thread [IN] thread object
+ * \param cpu_mask [IN/OUT] cpu mask
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_getaffinity(hg_thread_t thread, hg_cpu_set_t *cpu_mask);
+
+/**
+ * Set affinity mask.
+ *
+ * \param thread [IN] thread object
+ * \param cpu_mask [IN] cpu mask
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_setaffinity(hg_thread_t thread, const hg_cpu_set_t *cpu_mask);
+
+/*---------------------------------------------------------------------------*/
+/* Handle of the calling thread, as returned by pthread_self() or
+ * GetCurrentThread(). */
+static HG_UTIL_INLINE hg_thread_t
+hg_thread_self(void)
+{
+#ifndef _WIN32
+    return pthread_self();
+#else
+    return GetCurrentThread();
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Non-zero when t1 and t2 designate the same thread. On Windows the thread
+ * IDs behind the two handles are compared. */
+static HG_UTIL_INLINE int
+hg_thread_equal(hg_thread_t t1, hg_thread_t t2)
+{
+#ifndef _WIN32
+    return pthread_equal(t1, t2);
+#else
+    return GetThreadId(t1) == GetThreadId(t2);
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Value bound to key in the calling thread, as returned by the platform
+ * thread-local storage call. */
+static HG_UTIL_INLINE void *
+hg_thread_getspecific(hg_thread_key_t key)
+{
+#ifndef _WIN32
+    return pthread_getspecific(key);
+#else
+    return TlsGetValue(key);
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Bind value to key in the calling thread. Returns HG_UTIL_FAIL when the
+ * platform call fails. */
+static HG_UTIL_INLINE int
+hg_thread_setspecific(hg_thread_key_t key, const void *value)
+{
+#ifdef _WIN32
+    return TlsSetValue(key, (LPVOID) value) ? HG_UTIL_SUCCESS : HG_UTIL_FAIL;
+#else
+    return (pthread_setspecific(key, value) == 0) ? HG_UTIL_SUCCESS
+                                                  : HG_UTIL_FAIL;
+#endif
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_THREAD_H */
diff --git a/src/mercury/mercury_thread_condition.c b/src/mercury/mercury_thread_condition.c
new file mode 100644
index 0000000..76e4fef
--- /dev/null
+++ b/src/mercury/mercury_thread_condition.c
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#include "mercury_thread_condition.h"
+
+/*---------------------------------------------------------------------------*/
+/* Initialize the condition variable. With pthreads the monotonic clock is
+ * selected for timed waits when the build configuration reports support for
+ * it. Returns HG_UTIL_FAIL if an initialization call fails. */
+int
+hg_thread_cond_init(hg_thread_cond_t *cond)
+{
+#ifdef _WIN32
+    InitializeConditionVariable(cond);
+#else
+    pthread_condattr_t attr;
+    int rc;
+
+    if (pthread_condattr_init(&attr))
+        return HG_UTIL_FAIL;
+# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK) && \
+     defined(HG_UTIL_HAS_CLOCK_MONOTONIC)
+    /* Must set clock ID if using different clock */
+    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
+# endif
+    rc = pthread_cond_init(cond, &attr);
+    /* Release the attribute object on the failure path as well */
+    pthread_condattr_destroy(&attr);
+    if (rc)
+        return HG_UTIL_FAIL;
+#endif
+
+    return HG_UTIL_SUCCESS;
+}
+
+/*---------------------------------------------------------------------------*/
+/* Destroy the condition variable. This is a no-op on Windows; with pthreads
+ * HG_UTIL_FAIL is returned when pthread_cond_destroy() fails. */
+int
+hg_thread_cond_destroy(hg_thread_cond_t *cond)
+{
+#ifdef _WIN32
+    return HG_UTIL_SUCCESS;
+#else
+    return (pthread_cond_destroy(cond) == 0) ? HG_UTIL_SUCCESS : HG_UTIL_FAIL;
+#endif
+}
diff --git a/src/mercury/mercury_thread_condition.h b/src/mercury/mercury_thread_condition.h
new file mode 100644
index 0000000..70e9748
--- /dev/null
+++ b/src/mercury/mercury_thread_condition.h
@@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_THREAD_CONDITION_H
+#define MERCURY_THREAD_CONDITION_H
+
+#include "mercury_thread_mutex.h"
+
+#ifdef _WIN32
+typedef CONDITION_VARIABLE hg_thread_cond_t;
+#else
+# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK)
+# include "mercury_time.h"
+# elif defined(HG_UTIL_HAS_SYSTIME_H)
+# include <sys/time.h>
+# endif
+# include <stdlib.h>
+typedef pthread_cond_t hg_thread_cond_t;
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Initialize the condition.
+ *
+ * \param cond [IN/OUT] pointer to condition object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_cond_init(hg_thread_cond_t *cond);
+
+/**
+ * Destroy the condition.
+ *
+ * \param cond [IN/OUT] pointer to condition object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_cond_destroy(hg_thread_cond_t *cond);
+
+/**
+ * Wake one thread waiting for the condition to change.
+ *
+ * \param cond [IN/OUT] pointer to condition object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_cond_signal(hg_thread_cond_t *cond);
+
+/**
+ * Wake all the threads waiting for the condition to change.
+ *
+ * \param cond [IN/OUT] pointer to condition object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_cond_broadcast(hg_thread_cond_t *cond);
+
+/**
+ * Wait for the condition to change.
+ *
+ * \param cond [IN/OUT] pointer to condition object
+ * \param mutex [IN/OUT] pointer to mutex object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_cond_wait(hg_thread_cond_t *cond, hg_thread_mutex_t *mutex);
+
+/**
+ * Wait timeout ms for the condition to change.
+ *
+ * \param cond [IN/OUT] pointer to condition object
+ * \param mutex [IN/OUT] pointer to mutex object
+ * \param timeout [IN] timeout (in milliseconds)
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_cond_timedwait(
+ hg_thread_cond_t *cond, hg_thread_mutex_t *mutex, unsigned int timeout);
+
+/*---------------------------------------------------------------------------*/
+/* Wake one waiter of the condition variable. */
+static HG_UTIL_INLINE int
+hg_thread_cond_signal(hg_thread_cond_t *cond)
+{
+#ifdef _WIN32
+    WakeConditionVariable(cond);
+
+    return HG_UTIL_SUCCESS;
+#else
+    return (pthread_cond_signal(cond) == 0) ? HG_UTIL_SUCCESS : HG_UTIL_FAIL;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Wake every waiter of the condition variable. */
+static HG_UTIL_INLINE int
+hg_thread_cond_broadcast(hg_thread_cond_t *cond)
+{
+#ifdef _WIN32
+    WakeAllConditionVariable(cond);
+
+    return HG_UTIL_SUCCESS;
+#else
+    return (pthread_cond_broadcast(cond) == 0) ? HG_UTIL_SUCCESS
+                                               : HG_UTIL_FAIL;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Wait on the condition variable with no time limit; mutex is passed to the
+ * platform wait call. */
+static HG_UTIL_INLINE int
+hg_thread_cond_wait(hg_thread_cond_t *cond, hg_thread_mutex_t *mutex)
+{
+#ifdef _WIN32
+    return SleepConditionVariableCS(cond, mutex, INFINITE) ? HG_UTIL_SUCCESS
+                                                           : HG_UTIL_FAIL;
+#else
+    return (pthread_cond_wait(cond, mutex) == 0) ? HG_UTIL_SUCCESS
+                                                 : HG_UTIL_FAIL;
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+/* Wait on the condition variable for at most timeout ms. Any failure of the
+ * platform wait call, including expiry of the timeout, is reported as
+ * HG_UTIL_FAIL. */
+static HG_UTIL_INLINE int
+hg_thread_cond_timedwait(
+    hg_thread_cond_t *cond, hg_thread_mutex_t *mutex, unsigned int timeout)
+{
+#ifdef _WIN32
+    if (!SleepConditionVariableCS(cond, mutex, timeout))
+        return HG_UTIL_FAIL;
+#else
+# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK)
+    hg_time_t now;
+# elif defined(HG_UTIL_HAS_SYSTIME_H)
+    struct timeval now;
+# endif
+    struct timespec abs_timeout;
+    long int abs_timeout_us;
+    ldiv_t ld;
+
+    /* Need to convert timeout (ms) to absolute time */
+# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK)
+    if (hg_time_get_current(&now) != HG_UTIL_SUCCESS)
+        return HG_UTIL_FAIL;
+# elif defined(HG_UTIL_HAS_SYSTIME_H)
+    if (gettimeofday(&now, NULL) != 0)
+        return HG_UTIL_FAIL;
+# endif
+    /* Split the timeout into whole seconds and a sub-second part before
+     * converting to microseconds: timeout * 1000 does not fit in a 32-bit
+     * long for large timeouts */
+    abs_timeout_us =
+        (long int) now.tv_usec + (long int) (timeout % 1000U) * 1000L;
+    /* Get sec / nsec */
+    ld = ldiv(abs_timeout_us, 1000000L);
+    abs_timeout.tv_sec =
+        now.tv_sec + (time_t) (timeout / 1000U) + (time_t) ld.quot;
+    abs_timeout.tv_nsec = ld.rem * 1000L;
+
+    if (pthread_cond_timedwait(cond, mutex, &abs_timeout))
+        return HG_UTIL_FAIL;
+#endif
+
+    return HG_UTIL_SUCCESS;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_THREAD_CONDITION_H */
diff --git a/src/mercury/mercury_thread_mutex.c b/src/mercury/mercury_thread_mutex.c
new file mode 100644
index 0000000..d5fcc7c
--- /dev/null
+++ b/src/mercury/mercury_thread_mutex.c
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#include "mercury_thread_mutex.h"
+
+/*---------------------------------------------------------------------------*/
+int
+hg_thread_mutex_init(hg_thread_mutex_t *mutex)
+{
+    int ret = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    InitializeCriticalSection(mutex);
+#else
+    pthread_mutexattr_t mutex_attr;
+
+    /* Do not use the attribute object if it could not be initialized */
+    if (pthread_mutexattr_init(&mutex_attr))
+        return HG_UTIL_FAIL;
+
+#    ifdef HG_UTIL_HAS_PTHREAD_MUTEX_ADAPTIVE_NP
+    /* Set type to PTHREAD_MUTEX_ADAPTIVE_NP to improve performance */
+    pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ADAPTIVE_NP);
+#    else
+    pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_DEFAULT);
+#    endif
+    if (pthread_mutex_init(mutex, &mutex_attr))
+        ret = HG_UTIL_FAIL;
+
+    /* Release the attribute object on both the success and failure paths */
+    pthread_mutexattr_destroy(&mutex_attr);
+#endif
+
+    return ret;
+}
+
+/*---------------------------------------------------------------------------*/
+int
+hg_thread_mutex_destroy(hg_thread_mutex_t *mutex)
+{
+    int status = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    DeleteCriticalSection(mutex);
+#else
+    /* A non-zero return from pthread_mutex_destroy() is a failure */
+    if (pthread_mutex_destroy(mutex) != 0)
+        status = HG_UTIL_FAIL;
+#endif
+
+    return status;
+}
diff --git a/src/mercury/mercury_thread_mutex.h b/src/mercury/mercury_thread_mutex.h
new file mode 100644
index 0000000..fe54a0c
--- /dev/null
+++ b/src/mercury/mercury_thread_mutex.h
@@ -0,0 +1,127 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_THREAD_MUTEX_H
+#define MERCURY_THREAD_MUTEX_H
+
+#include "mercury_util_config.h"
+
+#ifdef _WIN32
+# include <windows.h>
+# define HG_THREAD_MUTEX_INITIALIZER NULL
+typedef CRITICAL_SECTION hg_thread_mutex_t;
+#else
+# include <pthread.h>
+# define HG_THREAD_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
+typedef pthread_mutex_t hg_thread_mutex_t;
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Initialize the mutex.
+ *
+ * \param mutex [IN/OUT] pointer to mutex object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_mutex_init(hg_thread_mutex_t *mutex);
+
+/**
+ * Destroy the mutex.
+ *
+ * \param mutex [IN/OUT] pointer to mutex object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_mutex_destroy(hg_thread_mutex_t *mutex);
+
+/**
+ * Lock the mutex.
+ *
+ * \param mutex [IN/OUT] pointer to mutex object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_mutex_lock(hg_thread_mutex_t *mutex);
+
+/**
+ * Try locking the mutex.
+ *
+ * \param mutex [IN/OUT] pointer to mutex object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_mutex_try_lock(hg_thread_mutex_t *mutex);
+
+/**
+ * Unlock the mutex.
+ *
+ * \param mutex [IN/OUT] pointer to mutex object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_mutex_unlock(hg_thread_mutex_t *mutex);
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_mutex_lock(hg_thread_mutex_t *mutex)
+{
+    int status = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    EnterCriticalSection(mutex);
+#else
+    /* A non-zero return from pthread_mutex_lock() is a failure */
+    if (pthread_mutex_lock(mutex) != 0)
+        status = HG_UTIL_FAIL;
+#endif
+
+    return status;
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_mutex_try_lock(hg_thread_mutex_t *mutex)
+{
+    int status = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    /* Zero means the critical section could not be entered */
+    if (TryEnterCriticalSection(mutex) == 0)
+        status = HG_UTIL_FAIL;
+#else
+    /* Non-zero means the mutex could not be locked */
+    if (pthread_mutex_trylock(mutex) != 0)
+        status = HG_UTIL_FAIL;
+#endif
+
+    return status;
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_mutex_unlock(hg_thread_mutex_t *mutex)
+{
+    int status = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    LeaveCriticalSection(mutex);
+#else
+    /* A non-zero return from pthread_mutex_unlock() is a failure */
+    if (pthread_mutex_unlock(mutex) != 0)
+        status = HG_UTIL_FAIL;
+#endif
+
+    return status;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_THREAD_MUTEX_H */
diff --git a/src/mercury/mercury_thread_pool.c b/src/mercury/mercury_thread_pool.c
new file mode 100644
index 0000000..48e3b6c
--- /dev/null
+++ b/src/mercury/mercury_thread_pool.c
@@ -0,0 +1,191 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#include "mercury_thread_pool.h"
+
+#include "mercury_util_error.h"
+
+#include <stdlib.h>
+
+/****************/
+/* Local Macros */
+/****************/
+
+/************************************/
+/* Local Type and Struct Definition */
+/************************************/
+
+/*
+ * Private representation of a thread pool: the public pool state followed by
+ * the bookkeeping for the worker threads. This file casts pointers between
+ * this struct and struct hg_thread_pool, so `pool` must stay the first member.
+ */
+struct hg_thread_pool_private {
+ struct hg_thread_pool pool; /* public pool state (must be first) */
+ unsigned int thread_count; /* number of entries of `threads` joined on destroy */
+ hg_thread_t *threads; /* worker thread handles; NULL until allocated */
+};
+
+/********************/
+/* Local Prototypes */
+/********************/
+
+/**
+ * Worker thread run by the thread pool
+ */
+static HG_THREAD_RETURN_TYPE
+hg_thread_pool_worker(void *args);
+
+/*******************/
+/* Local Variables */
+/*******************/
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Worker loop: repeatedly take the first work item off the pool queue and run
+ * it. The worker sleeps on the pool condition while the queue is empty and
+ * exits once the pool is shutting down and the queue has been drained.
+ *
+ * args is the pool pointer passed at thread creation.
+ */
+static HG_THREAD_RETURN_TYPE
+hg_thread_pool_worker(void *args)
+{
+    hg_thread_ret_t ret = 0;
+    hg_thread_pool_t *pool = (hg_thread_pool_t *) args;
+    struct hg_thread_work *work;
+
+    while (1) {
+        hg_thread_mutex_lock(&pool->mutex);
+
+        /* If not shutting down and nothing to do, worker sleeps */
+        while (!pool->shutdown && HG_QUEUE_IS_EMPTY(&pool->queue)) {
+            int rc;
+
+            pool->sleeping_worker_count++;
+
+            rc = hg_thread_cond_wait(&pool->cond, &pool->mutex);
+
+            /* This worker is no longer sleeping, whether or not the wait
+             * succeeded; undo the increment before bailing out on error so
+             * the count does not stay inflated after the worker exits */
+            pool->sleeping_worker_count--;
+
+            HG_UTIL_CHECK_ERROR_NORET(rc != HG_UTIL_SUCCESS, unlock,
+                "Thread cannot wait on condition variable");
+        }
+
+        /* Only exit once every queued item has been handed to a worker */
+        if (pool->shutdown && HG_QUEUE_IS_EMPTY(&pool->queue))
+            goto unlock;
+
+        /* Grab our task */
+        work = HG_QUEUE_FIRST(&pool->queue);
+        HG_QUEUE_POP_HEAD(&pool->queue, entry);
+
+        /* Unlock so the task runs without holding the pool mutex */
+        hg_thread_mutex_unlock(&pool->mutex);
+
+        /* Get to work */
+        (*work->func)(work->args);
+    }
+
+unlock:
+    hg_thread_mutex_unlock(&pool->mutex);
+
+    return ret;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Allocate a thread pool and start thread_count worker threads.
+ *
+ * On success *pool_ptr receives the new pool and HG_UTIL_SUCCESS is returned.
+ * On failure HG_UTIL_FAIL is returned, *pool_ptr is left untouched, and only
+ * the resources that were actually set up are torn down.
+ */
+int
+hg_thread_pool_init(unsigned int thread_count, hg_thread_pool_t **pool_ptr)
+{
+    int ret = HG_UTIL_SUCCESS, rc;
+    struct hg_thread_pool_private *priv_pool = NULL;
+    unsigned int i;
+
+    HG_UTIL_CHECK_ERROR(
+        pool_ptr == NULL, done, ret, HG_UTIL_FAIL, "NULL pointer");
+
+    priv_pool = (struct hg_thread_pool_private *) malloc(
+        sizeof(struct hg_thread_pool_private));
+    HG_UTIL_CHECK_ERROR(priv_pool == NULL, done, ret, HG_UTIL_FAIL,
+        "Could not allocate thread pool");
+
+    priv_pool->pool.sleeping_worker_count = 0;
+    /* Counts only the threads actually started, so that a destroy on the
+     * error path never joins an uninitialized thread handle */
+    priv_pool->thread_count = 0;
+    priv_pool->threads = NULL;
+    HG_QUEUE_INIT(&priv_pool->pool.queue);
+    priv_pool->pool.shutdown = 0;
+
+    /* Nothing but the allocation exists yet: just free it on failure */
+    rc = hg_thread_mutex_init(&priv_pool->pool.mutex);
+    HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, error_free, ret, HG_UTIL_FAIL,
+        "Could not initialize mutex");
+
+    /* The condition was never initialized: only the mutex must be undone */
+    rc = hg_thread_cond_init(&priv_pool->pool.cond);
+    HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, error_mutex, ret, HG_UTIL_FAIL,
+        "Could not initialize thread condition");
+
+    /* From here on mutex and condition are valid, so the regular destroy
+     * routine can be used for cleanup */
+    priv_pool->threads =
+        (hg_thread_t *) malloc(thread_count * sizeof(hg_thread_t));
+    HG_UTIL_CHECK_ERROR(!priv_pool->threads, error_destroy, ret, HG_UTIL_FAIL,
+        "Could not allocate thread pool array");
+
+    /* Start worker threads */
+    for (i = 0; i < thread_count; i++) {
+        rc = hg_thread_create(
+            &priv_pool->threads[i], hg_thread_pool_worker, (void *) priv_pool);
+        HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, error_destroy, ret,
+            HG_UTIL_FAIL, "Could not create thread");
+        priv_pool->thread_count++;
+    }
+
+    *pool_ptr = (struct hg_thread_pool *) priv_pool;
+
+done:
+    return ret;
+
+error_destroy:
+    /* Shuts down and joins the threads started so far, then frees the pool */
+    hg_thread_pool_destroy((struct hg_thread_pool *) priv_pool);
+
+    return ret;
+
+error_mutex:
+    hg_thread_mutex_destroy(&priv_pool->pool.mutex);
+
+error_free:
+    free(priv_pool);
+
+    return ret;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Shut the pool down, join its worker threads and release its resources.
+ * A NULL pool is accepted and reported as success.
+ */
+int
+hg_thread_pool_destroy(hg_thread_pool_t *pool)
+{
+    struct hg_thread_pool_private *self =
+        (struct hg_thread_pool_private *) pool;
+    int result = HG_UTIL_SUCCESS;
+    int status;
+    unsigned int idx;
+
+    if (self == NULL)
+        goto done;
+
+    /* Workers are only signalled and joined when the thread array exists */
+    if (self->threads != NULL) {
+        hg_thread_mutex_lock(&self->pool.mutex);
+
+        /* Raise the shutdown flag and wake every sleeping worker */
+        self->pool.shutdown = 1;
+
+        status = hg_thread_cond_broadcast(&self->pool.cond);
+        HG_UTIL_CHECK_ERROR(status != HG_UTIL_SUCCESS, error, result,
+            HG_UTIL_FAIL, "Could not broadcast condition signal");
+
+        hg_thread_mutex_unlock(&self->pool.mutex);
+
+        for (idx = 0; idx < self->thread_count; idx++) {
+            status = hg_thread_join(self->threads[idx]);
+            HG_UTIL_CHECK_ERROR(status != HG_UTIL_SUCCESS, done, result,
+                HG_UTIL_FAIL, "Could not join thread");
+        }
+    }
+
+    status = hg_thread_mutex_destroy(&self->pool.mutex);
+    HG_UTIL_CHECK_ERROR(status != HG_UTIL_SUCCESS, done, result, HG_UTIL_FAIL,
+        "Could not destroy mutex");
+
+    status = hg_thread_cond_destroy(&self->pool.cond);
+    HG_UTIL_CHECK_ERROR(status != HG_UTIL_SUCCESS, done, result, HG_UTIL_FAIL,
+        "Could not destroy thread condition");
+
+    free(self->threads);
+    free(self);
+
+done:
+    return result;
+
+error:
+    /* The broadcast failed while the pool mutex was still held */
+    hg_thread_mutex_unlock(&self->pool.mutex);
+
+    return result;
+}
diff --git a/src/mercury/mercury_thread_pool.h b/src/mercury/mercury_thread_pool.h
new file mode 100644
index 0000000..8ad501a
--- /dev/null
+++ b/src/mercury/mercury_thread_pool.h
@@ -0,0 +1,124 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_THREAD_POOL_H
+#define MERCURY_THREAD_POOL_H
+
+#include "mercury_queue.h"
+#include "mercury_thread.h"
+#include "mercury_thread_condition.h"
+
+
+
+/*************************************/
+/* Public Type and Struct Definition */
+/*************************************/
+
+typedef struct hg_thread_pool hg_thread_pool_t;
+
+/*
+ * Public state of a thread pool. hg_thread_pool_post() reads and updates
+ * these fields while holding `mutex`.
+ */
+struct hg_thread_pool {
+ unsigned int sleeping_worker_count; /* non-zero makes post signal `cond` */
+ HG_QUEUE_HEAD(hg_thread_work) queue; /* pending work; post pushes at the tail */
+ int shutdown; /* non-zero makes post reject new work */
+ hg_thread_mutex_t mutex; /* locked around queue and flag accesses */
+ hg_thread_cond_t cond; /* signalled by post when work is queued */
+};
+
+/*
+ * A unit of work to post to a thread pool. hg_thread_pool_post() rejects a
+ * work item whose `func` is NULL and links the item itself into the pool
+ * queue through `entry`.
+ */
+struct hg_thread_work {
+ hg_thread_func_t func; /* function to run; must not be NULL */
+ void *args; /* argument for func -- presumably passed as-is; confirm in worker */
+ HG_QUEUE_ENTRY(hg_thread_work) entry; /* Internal */
+};
+
+/*****************/
+/* Public Macros */
+/*****************/
+
+/*********************/
+/* Public Prototypes */
+/*********************/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Initialize the thread pool.
+ *
+ * \param thread_count [IN] number of threads that will be created at
+ * initialization
+ * \param pool [OUT] pointer to pool object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_pool_init(unsigned int thread_count, hg_thread_pool_t **pool);
+
+/**
+ * Destroy the thread pool.
+ *
+ * \param pool [IN/OUT] pointer to pool object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_pool_destroy(hg_thread_pool_t *pool);
+
+/**
+ * Post work to the pool. Note that the operation may be queued depending on
+ * the number of threads and number of tasks already running.
+ *
+ * \param pool [IN/OUT] pointer to pool object
+ * \param work [IN] pointer to work struct
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_pool_post(hg_thread_pool_t *pool, struct hg_thread_work *work);
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_pool_post(hg_thread_pool_t *pool, struct hg_thread_work *work)
+{
+    int status = HG_UTIL_SUCCESS;
+
+    /* Reject missing pool, missing work item, or work without a function */
+    if (!pool || !work || !work->func)
+        return HG_UTIL_FAIL;
+
+    hg_thread_mutex_lock(&pool->mutex);
+
+    if (pool->shutdown) {
+        /* No new work is accepted once shutdown has been requested */
+        status = HG_UTIL_FAIL;
+    } else {
+        /* Append the task to the pending queue */
+        HG_QUEUE_PUSH_TAIL(&pool->queue, work, entry);
+
+        /* Signal only if at least one worker is sleeping on the condition */
+        if (pool->sleeping_worker_count &&
+            (hg_thread_cond_signal(&pool->cond) != HG_UTIL_SUCCESS))
+            status = HG_UTIL_FAIL;
+    }
+
+    hg_thread_mutex_unlock(&pool->mutex);
+
+    return status;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_THREAD_POOL_H */
diff --git a/src/mercury/mercury_thread_rwlock.c b/src/mercury/mercury_thread_rwlock.c
new file mode 100644
index 0000000..b7ffde4
--- /dev/null
+++ b/src/mercury/mercury_thread_rwlock.c
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+/* Copyright (C) 2017 Intel Corporation
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted for any purpose (including commercial purposes)
+ * provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions, and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions, and the following disclaimer in the
+ * documentation and/or materials provided with the distribution.
+ *
+ * 3. In addition, redistributions of modified forms of the source or binary
+ * code must carry prominent notices stating that the original code was
+ * changed and the date of the change.
+ *
+ * 4. All publications or advertising materials mentioning features or use of
+ * this software are asked, but not required, to acknowledge that it was
+ * developed by Intel Corporation and credit the contributors.
+ *
+ * 5. Neither the name of Intel Corporation, nor the name of any Contributor
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "mercury_thread_rwlock.h"
+
+/*---------------------------------------------------------------------------*/
+int
+hg_thread_rwlock_init(hg_thread_rwlock_t *rwlock)
+{
+    int status = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    InitializeSRWLock(rwlock);
+#else
+    /* Default attributes; a non-zero return is a failure */
+    if (pthread_rwlock_init(rwlock, NULL) != 0)
+        status = HG_UTIL_FAIL;
+#endif
+
+    return status;
+}
+
+/*---------------------------------------------------------------------------*/
+int
+hg_thread_rwlock_destroy(hg_thread_rwlock_t *rwlock)
+{
+    int status = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    /* nothing to do */
+#else
+    /* A non-zero return from pthread_rwlock_destroy() is a failure */
+    if (pthread_rwlock_destroy(rwlock) != 0)
+        status = HG_UTIL_FAIL;
+#endif
+
+    return status;
+}
diff --git a/src/mercury/mercury_thread_rwlock.h b/src/mercury/mercury_thread_rwlock.h
new file mode 100644
index 0000000..22985c8
--- /dev/null
+++ b/src/mercury/mercury_thread_rwlock.h
@@ -0,0 +1,236 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+/* Copyright (C) 2017 Intel Corporation
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted for any purpose (including commercial purposes)
+ * provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions, and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions, and the following disclaimer in the
+ * documentation and/or materials provided with the distribution.
+ *
+ * 3. In addition, redistributions of modified forms of the source or binary
+ * code must carry prominent notices stating that the original code was
+ * changed and the date of the change.
+ *
+ * 4. All publications or advertising materials mentioning features or use of
+ * this software are asked, but not required, to acknowledge that it was
+ * developed by Intel Corporation and credit the contributors.
+ *
+ * 5. Neither the name of Intel Corporation, nor the name of any Contributor
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef MERCURY_THREAD_RWLOCK_H
+#define MERCURY_THREAD_RWLOCK_H
+
+#include "mercury_util_config.h"
+
+#ifdef _WIN32
+#    include <windows.h>
+/* The SRW lock functions take a PSRWLOCK (pointer to SRWLOCK). The routines
+ * in this header pass an hg_thread_rwlock_t * straight to them, so the lock
+ * type must be SRWLOCK itself, not the pointer type. */
+typedef SRWLOCK hg_thread_rwlock_t;
+#else
+#    include <pthread.h>
+typedef pthread_rwlock_t hg_thread_rwlock_t;
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Initialize the rwlock.
+ *
+ * \param rwlock [IN/OUT] pointer to rwlock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_rwlock_init(hg_thread_rwlock_t *rwlock);
+
+/**
+ * Destroy the rwlock.
+ *
+ * \param rwlock [IN/OUT] pointer to rwlock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_rwlock_destroy(hg_thread_rwlock_t *rwlock);
+
+/**
+ * Take a read lock for the rwlock.
+ *
+ * \param rwlock [IN/OUT] pointer to rwlock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_rwlock_rdlock(hg_thread_rwlock_t *rwlock);
+
+/**
+ * Try to take a read lock for the rwlock.
+ *
+ * \param rwlock [IN/OUT] pointer to rwlock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_rwlock_try_rdlock(hg_thread_rwlock_t *rwlock);
+
+/**
+ * Release the read lock of the rwlock.
+ *
+ * \param rwlock [IN/OUT] pointer to rwlock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_rwlock_release_rdlock(hg_thread_rwlock_t *rwlock);
+
+/**
+ * Take a write lock for the rwlock.
+ *
+ * \param rwlock [IN/OUT] pointer to rwlock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_rwlock_wrlock(hg_thread_rwlock_t *rwlock);
+
+/**
+ * Try to take a write lock for the rwlock.
+ *
+ * \param rwlock [IN/OUT] pointer to rwlock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_rwlock_try_wrlock(hg_thread_rwlock_t *rwlock);
+
+/**
+ * Release the write lock of the rwlock.
+ *
+ * \param rwlock [IN/OUT] pointer to rwlock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_rwlock_release_wrlock(hg_thread_rwlock_t *rwlock);
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_rwlock_rdlock(hg_thread_rwlock_t *rwlock)
+{
+    int status = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    AcquireSRWLockShared(rwlock);
+#else
+    /* A non-zero return from pthread_rwlock_rdlock() is a failure */
+    if (pthread_rwlock_rdlock(rwlock) != 0)
+        status = HG_UTIL_FAIL;
+#endif
+
+    return status;
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_rwlock_try_rdlock(hg_thread_rwlock_t *rwlock)
+{
+    int status = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    /* Zero means the shared lock was not acquired */
+    if (!TryAcquireSRWLockShared(rwlock))
+        status = HG_UTIL_FAIL;
+#else
+    /* Non-zero means the read lock was not acquired */
+    if (pthread_rwlock_tryrdlock(rwlock) != 0)
+        status = HG_UTIL_FAIL;
+#endif
+
+    return status;
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_rwlock_release_rdlock(hg_thread_rwlock_t *rwlock)
+{
+    int status = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    ReleaseSRWLockShared(rwlock);
+#else
+    /* A non-zero return from pthread_rwlock_unlock() is a failure */
+    if (pthread_rwlock_unlock(rwlock) != 0)
+        status = HG_UTIL_FAIL;
+#endif
+
+    return status;
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_rwlock_wrlock(hg_thread_rwlock_t *rwlock)
+{
+#ifdef _WIN32
+    /* Taking the write lock must acquire the SRW lock in exclusive mode;
+     * the release call belongs to hg_thread_rwlock_release_wrlock() */
+    AcquireSRWLockExclusive(rwlock);
+#else
+    /* A non-zero return from pthread_rwlock_wrlock() is a failure */
+    if (pthread_rwlock_wrlock(rwlock))
+        return HG_UTIL_FAIL;
+#endif
+
+    return HG_UTIL_SUCCESS;
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_rwlock_try_wrlock(hg_thread_rwlock_t *rwlock)
+{
+    int status = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    /* Zero means the exclusive lock was not acquired */
+    if (!TryAcquireSRWLockExclusive(rwlock))
+        status = HG_UTIL_FAIL;
+#else
+    /* Non-zero means the write lock was not acquired */
+    if (pthread_rwlock_trywrlock(rwlock) != 0)
+        status = HG_UTIL_FAIL;
+#endif
+
+    return status;
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_rwlock_release_wrlock(hg_thread_rwlock_t *rwlock)
+{
+    int status = HG_UTIL_SUCCESS;
+
+#ifdef _WIN32
+    ReleaseSRWLockExclusive(rwlock);
+#else
+    /* A non-zero return from pthread_rwlock_unlock() is a failure */
+    if (pthread_rwlock_unlock(rwlock) != 0)
+        status = HG_UTIL_FAIL;
+#endif
+
+    return status;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_THREAD_RWLOCK_H */
diff --git a/src/mercury/mercury_thread_spin.c b/src/mercury/mercury_thread_spin.c
new file mode 100644
index 0000000..60ef1f6
--- /dev/null
+++ b/src/mercury/mercury_thread_spin.c
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#include "mercury_thread_spin.h"
+
+/*---------------------------------------------------------------------------*/
+int
+hg_thread_spin_init(hg_thread_spin_t *lock)
+{
+#if defined(_WIN32)
+    /* A zero value marks the lock as free */
+    *lock = 0;
+
+    return HG_UTIL_SUCCESS;
+#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
+    /* pshared argument of 0; non-zero return is a failure */
+    return (pthread_spin_init(lock, 0) != 0) ? HG_UTIL_FAIL : HG_UTIL_SUCCESS;
+#else
+    /* Without pthread spin locks the lock type is a plain mutex */
+    return hg_thread_mutex_init(lock);
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+int
+hg_thread_spin_destroy(hg_thread_spin_t *lock)
+{
+#if defined(_WIN32)
+    /* Nothing to release for the Windows lock word */
+    (void) lock;
+
+    return HG_UTIL_SUCCESS;
+#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
+    /* A non-zero return from pthread_spin_destroy() is a failure */
+    return (pthread_spin_destroy(lock) != 0) ? HG_UTIL_FAIL : HG_UTIL_SUCCESS;
+#else
+    /* Without pthread spin locks the lock type is a plain mutex */
+    return hg_thread_mutex_destroy(lock);
+#endif
+}
diff --git a/src/mercury/mercury_thread_spin.h b/src/mercury/mercury_thread_spin.h
new file mode 100644
index 0000000..661d084
--- /dev/null
+++ b/src/mercury/mercury_thread_spin.h
@@ -0,0 +1,146 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_THREAD_SPIN_H
+#define MERCURY_THREAD_SPIN_H
+
+#include "mercury_util_config.h"
+
+#if defined(_WIN32)
+# include <windows.h>
+typedef volatile LONG hg_thread_spin_t;
+#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
+# include <pthread.h>
+typedef pthread_spinlock_t hg_thread_spin_t;
+#else
+/* Default to hg_thread_mutex_t if pthread_spinlock_t is not supported */
+# include "mercury_thread_mutex.h"
+typedef hg_thread_mutex_t hg_thread_spin_t;
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Initialize the spin lock.
+ *
+ * \param lock [IN/OUT] pointer to lock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_spin_init(hg_thread_spin_t *lock);
+
+/**
+ * Destroy the spin lock.
+ *
+ * \param lock [IN/OUT] pointer to lock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_spin_destroy(hg_thread_spin_t *lock);
+
+/**
+ * Lock the spin lock.
+ *
+ * \param lock [IN/OUT] pointer to lock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_spin_lock(hg_thread_spin_t *lock);
+
+/**
+ * Try locking the spin lock.
+ *
+ * \param lock [IN/OUT] pointer to lock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_spin_try_lock(hg_thread_spin_t *lock);
+
+/**
+ * Unlock the spin lock.
+ *
+ * \param lock [IN/OUT] pointer to lock object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_thread_spin_unlock(hg_thread_spin_t *lock);
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_spin_lock(hg_thread_spin_t *lock)
+{
+#if defined(_WIN32)
+    for (;;) {
+        /* A previous value of zero means the lock was free and is now ours */
+        if (!InterlockedExchange(lock, EBUSY))
+            break;
+
+        /* Don't lock while waiting */
+        while (*lock) {
+            YieldProcessor();
+
+            /* Compiler barrier. Prevent caching of *lock */
+            MemoryBarrier();
+        }
+    }
+
+    return HG_UTIL_SUCCESS;
+#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
+    /* A non-zero return from pthread_spin_lock() is a failure */
+    return (pthread_spin_lock(lock) != 0) ? HG_UTIL_FAIL : HG_UTIL_SUCCESS;
+#else
+    /* Without pthread spin locks the lock type is a plain mutex */
+    return hg_thread_mutex_lock(lock);
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_spin_try_lock(hg_thread_spin_t *lock)
+{
+#if defined(_WIN32)
+    /* InterlockedExchange() returns the previous value: zero means the lock
+     * was free and is now held by us, non-zero means it was already taken.
+     * Map that to the documented return codes instead of leaking a positive
+     * EBUSY, so all platform branches report failure the same way. */
+    if (InterlockedExchange(lock, EBUSY))
+        return HG_UTIL_FAIL;
+
+    return HG_UTIL_SUCCESS;
+#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
+    /* A non-zero return means the lock could not be taken */
+    if (pthread_spin_trylock(lock))
+        return HG_UTIL_FAIL;
+
+    return HG_UTIL_SUCCESS;
+#else
+    /* Without pthread spin locks the lock type is a plain mutex */
+    return hg_thread_mutex_try_lock(lock);
+#endif
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_thread_spin_unlock(hg_thread_spin_t *lock)
+{
+#if defined(_WIN32)
+ /* Compiler barrier. The store below acts with release semantics */
+ MemoryBarrier();
+ *lock = 0;
+
+ return HG_UTIL_SUCCESS;
+#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
+ if (pthread_spin_unlock(lock))
+ return HG_UTIL_FAIL;
+ return HG_UTIL_SUCCESS;
+#else
+ return hg_thread_mutex_unlock(lock);
+#endif
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_THREAD_SPIN_H */
diff --git a/src/mercury/mercury_time.h b/src/mercury/mercury_time.h
new file mode 100644
index 0000000..3493a9f
--- /dev/null
+++ b/src/mercury/mercury_time.h
@@ -0,0 +1,402 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_TIME_H
+#define MERCURY_TIME_H
+
+#include "mercury_util_config.h"
+
+#if defined(_WIN32)
+# include <windows.h>
+#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC)
+# if defined(HG_UTIL_HAS_TIME_H) && defined(HG_UTIL_HAS_CLOCK_GETTIME)
+# include <time.h>
+# elif defined(__APPLE__) && defined(HG_UTIL_HAS_SYSTIME_H)
+# include <mach/mach_time.h>
+# include <sys/time.h>
+# else
+# error "Not supported on this platform."
+# endif
+#else
+# include <stdio.h>
+# include <unistd.h>
+# if defined(HG_UTIL_HAS_SYSTIME_H)
+# include <sys/time.h>
+# else
+# error "Not supported on this platform."
+# endif
+#endif
+
+/*************************************/
+/* Public Type and Struct Definition */
+/*************************************/
+
+/* Time value split into whole seconds plus a microsecond remainder */
+typedef struct hg_time hg_time_t;
+
+struct hg_time {
+ long tv_sec; /* whole seconds */
+ long tv_usec; /* microseconds (scaled by 1e-6 in hg_time_to_double) */
+};
+
+/*****************/
+/* Public Macros */
+/*****************/
+
+/*********************/
+/* Public Prototypes */
+/*********************/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Get an elapsed time on the calling processor.
+ *
+ * \param tv [OUT] pointer to returned time structure
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_time_get_current(hg_time_t *tv);
+
+/**
+ * Convert hg_time_t to double.
+ *
+ * \param tv [IN] time structure
+ *
+ * \return Converted time in seconds
+ */
+static HG_UTIL_INLINE double
+hg_time_to_double(hg_time_t tv);
+
+/**
+ * Convert double to hg_time_t.
+ *
+ * \param d [IN] time in seconds
+ *
+ * \return Converted time structure
+ */
+static HG_UTIL_INLINE hg_time_t
+hg_time_from_double(double d);
+
+/**
+ * Compare time values.
+ *
+ * \param in1 [IN] time structure
+ * \param in2 [IN] time structure
+ *
+ * \return 1 if in1 < in2, 0 otherwise
+ */
+static HG_UTIL_INLINE int
+hg_time_less(hg_time_t in1, hg_time_t in2);
+
+/**
+ * Add time values.
+ *
+ * \param in1 [IN] time structure
+ * \param in2 [IN] time structure
+ *
+ * \return Summed time structure
+ */
+static HG_UTIL_INLINE hg_time_t
+hg_time_add(hg_time_t in1, hg_time_t in2);
+
+/**
+ * Subtract time values.
+ *
+ * \param in1 [IN] time structure
+ * \param in2 [IN] time structure
+ *
+ * \return Subtracted time structure
+ */
+static HG_UTIL_INLINE hg_time_t
+hg_time_subtract(hg_time_t in1, hg_time_t in2);
+
+/**
+ * Sleep until the time specified in rqt has elapsed.
+ *
+ * \param rqt [IN] time structure
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_time_sleep(const hg_time_t rqt);
+
+/**
+ * Get a string containing current time/date stamp.
+ *
+ * \return Valid string or NULL on failure
+ */
+static HG_UTIL_INLINE char *
+hg_time_stamp(void);
+
+/*---------------------------------------------------------------------------*/
+#ifdef _WIN32
+/*
+ * Return the FILETIME representation of 1970-01-01 00:00:00.000 packed into
+ * a LARGE_INTEGER (high part in the upper 32 bits, low part in the lower).
+ */
+static HG_UTIL_INLINE LARGE_INTEGER
+get_FILETIME_offset(void)
+{
+    SYSTEMTIME epoch;
+    FILETIME epoch_ft;
+    LARGE_INTEGER packed;
+
+    /* Date part */
+    epoch.wYear = 1970;
+    epoch.wMonth = 1;
+    epoch.wDay = 1;
+    /* Time-of-day part */
+    epoch.wHour = 0;
+    epoch.wMinute = 0;
+    epoch.wSecond = 0;
+    epoch.wMilliseconds = 0;
+
+    SystemTimeToFileTime(&epoch, &epoch_ft);
+
+    /* Join the two 32-bit halves into one 64-bit quantity */
+    packed.QuadPart = epoch_ft.dwHighDateTime;
+    packed.QuadPart = (packed.QuadPart << 32) | epoch_ft.dwLowDateTime;
+
+    return packed;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Windows implementation of hg_time_get_current().
+ * When a performance counter is available the returned time is measured
+ * from the counter value captured on the first call; otherwise it is the
+ * system FILETIME minus the 1970-01-01 FILETIME from get_FILETIME_offset().
+ * Returns HG_UTIL_FAIL only when tv is NULL.
+ *
+ * NOTE(review): the function-local statics are set up on first use without
+ * any synchronization, and `initialized` is set before the other statics are
+ * filled in -- confirm callers cannot race on the first call.
+ */
+static HG_UTIL_INLINE int
+hg_time_get_current(hg_time_t *tv)
+{
+ LARGE_INTEGER t;
+ FILETIME f;
+ double t_usec;
+ static LARGE_INTEGER offset; /* reference point subtracted from each sample */
+ static double freq_to_usec; /* ticks per microsecond */
+ static int initialized = 0;
+ static BOOL use_perf_counter = 0;
+
+ if (!tv)
+ return HG_UTIL_FAIL;
+
+ /* One-time selection of the time source and its scale factor */
+ if (!initialized) {
+ LARGE_INTEGER perf_freq;
+ initialized = 1;
+ use_perf_counter = QueryPerformanceFrequency(&perf_freq);
+ if (use_perf_counter) {
+ QueryPerformanceCounter(&offset);
+ /* perf_freq is in ticks per second */
+ freq_to_usec = (double) perf_freq.QuadPart / 1000000.;
+ } else {
+ offset = get_FILETIME_offset();
+ /* presumably because FILETIME counts 100 ns units, i.e. 10 per usec */
+ freq_to_usec = 10.;
+ }
+ }
+ if (use_perf_counter) {
+ QueryPerformanceCounter(&t);
+ } else {
+ GetSystemTimeAsFileTime(&f);
+ /* Pack the two 32-bit FILETIME halves into one 64-bit value */
+ t.QuadPart = f.dwHighDateTime;
+ t.QuadPart <<= 32;
+ t.QuadPart |= f.dwLowDateTime;
+ }
+
+ /* Convert ticks since the reference point to microseconds, then split */
+ t.QuadPart -= offset.QuadPart;
+ t_usec = (double) t.QuadPart / freq_to_usec;
+ t.QuadPart = t_usec;
+ tv->tv_sec = t.QuadPart / 1000000;
+ tv->tv_usec = t.QuadPart % 1000000;
+
+ return HG_UTIL_SUCCESS;
+}
+
+#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC)
+/*---------------------------------------------------------------------------*/
+# if defined(HG_UTIL_HAS_TIME_H) && defined(HG_UTIL_HAS_CLOCK_GETTIME)
+/*
+ * clock_gettime() implementation of hg_time_get_current().
+ * Reads CLOCK_MONOTONIC and stores it as seconds + microseconds in *tv.
+ * Returns HG_UTIL_FAIL when tv is NULL or the clock cannot be read.
+ */
+static HG_UTIL_INLINE int
+hg_time_get_current(hg_time_t *tv)
+{
+    struct timespec tp = {0, 0};
+    /* NB. CLOCK_MONOTONIC_RAW is not explicitly supported in the vdso */
+    clockid_t clock_id = CLOCK_MONOTONIC;
+
+    if (!tv)
+        return HG_UTIL_FAIL;
+
+    /* Do not report success with a zeroed time if the clock read fails */
+    if (clock_gettime(clock_id, &tp))
+        return HG_UTIL_FAIL;
+
+    tv->tv_sec = (long) tp.tv_sec;
+    tv->tv_usec = (long) (tp.tv_nsec / 1000);
+
+    return HG_UTIL_SUCCESS;
+}
+
+/*---------------------------------------------------------------------------*/
+# elif defined(__APPLE__) && defined(HG_UTIL_HAS_SYSTIME_H)
+/*
+ * mach_absolute_time() implementation of hg_time_get_current().
+ * Converts the tick counter to nanoseconds with the numer/denom ratio
+ * reported by mach_timebase_info() and stores seconds + microseconds in *tv.
+ * Returns HG_UTIL_FAIL when tv is NULL or the timebase cannot be queried.
+ */
+static HG_UTIL_INLINE int
+hg_time_get_current(hg_time_t *tv)
+{
+    static mach_timebase_info_data_t timebase = {0, 0};
+    uint64_t ticks;
+    uint64_t monotonic_nsec;
+
+    if (!tv)
+        return HG_UTIL_FAIL;
+
+    /* denom == 0 means the timebase has not been fetched yet */
+    if (timebase.denom == 0) {
+        mach_timebase_info_data_t info;
+
+        if (mach_timebase_info(&info) != 0 || info.denom == 0)
+            return HG_UTIL_FAIL;
+        timebase = info;
+    }
+
+    /* Apply numer/denom as a ratio: a precomputed integer quotient would
+     * truncate (or become 0 when numer < denom). Splitting quotient and
+     * remainder keeps the intermediate product from overflowing early. */
+    ticks = mach_absolute_time();
+    monotonic_nsec = (ticks / timebase.denom) * timebase.numer +
+                     ((ticks % timebase.denom) * timebase.numer) / timebase.denom;
+
+    tv->tv_sec = (long) (monotonic_nsec / 1000000000);
+    /* Sub-second remainder in nanoseconds, converted to microseconds */
+    tv->tv_usec = (long) ((monotonic_nsec % 1000000000) / 1000);
+
+    return HG_UTIL_SUCCESS;
+}
+
+# endif
+#else
+/*---------------------------------------------------------------------------*/
+# if defined(HG_UTIL_HAS_SYSTIME_H)
+/*
+ * gettimeofday() implementation of hg_time_get_current().
+ * Returns HG_UTIL_FAIL when tv is NULL or gettimeofday() fails.
+ */
+static HG_UTIL_INLINE int
+hg_time_get_current(hg_time_t *tv)
+{
+    struct timeval now;
+
+    if (!tv)
+        return HG_UTIL_FAIL;
+
+    /* Read into a real struct timeval and copy field by field: hg_time_t is
+     * not guaranteed to share struct timeval's layout (tv_usec need not be
+     * a long), so casting the pointer is unsafe. */
+    if (gettimeofday(&now, NULL))
+        return HG_UTIL_FAIL;
+
+    tv->tv_sec = (long) now.tv_sec;
+    tv->tv_usec = (long) now.tv_usec;
+
+    return HG_UTIL_SUCCESS;
+}
+
+# endif
+#endif
+/*---------------------------------------------------------------------------*/
+/* Express tv as a floating-point number of seconds */
+static HG_UTIL_INLINE double
+hg_time_to_double(hg_time_t tv)
+{
+    const double whole_sec = (double) tv.tv_sec;
+    const double frac_sec = (double) (tv.tv_usec) * 0.000001;
+
+    return whole_sec + frac_sec;
+}
+
+/*---------------------------------------------------------------------------*/
+/* Split a floating-point number of seconds into seconds + microseconds */
+static HG_UTIL_INLINE hg_time_t
+hg_time_from_double(double d)
+{
+    hg_time_t result;
+    const long whole_sec = (long) d; /* truncates toward zero */
+
+    result.tv_sec = whole_sec;
+    result.tv_usec = (long) ((d - (double) (whole_sec)) * 1000000);
+
+    return result;
+}
+
+/*---------------------------------------------------------------------------*/
+/* Return 1 when in1 is strictly earlier than in2, 0 otherwise */
+static HG_UTIL_INLINE int
+hg_time_less(hg_time_t in1, hg_time_t in2)
+{
+    /* Seconds decide unless they tie; then microseconds break the tie */
+    if (in1.tv_sec != in2.tv_sec)
+        return in1.tv_sec < in2.tv_sec;
+
+    return in1.tv_usec < in2.tv_usec;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Return in1 + in2.
+ * A single carry is applied, so the result is normalized
+ * (0 <= tv_usec < 1000000) provided both inputs are -- callers are assumed
+ * to pass normalized values.
+ */
+static HG_UTIL_INLINE hg_time_t
+hg_time_add(hg_time_t in1, hg_time_t in2)
+{
+    hg_time_t out;
+
+    out.tv_sec = in1.tv_sec + in2.tv_sec;
+    out.tv_usec = in1.tv_usec + in2.tv_usec;
+    /* >= rather than >: exactly 1000000 us is one full second and must
+     * carry too, otherwise tv_usec is left out of range */
+    if (out.tv_usec >= 1000000) {
+        out.tv_usec -= 1000000;
+        out.tv_sec += 1;
+    }
+
+    return out;
+}
+
+/*---------------------------------------------------------------------------*/
+/* Return in1 - in2, borrowing one second when the microseconds go negative */
+static HG_UTIL_INLINE hg_time_t
+hg_time_subtract(hg_time_t in1, hg_time_t in2)
+{
+    hg_time_t diff;
+    long sec = in1.tv_sec - in2.tv_sec;
+    long usec = in1.tv_usec - in2.tv_usec;
+
+    if (usec < 0) {
+        usec += 1000000;
+        sec -= 1;
+    }
+
+    diff.tv_sec = sec;
+    diff.tv_usec = usec;
+
+    return diff;
+}
+
+/*---------------------------------------------------------------------------*/
+/*
+ * Suspend the calling thread for the duration given in rqt.
+ * Returns HG_UTIL_FAIL when nanosleep()/usleep() reports an error; the
+ * Windows branch always returns HG_UTIL_SUCCESS.
+ */
+static HG_UTIL_INLINE int
+hg_time_sleep(const hg_time_t rqt)
+{
+#ifdef _WIN32
+    /* Sleep() takes milliseconds: seconds must be multiplied by 1000
+     * (dividing turned every request shorter than 1000 s into Sleep(0)) */
+    DWORD dwMilliseconds = (DWORD)(hg_time_to_double(rqt) * 1000);
+
+    Sleep(dwMilliseconds);
+#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC)
+    struct timespec rqtp;
+
+    rqtp.tv_sec = rqt.tv_sec;
+    rqtp.tv_nsec = rqt.tv_usec * 1000;
+
+    if (nanosleep(&rqtp, NULL))
+        return HG_UTIL_FAIL;
+#else
+    /* Whole request expressed in microseconds */
+    useconds_t usec =
+        (useconds_t) rqt.tv_sec * 1000000 + (useconds_t) rqt.tv_usec;
+
+    if (usleep(usec))
+        return HG_UTIL_FAIL;
+#endif
+
+    return HG_UTIL_SUCCESS;
+}
+
+/*---------------------------------------------------------------------------*/
+/* Size of the static buffer returned by hg_time_stamp() */
+#define HG_UTIL_STAMP_MAX 128
+
+/*
+ * Format the current time/date into a static buffer and return it.
+ * The buffer is shared by all calls, so each call overwrites the previous
+ * result. On Windows nothing is written and the returned string is empty.
+ * Returns NULL when the time cannot be obtained or formatted.
+ */
+static HG_UTIL_INLINE char *
+hg_time_stamp(void)
+{
+    static char buf[HG_UTIL_STAMP_MAX] = {'\0'};
+
+#if defined(_WIN32)
+    /* TODO not implemented */
+#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC)
+    struct tm *local_time;
+    time_t t;
+
+    t = time(NULL);
+    local_time = localtime(&t);
+    if (local_time == NULL)
+        return NULL;
+
+    if (strftime(buf, HG_UTIL_STAMP_MAX, "%a, %d %b %Y %T %Z", local_time) == 0)
+        return NULL;
+#else
+    struct timeval tv;
+    struct timezone tz;
+    unsigned long days, hours, minutes, seconds;
+    long local_hours;
+    int offset_hours;
+
+    if (gettimeofday(&tv, &tz))
+        return NULL;
+
+    /* Break the seconds-of-day down into hh:mm:ss */
+    days = (unsigned long) tv.tv_sec / (3600 * 24);
+    hours = ((unsigned long) tv.tv_sec - days * 24 * 3600) / 3600;
+    minutes =
+        ((unsigned long) tv.tv_sec - days * 24 * 3600 - hours * 3600) / 60;
+    seconds = (unsigned long) tv.tv_sec - days * 24 * 3600 - hours * 3600 -
+              minutes * 60;
+
+    /* Apply the timezone offset in signed arithmetic and wrap into [0, 24):
+     * doing this unsigned wrapped to a huge value whenever the offset
+     * exceeded the hour of day or tz_minuteswest was negative. */
+    offset_hours = tz.tz_minuteswest / 60;
+    local_hours = ((long) hours - (long) offset_hours) % 24;
+    if (local_hours < 0)
+        local_hours += 24;
+    hours = (unsigned long) local_hours;
+
+    snprintf(buf, HG_UTIL_STAMP_MAX, "%02lu:%02lu:%02lu (GMT-%d)", hours,
+        minutes, seconds, offset_hours);
+#endif
+
+    return buf;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_TIME_H */
diff --git a/src/mercury/mercury_util_config.h b/src/mercury/mercury_util_config.h
new file mode 100644
index 0000000..3a3a9d8
--- /dev/null
+++ b/src/mercury/mercury_util_config.h
@@ -0,0 +1,141 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+/* Generated file. Only edit mercury_util_config.h.in. */
+
+#ifndef MERCURY_UTIL_CONFIG_H
+#define MERCURY_UTIL_CONFIG_H
+
+/*************************************/
+/* Public Type and Struct Definition */
+/*************************************/
+
+/* Type definitions */
+/* Fixed-width integer aliases: compiler-specific __intN types on Windows,
+ * <stdint.h> types everywhere else */
+#ifdef _WIN32
+typedef signed __int64 hg_util_int64_t;
+typedef signed __int32 hg_util_int32_t;
+typedef signed __int16 hg_util_int16_t;
+typedef signed __int8 hg_util_int8_t;
+typedef unsigned __int64 hg_util_uint64_t;
+typedef unsigned __int32 hg_util_uint32_t;
+typedef unsigned __int16 hg_util_uint16_t;
+typedef unsigned __int8 hg_util_uint8_t;
+#else
+# include <stddef.h>
+# include <stdint.h>
+typedef int64_t hg_util_int64_t;
+typedef int32_t hg_util_int32_t;
+typedef int16_t hg_util_int16_t;
+typedef int8_t hg_util_int8_t;
+typedef uint64_t hg_util_uint64_t;
+typedef uint32_t hg_util_uint32_t;
+typedef uint16_t hg_util_uint16_t;
+typedef uint8_t hg_util_uint8_t;
+#endif
+/* Boolean stored in one unsigned byte (see HG_UTIL_TRUE / HG_UTIL_FALSE) */
+typedef hg_util_uint8_t hg_util_bool_t;
+/* 64-bit unsigned integer; presumably used to carry pointer values --
+ * TODO confirm against users of this type */
+typedef hg_util_uint64_t hg_util_ptr_t;
+
+/* True / false */
+#define HG_UTIL_TRUE 1
+#define HG_UTIL_FALSE 0
+
+/* Return codes */
+#define HG_UTIL_SUCCESS 0
+#define HG_UTIL_FAIL -1
+
+/*****************/
+/* Public Macros */
+/*****************/
+
+/* Visibility of symbols */
+#if defined(_WIN32)
+# define HG_UTIL_ABI_IMPORT __declspec(dllimport)
+# define HG_UTIL_ABI_EXPORT __declspec(dllexport)
+# define HG_UTIL_ABI_HIDDEN
+#elif defined(__GNUC__) && (__GNUC__ >= 4)
+# define HG_UTIL_ABI_IMPORT __attribute__((visibility("default")))
+# define HG_UTIL_ABI_EXPORT __attribute__((visibility("default")))
+# define HG_UTIL_ABI_HIDDEN __attribute__((visibility("hidden")))
+#else
+# define HG_UTIL_ABI_IMPORT
+# define HG_UTIL_ABI_EXPORT
+# define HG_UTIL_ABI_HIDDEN
+#endif
+
+/* Inline macro */
+#ifdef _WIN32
+# define HG_UTIL_INLINE __inline
+#else
+# define HG_UTIL_INLINE __inline__
+#endif
+
+/* Shared libraries */
+#define HG_UTIL_BUILD_SHARED_LIBS
+#ifdef HG_UTIL_BUILD_SHARED_LIBS
+# ifdef mercury_util_EXPORTS
+# define HG_UTIL_PUBLIC HG_UTIL_ABI_EXPORT
+# else
+# define HG_UTIL_PUBLIC HG_UTIL_ABI_IMPORT
+# endif
+# define HG_UTIL_PRIVATE HG_UTIL_ABI_HIDDEN
+#else
+# define HG_UTIL_PUBLIC
+# define HG_UTIL_PRIVATE
+#endif
+
+/* Define if has 'clock_gettime()' */
+#define HG_UTIL_HAS_CLOCK_GETTIME
+
+/* Define if has CLOCK_MONOTONIC */
+/* #undef HG_UTIL_HAS_CLOCK_MONOTONIC */
+
+/* Define if has eventfd_t type */
+#define HG_UTIL_HAS_EVENTFD_T
+
+/* Define if has colored output */
+/* #undef HG_UTIL_HAS_LOG_COLOR */
+
+/* Define if has <opa_primitives.h> */
+/* #undef HG_UTIL_HAS_OPA_PRIMITIVES_H */
+
+/* Define if has 'pthread_condattr_setclock()' */
+#define HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK
+
+/* Define if has PTHREAD_MUTEX_ADAPTIVE_NP */
+#define HG_UTIL_HAS_PTHREAD_MUTEX_ADAPTIVE_NP
+
+/* Define if has pthread_spinlock_t type */
+#define HG_UTIL_HAS_PTHREAD_SPINLOCK_T
+
+/* Define if has <stdatomic.h> */
+#define HG_UTIL_HAS_STDATOMIC_H
+
+/* Define type size of atomic_long */
+#define HG_UTIL_ATOMIC_LONG_WIDTH 8
+
+/* Define if has <sys/epoll.h> */
+#define HG_UTIL_HAS_SYSEPOLL_H
+
+/* Define if has <sys/event.h> */
+/* #undef HG_UTIL_HAS_SYSEVENT_H */
+
+/* Define if has <sys/eventfd.h> */
+#define HG_UTIL_HAS_SYSEVENTFD_H
+
+/* Define if has <sys/time.h> */
+#define HG_UTIL_HAS_SYSTIME_H
+
+/* Define if has <time.h> */
+#define HG_UTIL_HAS_TIME_H
+
+/* Define if has verbose error */
+#define HG_UTIL_HAS_VERBOSE_ERROR
+
+#endif /* MERCURY_UTIL_CONFIG_H */
diff --git a/src/mercury/mercury_util_error.c b/src/mercury/mercury_util_error.c
new file mode 100644
index 0000000..0280c88
--- /dev/null
+++ b/src/mercury/mercury_util_error.c
@@ -0,0 +1,20 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#include "mercury_util_error.h"
+
+/*******************/
+/* Local Variables */
+/*******************/
+
+/* Default error log mask */
+#ifdef HG_UTIL_HAS_VERBOSE_ERROR
+unsigned int HG_UTIL_LOG_MASK = HG_LOG_TYPE_ERROR | HG_LOG_TYPE_WARNING;
+#endif \ No newline at end of file
diff --git a/src/mercury/mercury_util_error.h b/src/mercury/mercury_util_error.h
new file mode 100644
index 0000000..b03d7bf
--- /dev/null
+++ b/src/mercury/mercury_util_error.h
@@ -0,0 +1,104 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+#ifndef MERCURY_UTIL_ERROR_H
+#define MERCURY_UTIL_ERROR_H
+
+#include "mercury_util_config.h"
+
+/* Default error macro */
+#ifdef HG_UTIL_HAS_VERBOSE_ERROR
+# include <mercury/mercury_log.h>
+# define HG_UTIL_LOG_MASK hg_util_log_mask
+/* Log mask will be initialized in init routine */
+extern HG_UTIL_PRIVATE unsigned int HG_UTIL_LOG_MASK;
+# define HG_UTIL_LOG_MODULE_NAME "HG Util"
+# define HG_UTIL_LOG_ERROR(...) \
+ do { \
+ if (HG_UTIL_LOG_MASK & HG_LOG_TYPE_ERROR) \
+ HG_LOG_WRITE_ERROR(HG_UTIL_LOG_MODULE_NAME, __VA_ARGS__); \
+ } while (0)
+# ifdef HG_UTIL_HAS_DEBUG
+# define HG_UTIL_LOG_DEBUG(...) \
+ do { \
+ if (HG_UTIL_LOG_MASK & HG_LOG_TYPE_DEBUG) \
+ HG_LOG_WRITE_DEBUG(HG_UTIL_LOG_MODULE_NAME, __VA_ARGS__); \
+ } while (0)
+# else
+# define HG_UTIL_LOG_DEBUG(...) (void) 0
+# endif
+# define HG_UTIL_LOG_WARNING(...) \
+ do { \
+ if (HG_UTIL_LOG_MASK & HG_LOG_TYPE_WARNING) \
+ HG_LOG_WRITE_WARNING(HG_UTIL_LOG_MODULE_NAME, __VA_ARGS__); \
+ } while (0)
+#else
+# define HG_UTIL_LOG_ERROR(...) (void) 0
+# define HG_UTIL_LOG_DEBUG(...) (void) 0
+# define HG_UTIL_LOG_WARNING(...) (void) 0
+#endif
+
+/* Branch predictor hints */
+#ifndef _WIN32
+# define likely(x) __builtin_expect(!!(x), 1)
+# define unlikely(x) __builtin_expect(!!(x), 0)
+#else
+# define likely(x) (x)
+# define unlikely(x) (x)
+#endif
+
+/* Error macros */
+/* All macros below are wrapped in do { } while (0) so they behave as a
+ * single statement. */
+
+/* Set ret to ret_val and jump to label (no logging) */
+#define HG_UTIL_GOTO_DONE(label, ret, ret_val) \
+ do { \
+ ret = ret_val; \
+ goto label; \
+ } while (0)
+
+/* Log the message through HG_UTIL_LOG_ERROR, set ret to err_val and jump
+ * to label */
+#define HG_UTIL_GOTO_ERROR(label, ret, err_val, ...) \
+ do { \
+ HG_UTIL_LOG_ERROR(__VA_ARGS__); \
+ ret = err_val; \
+ goto label; \
+ } while (0)
+
+/* Check for cond, set ret to err_val and goto label */
+#define HG_UTIL_CHECK_ERROR(cond, label, ret, err_val, ...) \
+ do { \
+ if (unlikely(cond)) { \
+ HG_UTIL_LOG_ERROR(__VA_ARGS__); \
+ ret = err_val; \
+ goto label; \
+ } \
+ } while (0)
+
+/* Check for cond, log the error and goto label without setting a return
+ * value */
+#define HG_UTIL_CHECK_ERROR_NORET(cond, label, ...) \
+ do { \
+ if (unlikely(cond)) { \
+ HG_UTIL_LOG_ERROR(__VA_ARGS__); \
+ goto label; \
+ } \
+ } while (0)
+
+/* Check for cond and only log the error; control flow continues normally */
+#define HG_UTIL_CHECK_ERROR_DONE(cond, ...) \
+ do { \
+ if (unlikely(cond)) { \
+ HG_UTIL_LOG_ERROR(__VA_ARGS__); \
+ } \
+ } while (0)
+
+/* Check for cond and print warning */
+#define HG_UTIL_CHECK_WARNING(cond, ...) \
+ do { \
+ if (unlikely(cond)) { \
+ HG_UTIL_LOG_WARNING(__VA_ARGS__); \
+ } \
+ } while (0)
+
+#endif /* MERCURY_UTIL_ERROR_H */
diff --git a/testpar/CMakeLists.txt b/testpar/CMakeLists.txt
index a9f45d5..3d0fe7e 100644
--- a/testpar/CMakeLists.txt
+++ b/testpar/CMakeLists.txt
@@ -75,6 +75,7 @@ set (H5P_TESTS
t_shapesame
t_filters_parallel
t_2Gio
+ t_subfile_openclose
)
foreach (h5_testp ${H5P_TESTS})
diff --git a/testpar/t_subfile_openclose.c b/testpar/t_subfile_openclose.c
new file mode 100644
index 0000000..9f31b6e
--- /dev/null
+++ b/testpar/t_subfile_openclose.c
@@ -0,0 +1,42 @@
+#include <stdio.h>
+#include "hdf5.h"
+#include "H5FDsubfile_public.h"
+
+#include "mpi.h"
+
+/*
+ * Subfiling smoke test: initialize MPI and HDF5, set up subfiling, then
+ * open and close the subfiles ten times before shutting everything down.
+ */
+int
+main(int argc, char **argv)
+{
+    int i, mpi_size, mpi_rank;
+    int mpi_provides, require = MPI_THREAD_MULTIPLE;
+    int subfiling_ready = 0; /* set once a subfiling context was obtained */
+    hid_t subfile_id = H5I_INVALID_HID;
+
+    MPI_Init_thread(&argc, &argv, require, &mpi_provides);
+    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
+    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
+
+    H5open();
+
+    if (H5FDsubfiling_init() == SUCCEED) {
+        subfile_id = get_subfiling_context();
+        subfiling_ready = 1;
+        /* hid_t is not guaranteed to be a long: cast to match the format */
+        printf("[%d] subfile_id = %llx\n", mpi_rank,
+            (unsigned long long) subfile_id);
+    }
+    else if (mpi_rank == 0) {
+        puts("Error: Unable to initialize subfiling!");
+    }
+
+    /* Skip the open/close cycles when initialization failed: subfile_id
+     * would otherwise be passed on without ever having been assigned */
+    if (subfiling_ready) {
+        for (i = 0; i < 10; i++) {
+            sf_open_subfiles(subfile_id, NULL, O_CREAT | O_TRUNC | O_RDWR);
+            sf_close_subfiles(subfile_id);
+        }
+    }
+
+    /* Reached by every rank regardless of the outcome above */
+    MPI_Barrier(MPI_COMM_WORLD);
+
+    H5FDsubfiling_finalize();
+
+    H5close();
+
+    MPI_Finalize();
+    return 0;
+}
diff --git a/tools/lib/h5diff.c b/tools/lib/h5diff.c
index 8b6ace9..e297c8f 100644
--- a/tools/lib/h5diff.c
+++ b/tools/lib/h5diff.c
@@ -899,7 +899,7 @@ h5diff(const char *fname1, const char *fname2, const char *objname1, const char
H5TOOLS_DEBUG("groups traversed - errstat:%d", opts->err_stat);
#ifdef H5_HAVE_PARALLEL
- if(g_Parallel) {
+ if(g_Parallel && !g_CollectInfoOnly) {
int i;
if((HDstrlen(fname1) > MAX_FILENAME) || (HDstrlen(fname2) > MAX_FILENAME)) {
@@ -914,6 +914,11 @@ h5diff(const char *fname1, const char *fname2, const char *objname1, const char
for(i = 1; i < g_nTasks; i++)
MPI_Send(filenames, (MAX_FILENAME * 2), MPI_CHAR, i, MPI_TAG_PARALLEL, MPI_COMM_WORLD);
} /* end if */
+ else if (g_CollectInfoOnly) {
+ build_match_list (obj1fullname, info1_lp, obj2fullname, info2_lp, &match_list, opts);
+
+ }
+
#endif
H5TOOLS_DEBUG("build_match_list next - errstat:%d", opts->err_stat);
diff --git a/tools/lib/h5tools_utils.c b/tools/lib/h5tools_utils.c
index 1a1c2db..2d53030 100644
--- a/tools/lib/h5tools_utils.c
+++ b/tools/lib/h5tools_utils.c
@@ -48,6 +48,7 @@ hsize_t H5TOOLS_BUFSIZE = ( 32 * 1024 * 1024); /* 32 MB */
/* ``parallel_print'' variables */
unsigned char g_Parallel = 0; /*0 for serial, 1 for parallel */
+unsigned char g_CollectInfoOnly = 0;
char outBuff[OUTBUFF_SIZE];
unsigned outBuffOffset;
FILE* overflow_file = NULL;
diff --git a/tools/lib/h5tools_utils.h b/tools/lib/h5tools_utils.h
index 42144cc..a05f883 100644
--- a/tools/lib/h5tools_utils.h
+++ b/tools/lib/h5tools_utils.h
@@ -32,6 +32,7 @@ extern "C" {
H5TOOLS_DLLVAR int g_nTasks;
H5TOOLS_DLLVAR unsigned char g_Parallel;
+H5TOOLS_DLLVAR unsigned char g_CollectInfoOnly;
H5TOOLS_DLLVAR char outBuff[];
H5TOOLS_DLLVAR unsigned outBuffOffset;
H5TOOLS_DLLVAR FILE *overflow_file;
diff --git a/tools/lib/h5trav.c b/tools/lib/h5trav.c
index dc7e27d..a9b5b75 100644
--- a/tools/lib/h5trav.c
+++ b/tools/lib/h5trav.c
@@ -15,6 +15,9 @@
#include "h5trav.h"
#include "h5tools.h"
#include "H5private.h"
+#ifdef H5_HAVE_PARALLEL
+#include "h5tools_utils.h"
+#endif
/*-------------------------------------------------------------------------
* local typedefs
@@ -179,8 +182,10 @@ static herr_t
traverse_cb(hid_t loc_id, const char *path, const H5L_info2_t *linfo,
void *_udata)
{
+ herr_t ret_value = SUCCEED;
trav_ud_traverse_t *udata = (trav_ud_traverse_t *)_udata; /* User data */
char *new_name = NULL;
+
const char *full_name;
const char *already_visited = NULL; /* Whether the link/object was already visited */
@@ -201,6 +206,18 @@ traverse_cb(hid_t loc_id, const char *path, const H5L_info2_t *linfo,
else
full_name = path;
+#ifdef H5_HAVE_PARALLEL
+ if(linfo->type == H5L_TYPE_EXTERNAL) {
+ h5tool_link_info_t lnk_info;
+ if ((ret_value = H5tools_get_symlink_info(loc_id, path, &lnk_info, FALSE)) < 0) {
+ puts("H5tools_get_symlink_info failed!");
+ }
+ else if (ret_value == 0) {
+ puts("Dangling link?");
+ }
+ printf("Visiting external link: %s\n", path);
+ }
+#endif
/* Perform the correct action for different types of links */
if(linfo->type == H5L_TYPE_HARD) {
H5O_info2_t oinfo;