From 5e02da94f11742aa246eb284c964709d97404d3d Mon Sep 17 00:00:00 2001 From: Richard Warren Date: Mon, 22 Jun 2020 14:46:24 -0400 Subject: Initial subfiling branch --- src/CMakeLists.txt | 7 + src/H5FDsubfile.c | 250 ++++++ src/H5FDsubfile.h | 0 src/H5FDsubfile_mpi.c | 1542 ++++++++++++++++++++++++++++++++ src/H5FDsubfile_private.h | 183 ++++ src/H5FDsubfile_public.h | 11 + src/H5FDsubfile_threads.c | 132 +++ src/mercury/mercury_atomic.h | 637 +++++++++++++ src/mercury/mercury_atomic_queue.c | 83 ++ src/mercury/mercury_atomic_queue.h | 271 ++++++ src/mercury/mercury_event.c | 72 ++ src/mercury/mercury_event.h | 184 ++++ src/mercury/mercury_hash_string.h | 48 + src/mercury/mercury_hash_table.c | 526 +++++++++++ src/mercury/mercury_hash_table.h | 252 ++++++ src/mercury/mercury_list.h | 126 +++ src/mercury/mercury_log.c | 128 +++ src/mercury/mercury_log.h | 104 +++ src/mercury/mercury_mem.c | 177 ++++ src/mercury/mercury_mem.h | 92 ++ src/mercury/mercury_poll.c | 531 +++++++++++ src/mercury/mercury_poll.h | 164 ++++ src/mercury/mercury_queue.h | 123 +++ src/mercury/mercury_request.c | 224 +++++ src/mercury/mercury_request.h | 242 +++++ src/mercury/mercury_thread.c | 162 ++++ src/mercury/mercury_thread.h | 242 +++++ src/mercury/mercury_thread_condition.c | 46 + src/mercury/mercury_thread_condition.h | 182 ++++ src/mercury/mercury_thread_mutex.c | 50 ++ src/mercury/mercury_thread_mutex.h | 127 +++ src/mercury/mercury_thread_pool.c | 191 ++++ src/mercury/mercury_thread_pool.h | 124 +++ src/mercury/mercury_thread_rwlock.c | 77 ++ src/mercury/mercury_thread_rwlock.h | 236 +++++ src/mercury/mercury_thread_spin.c | 47 + src/mercury/mercury_thread_spin.h | 146 +++ src/mercury/mercury_time.h | 402 +++++++++ src/mercury/mercury_util_config.h | 141 +++ src/mercury/mercury_util_error.c | 20 + src/mercury/mercury_util_error.h | 104 +++ testpar/CMakeLists.txt | 1 + testpar/t_subfile_openclose.c | 42 + tools/lib/h5diff.c | 7 +- tools/lib/h5tools_utils.c | 1 + 
tools/lib/h5tools_utils.h | 1 + tools/lib/h5trav.c | 17 + 47 files changed, 8474 insertions(+), 1 deletion(-) create mode 100644 src/H5FDsubfile.c create mode 100644 src/H5FDsubfile.h create mode 100644 src/H5FDsubfile_mpi.c create mode 100644 src/H5FDsubfile_private.h create mode 100644 src/H5FDsubfile_public.h create mode 100644 src/H5FDsubfile_threads.c create mode 100644 src/mercury/mercury_atomic.h create mode 100644 src/mercury/mercury_atomic_queue.c create mode 100644 src/mercury/mercury_atomic_queue.h create mode 100644 src/mercury/mercury_event.c create mode 100644 src/mercury/mercury_event.h create mode 100644 src/mercury/mercury_hash_string.h create mode 100644 src/mercury/mercury_hash_table.c create mode 100644 src/mercury/mercury_hash_table.h create mode 100644 src/mercury/mercury_list.h create mode 100644 src/mercury/mercury_log.c create mode 100644 src/mercury/mercury_log.h create mode 100644 src/mercury/mercury_mem.c create mode 100644 src/mercury/mercury_mem.h create mode 100644 src/mercury/mercury_poll.c create mode 100644 src/mercury/mercury_poll.h create mode 100644 src/mercury/mercury_queue.h create mode 100644 src/mercury/mercury_request.c create mode 100644 src/mercury/mercury_request.h create mode 100644 src/mercury/mercury_thread.c create mode 100644 src/mercury/mercury_thread.h create mode 100644 src/mercury/mercury_thread_condition.c create mode 100644 src/mercury/mercury_thread_condition.h create mode 100644 src/mercury/mercury_thread_mutex.c create mode 100644 src/mercury/mercury_thread_mutex.h create mode 100644 src/mercury/mercury_thread_pool.c create mode 100644 src/mercury/mercury_thread_pool.h create mode 100644 src/mercury/mercury_thread_rwlock.c create mode 100644 src/mercury/mercury_thread_rwlock.h create mode 100644 src/mercury/mercury_thread_spin.c create mode 100644 src/mercury/mercury_thread_spin.h create mode 100644 src/mercury/mercury_time.h create mode 100644 src/mercury/mercury_util_config.h create mode 100644 
src/mercury/mercury_util_error.c create mode 100644 src/mercury/mercury_util_error.h create mode 100644 testpar/t_subfile_openclose.c diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c0915c8..c5f6316 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -243,6 +243,9 @@ set (H5FD_SOURCES ${HDF5_SRC_DIR}/H5FDstdio.c ${HDF5_SRC_DIR}/H5FDtest.c ${HDF5_SRC_DIR}/H5FDwindows.c + ${HDF5_SRC_DIR}/H5FDsubfile.c + ${HDF5_SRC_DIR}/H5FDsubfile_threads.c + ${HDF5_SRC_DIR}/H5FDsubfile_mpi.c ) set (H5FD_HDRS @@ -262,6 +265,10 @@ set (H5FD_HDRS ${HDF5_SRC_DIR}/H5FDsplitter.h ${HDF5_SRC_DIR}/H5FDstdio.h ${HDF5_SRC_DIR}/H5FDwindows.h + ${HDF5_SRC_DIR}/H5FDsubfile_public.h + ${HDF5_SRC_DIR}/mercury/mercury_thread.h + ${HDF5_SRC_DIR}/mercury/mercury_thread_mutex.h + ${HDF5_SRC_DIR}/mercury/mercury_log.h ) IDE_GENERATED_PROPERTIES ("H5FD" "${H5FD_HDRS}" "${H5FD_SOURCES}" ) diff --git a/src/H5FDsubfile.c b/src/H5FDsubfile.c new file mode 100644 index 0000000..2b94f0d --- /dev/null +++ b/src/H5FDsubfile.c @@ -0,0 +1,250 @@ + +#include "H5FDsubfile_public.h" + +#ifdef H5_HAVE_PARALLEL + +/***********/ +/* Headers */ +/***********/ +#include "H5private.h" /* Generic Functions */ +#include "H5CXprivate.h" /* API Contexts */ +#include "H5Dprivate.h" /* Datasets */ +#include "H5Eprivate.h" /* Error handling */ +#include "H5Ipublic.h" /* IDs */ +#include "H5Iprivate.h" /* IDs */ +#include "H5MMprivate.h" /* Memory management */ +#include "H5Pprivate.h" /* Property lists */ + +/* +========================================= +Private functions +======================================== +*/ + +static size_t sf_topology_limit = 4; +static size_t sf_topology_entries = 0; +static sf_topology_t **sf_topology_cache = NULL; + +static size_t sf_context_limit = 4; +static size_t sf_context_entries = 0; +static subfiling_context_t **sf_context_cache = NULL; +static hid_t context_id = H5I_INVALID_HID; +static hid_t topology_id = H5I_INVALID_HID; + + +static int64_t 
record_subfiling_object(SF_OBJ_TYPE type, void *obj) +{ + size_t index; + int64_t obj_reference; + uint64_t tag; + switch(type) { + case SF_TOPOLOGY: { + if (sf_topology_cache == NULL) { + sf_topology_cache = (sf_topology_t **) + calloc(sf_topology_limit, sizeof(sf_topology_t *)); + } + assert(sf_topology_cache != NULL); + index = sf_topology_entries++; + tag = SF_TOPOLOGY; + obj_reference = (int64_t)((tag << 32) | index); + sf_topology_cache[index] = obj; + return obj_reference; + break; + } + case SF_CONTEXT: { + if (sf_context_cache == NULL) { + sf_context_cache = (subfiling_context_t **) + calloc(sf_context_limit, sizeof(subfiling_context_t *)); + } + assert(sf_context_cache != NULL); + index = sf_context_entries++; + tag = SF_CONTEXT; + obj_reference = (int64_t)((tag << 32) | index); + sf_context_cache[index] = (subfiling_context_t *)obj; + return obj_reference; + break; + } + default: + puts("UNKNOWN Subfiling object type"); + } + + return -1; +} + +/* +========================================= +Public vars (for subfiling) and functions +======================================== +*/ + +int sf_verbose_flag = 0; + +/* +========================================= +File functions +========================================= + +The pread and pwrite posix functions are described as +being thread safe. We include mutex locks and unlocks +to work around any potential threading conflicts... 
+Those however, are compiled according #ifdef +*/ + +int sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank) +{ + int ret = 0; + ssize_t bytes_read; + ssize_t bytes_remaining = (ssize_t)data_size; + char *this_buffer = data_buffer; + + while(bytes_remaining) { + if ((bytes_read = (ssize_t)pread(fd, this_buffer, (size_t)bytes_remaining, file_offset)) < 0) { + perror("pread failed!"); + fflush(stdout); + } + else if (bytes_read > 0) { + if (sf_verbose_flag) { + printf("[ioc(%d) %s] read %ld bytes of %ld requested\n", + subfile_rank, __func__, + bytes_read, bytes_remaining); + } + bytes_remaining -= bytes_read; + this_buffer += bytes_read; + file_offset += bytes_read; + } + else { + printf("[ioc(%d) %s] ERROR! read of 0 bytes == eof!\n", subfile_rank, __func__ ); + fflush(stdout); + break; + } + } + return ret; +} + +int sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank) +{ + int ret = 0; + char *this_data = (char *)data_buffer; + ssize_t bytes_remaining = (ssize_t) data_size; + ssize_t written = 0; + + while(bytes_remaining) { + if ((written = pwrite(fd, this_data, (size_t)bytes_remaining, file_offset)) < 0) { + perror("pwrite failed!"); + fflush(stdout); + } + else { + if (sf_verbose_flag) { + printf("[ioc(%d) %s] wrote %ld bytes of %ld requested\n", + subfile_rank, __func__, + written, bytes_remaining); + } + bytes_remaining -= written; + this_data += written; + file_offset += written; + } + } +#ifdef SUBFILE_REQUIRE_FLUSH + fdatasync(fd); +#endif + + return ret; +} + + + + +void * get_subfiling_object(int64_t object_id) +{ + int obj_type = (int)((object_id >> 32) & 0x0FFFF); + /* We don't require a large indexing space + * 16 bits should be enough.. 
+ */ + size_t index = (object_id & 0x0FFFF); + if (obj_type == SF_TOPOLOGY) { + if (index < sf_context_entries) { + return (void *)sf_topology_cache[index]; + } + else { + puts("Illegal object index"); + } + } + else if (obj_type == SF_CONTEXT) { + if (index < sf_context_entries) { + return (void *)sf_context_cache[index]; + } + else { + puts("Illegal object index"); + } + } + else { + puts("UNKNOWN Subfiling object type"); + } + return NULL; +} + +herr_t +H5FDsubfiling_init(void) +{ + herr_t ret_value = SUCCEED; + int ioc_count; + int world_rank, world_size; + sf_topology_t *thisApp = NULL; + subfiling_context_t *newContext = NULL; + + + + if (MPI_Comm_size(MPI_COMM_WORLD, &world_size) != MPI_SUCCESS) { + puts("MPI_Comm_size returned an error"); + ret_value = FAIL; + goto done; + } + if (MPI_Comm_rank(MPI_COMM_WORLD, &world_rank) != MPI_SUCCESS) { + puts("MPI_Comm_rank returned an error"); + ret_value = FAIL; + goto done; + } + if ((ioc_count = H5FD__determine_ioc_count (world_size, world_rank, &thisApp)) > 0) { + topology_id = (hid_t)record_subfiling_object(SF_TOPOLOGY, thisApp); + } + if (topology_id < 0) { + puts("Unable to register subfiling topology!"); + ret_value = FAIL; + goto done; + } + if (H5FD__init_subfile_context(&newContext, ioc_count, world_size, world_rank, thisApp->rank_is_ioc) != SUCCEED) { + puts("Unable to initialize a subfiling context!"); + ret_value = FAIL; + goto done; + } + context_id = (hid_t)record_subfiling_object(SF_CONTEXT, newContext); + if (context_id < 0) { + ret_value = FAIL; + puts("Unable to register subfiling context!"); + } + +done: + return ret_value; +} + +herr_t +H5FDsubfiling_finalize(void) +{ + herr_t ret_value = SUCCEED; /* Return value */ + + /* Shutdown the IO Concentrator threads */ + sf_shutdown_flag = 1; + usleep(100); + MPI_Barrier(MPI_COMM_WORLD); + delete_subfiling_context(context_id); + + return ret_value; +} + +hid_t +get_subfiling_context() +{ + return context_id; +} + +#endif /* H5_HAVE_PARALLEL */ diff 
--git a/src/H5FDsubfile.h b/src/H5FDsubfile.h new file mode 100644 index 0000000..e69de29 diff --git a/src/H5FDsubfile_mpi.c b/src/H5FDsubfile_mpi.c new file mode 100644 index 0000000..c66c6b7 --- /dev/null +++ b/src/H5FDsubfile_mpi.c @@ -0,0 +1,1542 @@ + +#include "H5FDsubfile_private.h" + +static int *io_concentrator = NULL; +static int n_io_concentrators = -1; +static int sf_world_rank = -1; +static int sf_world_size = -1; +static int subfile_fid = -1; +static int64_t sf_stripe_size = -1; +static int64_t sf_blocksize_per_stripe = 0; + +static MPI_Datatype H5FD__create_f_l_mpi_type(subfiling_context_t *context, + int64_t target_write_bytes, + int64_t first_write, + int64_t last_write, + int ioc_depth); +static MPI_Datatype H5FD__create_first_mpi_type(subfiling_context_t *context, + int64_t offset, + int64_t target_write_bytes, + int64_t first_write, + int ioc_depth); +static MPI_Datatype H5FD__create_final_mpi_type(subfiling_context_t *context, + int64_t target_write_bytes, + int64_t last_write, + int ioc_depth); +static MPI_Datatype H5FD__create_mpi_uniform_type(subfiling_context_t *context, + int64_t offset, + int64_t target_write_bytes, + int ioc_depth); + +static int * request_count_per_rank = NULL; + +atomic_int sf_workinprogress = 0; +atomic_int sf_work_pending = 0; +atomic_int sf_file_close_count = 0; +atomic_int sf_file_refcount = 0; + +#ifdef DEBUG_TRACING +FILE *sf_logfile = NULL; +#endif + +MPI_Comm sf_msg_comm = MPI_COMM_NULL; /* Messages IN */ +MPI_Comm sf_data_comm = MPI_COMM_NULL; /* Messages OUT */ + +int sf_shutdown_flag = 0; + +const char *sf_subfile_prefix = "."; + + +#define MAX_WORK_PER_RANK 2 + +/* +========================================= +Private functions +========================================= +*/ + +static int _determine_subfile_rank(int myrank) +{ + if (io_concentrator) { + int i; + for(i=0; i< n_io_concentrators; i++) { + if (io_concentrator[i] == myrank) + return i; + } + } + return -1; +} + +static int is_io_concentrator(int 
rank) +{ + int index = _determine_subfile_rank(rank); + if (index < 0) return 0; + return 1; /* true */ +} + + + +static void init_io_vars(int64_t stripe_size, int64_t blocksize_per_stripe, + int64_t file_offset, int64_t data_extent, + int64_t *first_io, int64_t *first_io_offset, int64_t *last_io, + int *starting_ioc, int *final_ioc, int *starting_row, int *final_row) +{ + int64_t total_stripe_width = stripe_size * n_io_concentrators; + int64_t starting_offset = file_offset % stripe_size; + int64_t final_offset = (file_offset + data_extent -1); + int64_t last_io_check = (starting_offset + data_extent) % stripe_size; + *starting_row = (int)(file_offset / total_stripe_width); + *final_row = (int)(final_offset / total_stripe_width); + + /* Maybe update how many bytes in the entire IOC collection */ + if (blocksize_per_stripe == 0) + sf_blocksize_per_stripe = total_stripe_width; + + *starting_ioc = (int)((file_offset / stripe_size) % n_io_concentrators); + *final_ioc = (int)((final_offset / stripe_size) % n_io_concentrators); + *first_io_offset = starting_offset; + *first_io = ((stripe_size - starting_offset) >= data_extent ? data_extent : (stripe_size - starting_offset)); + /* Check for just a single IO op */ + if (*first_io == data_extent) *last_io = 0; + else *last_io = (last_io_check > 0 ? 
last_io_check : stripe_size); +} + +static int init__indep_io(subfiling_context_t *sf_context, + int64_t **source_data_offset, int64_t **sf_datasize, + int64_t **sf_offset, MPI_Datatype **sf_dtype, + int64_t offset, int64_t elements, int dtype_extent) +{ + int64_t data_extent = elements * dtype_extent; + int64_t first_io=0, last_io=0, first_io_offset=0; + + int64_t *data_offset = *source_data_offset; + int64_t *ioc_datasize = *sf_datasize; + int64_t *ioc_offset = *sf_offset; + MPI_Datatype *ioc_type = *sf_dtype; + int k, ioc_start, ioc_last, ioc_depth, starting_row, final_row; + sf_stripe_size = sf_context->sf_stripe_size; + sf_blocksize_per_stripe = sf_context->sf_blocksize_per_stripe; + + init_io_vars(sf_stripe_size, sf_blocksize_per_stripe, offset, data_extent, + &first_io, &first_io_offset, &last_io, + &ioc_start, &ioc_last, &starting_row, &final_row); + + if (sf_verbose_flag) { + printf("[%d] offset=%ld,data_extent=%ld,sf_stripe_size=%ld,n_io_concentrators=%d," + "first_io=%ld,first_io_offset=%ld,last_io=%ld,ioc_start=%d,ioc_last=%d\n", + sf_world_rank, offset,data_extent,sf_stripe_size,n_io_concentrators, + first_io,first_io_offset,last_io,ioc_start,ioc_last); + fflush(stdout); + } + + if (data_offset == NULL) { + data_offset = (int64_t *)calloc((size_t)n_io_concentrators, sizeof(int64_t)); + assert(data_offset != NULL); + *source_data_offset = data_offset; + } + + if (ioc_datasize == NULL) { + ioc_datasize = (int64_t *)calloc((size_t)n_io_concentrators, sizeof(int64_t)); + assert(ioc_datasize != NULL); + *sf_datasize = ioc_datasize; + } + + if (ioc_offset == NULL) { + ioc_offset = (int64_t *)calloc((size_t)n_io_concentrators, sizeof(int64_t)); + assert(ioc_offset != NULL); + *sf_offset = ioc_offset; + } + + if (ioc_type == NULL) { + ioc_type = (MPI_Datatype *)calloc((size_t)n_io_concentrators, sizeof(MPI_Datatype)); + assert(ioc_type != NULL); + *sf_dtype = ioc_type; + } + + for(k=0; k < n_io_concentrators; k++) { + ioc_datasize[k] = 0; + ioc_offset[k] = 0; 
+ /* Free previously used datatypes */ + if (ioc_type[k] && + (ioc_type[k] != MPI_DATATYPE_NULL) && + (ioc_type[k] != MPI_BYTE)) + MPI_Type_free(&ioc_type[k]); + else ioc_type[k] = MPI_DATATYPE_NULL; + } + + if (data_extent) { + int next_index = ioc_start; + int64_t target_bytes; + int64_t total_bytes_remaining = data_extent; + int64_t row_base = starting_row * sf_stripe_size; + int64_t subfile_offset = row_base + first_io_offset; + int64_t source_offset = 0; + int64_t remaining_bytes_in_row = ((n_io_concentrators - ioc_start) * sf_stripe_size) - first_io_offset; + + ioc_depth = (final_row - starting_row) +1; + if ((ioc_start > ioc_last) && (data_extent > remaining_bytes_in_row)) ioc_depth--; + + while(total_bytes_remaining > 0) { + target_bytes = 0; + if (next_index == ioc_start) { + target_bytes = first_io; + } + if (next_index == ioc_last) { + target_bytes += last_io; + ioc_depth--; + } + if (ioc_depth) { + if (next_index == ioc_start) + target_bytes += (sf_stripe_size * (ioc_depth -1)); + else target_bytes += (sf_stripe_size * ioc_depth); + } + + data_offset[next_index] = source_offset; + ioc_datasize[next_index] += target_bytes; + ioc_offset[next_index] += subfile_offset; + total_bytes_remaining -= target_bytes; + /* + * With the exception of the very 1st IO, all additional + * IO operations start on a slice_boundary (and this is + * consistent across the collection of IOCs). + */ + + subfile_offset = row_base; + + /* + * Possibly Create an MPI datatype for each MPI_Send operation. + * If the length allows writing into a single stripe on + * a single IOC, then we can use the MPI_BYTE datatype. 
+ */ + + + if (next_index == ioc_start) { /* First target */ + if (next_index == ioc_last) { + ioc_type[next_index] = + H5FD__create_f_l_mpi_type(sf_context, target_bytes, + first_io, last_io, ioc_depth+1); + } else { + ioc_type[next_index] = + H5FD__create_first_mpi_type(sf_context, ioc_offset[next_index], + target_bytes, first_io, ioc_depth); + } + source_offset += first_io; + } + else { + if (next_index == ioc_last) { + ioc_type[next_index] = + H5FD__create_final_mpi_type(sf_context, + target_bytes, last_io, ioc_depth+1); + } else { + ioc_type[next_index] = + H5FD__create_mpi_uniform_type(sf_context,ioc_offset[next_index], + target_bytes, ioc_depth); + } + source_offset += sf_stripe_size; + } + + if (++next_index == n_io_concentrators) { + next_index = 0; + row_base += sf_stripe_size; + subfile_offset = row_base; + } + } + } + return 0; +} + + +static int compare_hostid(const void *h1, const void *h2) +{ + const layout_t *host1 = (const layout_t *)h1; + const layout_t *host2 = (const layout_t *)h2; + return (host1->hostid > host2->hostid); +} + + +static void gather_topology_info(sf_topology_t *info) +{ + sf_world_size = info->world_size; + sf_world_rank = info->world_rank; + + if (info->topology) + return; + + if (sf_world_size > 1) { + long hostid = gethostid(); + layout_t my_hostinfo; + layout_t *topology = (layout_t *)calloc((size_t)sf_world_size+1, sizeof(layout_t)); + if (topology == NULL) { + perror("calloc failure!"); + MPI_Abort(MPI_COMM_WORLD, 1); + } + info->hostid = hostid; + info->topology = topology; + my_hostinfo.rank = sf_world_rank; + my_hostinfo.hostid = hostid; + info->topology[sf_world_rank] = my_hostinfo; + if (MPI_Allgather(&my_hostinfo, 2, MPI_LONG, + info->topology, 2, MPI_LONG, + MPI_COMM_WORLD) == MPI_SUCCESS) { + qsort(info->topology, (size_t)sf_world_size, sizeof(layout_t), compare_hostid); + } + } +} + +static int count_nodes(sf_topology_t *info) +{ + int k, node_count, hostid_index = -1; + long nextid; + + assert(info != NULL); + if 
(info->topology == NULL) + gather_topology_info (info); + + nextid = info->topology[0].hostid; + info->node_ranks = (int *)calloc((size_t)(info->world_size+1), sizeof(int)); + if (nextid == info->hostid) + hostid_index = 0; + + node_count = 1; + /* Recall that the topology array has been sorted! */ + for (k=1; k < info->world_size; k++) { + if (info->topology[k].hostid != nextid) { + nextid = info->topology[k].hostid; + if (hostid_index < 0) { + if (nextid == info->hostid) hostid_index = k; + } + /* Record the index of new hostid */ + info->node_ranks[node_count++] = k; + } + } + + /* Mark the end of the node_ranks */ + info->node_ranks[node_count] = info->world_size; + /* Save the index where we first located my hostid */ + info->node_index = hostid_index; + return info->node_count = node_count; +} + +int +H5FD__determine_ioc_count(int world_size, int world_rank, sf_topology_t **thisapp) +{ + static int ioc_count = 0; + if (!ioc_count) { + int k, node; + int node_index; + int iocs_per_node = 1; + char *envValue = NULL; + sf_topology_t *app_topology = (sf_topology_t *)calloc(1, sizeof(sf_topology_t)); + assert(app_topology != NULL); + app_topology->world_size = world_size; + app_topology->world_rank = world_rank; + + io_concentrator = (int *)calloc((size_t)world_size, sizeof(int)); + assert(io_concentrator != NULL); + ioc_count = count_nodes (app_topology); + /* FIXME: This should ONLY be used for testing! + * For production, we should probably limit the + * number to a single IOC per node... + * (based on performance numbers) + */ + if ((envValue = getenv("IOC_COUNT_PER_NODE")) != NULL) { + int value_check = atoi(envValue); + if (value_check > 0) { + iocs_per_node = value_check; + } + } + + /* 'node_ranks' contain the index of the first instance of a hostid + * in the sorted sf_topology array. Our own index is 'node_index'. 
+ */ + node_index = app_topology->node_index; + app_topology->local_peers = app_topology->node_ranks[node_index+1] - + app_topology->node_ranks[node_index]; + if (app_topology->topology[node_index].rank == world_rank) { + app_topology->rank_is_ioc = true; + app_topology->subfile_rank = node_index; + } + /* FIXME: This should ONLY be used for testing! + * NOTE: The app_topology->local_peers is ONLY valid + * for the current NODE. There is no guarantee that + * the application layout defines a uniform number of + * MPI ranks per node... + * Because this is only for testing purposes (at this time) + * we can live with the assumption that if we define the + * IOC_COUNT_PER_NODE environment variable, then each + * node will have *at-least* that many MPI ranks assigned. + * See above! + */ + else if ((app_topology->local_peers > 1) && (iocs_per_node > 1)) { + if (iocs_per_node > app_topology->local_peers) + iocs_per_node = app_topology->local_peers; + for(k=1; k< iocs_per_node; k++) { + if (app_topology->topology[node_index + k].rank == world_rank) { + app_topology->rank_is_ioc = true; + app_topology->subfile_rank = node_index + k; + break; + } + } + } + /* More hacks for testing */ + if (io_concentrator) { + int n_iocs = 0; + for(node = 0; node < ioc_count; node++) { + for (k=0; k < iocs_per_node; k++) { + node_index = app_topology->node_ranks[node]; + io_concentrator[n_iocs++] = (int)( + app_topology->topology[node_index + k].rank); + } + } + ioc_count = n_io_concentrators = n_iocs; + } + + if (ioc_count > 0) { + *thisapp = app_topology; + } + } + return ioc_count; +} + +int +H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int world_size, int world_rank, bool rank_is_ioc) +{ + int status; + subfiling_context_t *next = (subfiling_context_t *) + malloc(sizeof(subfiling_context_t)); + if (io_concentrator == NULL) { + goto err_exit; + } + if (next == NULL) { + goto err_exit; + } + else { + int k; + char *envValue = NULL; + int ioc_leader = 
io_concentrator[0]; + int app_leader = 0; + *newContext = next; + next->sf_stripe_size = DEFAULT_STRIPE_SIZE; + if ((envValue = getenv("IOC_STRIPE_SIZE")) != NULL) { + long value_check = atol(envValue); + if (value_check > 0) { + next->sf_stripe_size = (int64_t)value_check; + } + } + if ((envValue = getenv("SUBFILE_PREFIX")) != NULL) { + char temp[PATH_MAX]; + sprintf(temp,"%s", envValue); + next->subfile_prefix = strdup(temp); + sf_subfile_prefix = strdup(temp); + } + + next->sf_blocksize_per_stripe = next->sf_stripe_size * n_iocs; + status = MPI_Comm_dup(MPI_COMM_WORLD, &next->sf_msg_comm); + if (status != MPI_SUCCESS) goto err_exit; + status = MPI_Comm_set_errhandler(next->sf_msg_comm, MPI_ERRORS_RETURN); + if (status != MPI_SUCCESS) goto err_exit; + status = MPI_Comm_dup(MPI_COMM_WORLD, &next->sf_data_comm); + if (status != MPI_SUCCESS) goto err_exit; + status = MPI_Comm_set_errhandler(next->sf_data_comm, MPI_ERRORS_RETURN); + if (status != MPI_SUCCESS) goto err_exit; + + k = 0; + while(is_io_concentrator(k)) + k++; + app_leader = k; + + if (sf_verbose_flag && (world_rank == 0)) { + printf("app_leader = %d and ioc_leader = %d\n", app_leader, ioc_leader); + } + + if (n_iocs > 1) { + status = MPI_Comm_split(MPI_COMM_WORLD, rank_is_ioc, world_rank, &next->sf_group_comm); + if (status != MPI_SUCCESS) goto err_exit; + status = MPI_Comm_size(next->sf_group_comm, &next->sf_group_size); + if (status != MPI_SUCCESS) goto err_exit; + status = MPI_Comm_rank(next->sf_group_comm, &next->sf_group_rank); + if (status != MPI_SUCCESS) goto err_exit; + /* + * There may be additional functionality we need for the IOCs... + * If so, then can probably initialize those things here! 
+ */ + } + + if (rank_is_ioc) { + status = initialize_ioc_threads(next); + if (status) goto err_exit; + } + } + return 0; + +err_exit: + return -1; +} + + +/* +--------------------------------------------------------------------------------- + The data that we're sending to receiving from an IO concentrator (IOC) contains + the initial collection of bytes. The length of this initial segment is 'first_write'. + Note that the terminology isn't significant. We are describing an IO operation in + terms of an MPI datatype which will either gather data from a source buffer + to send to an IOC or will be used to unpack data from an IOC into a user buffer. + Subsequent IO operations which are related to the current File IO will begin on + sf_stripe_size boundaries. +--------------------------------------------------------------------------------- +*/ + +static MPI_Datatype H5FD__create_first_mpi_type( + subfiling_context_t *context, int64_t offset, + int64_t target_write_bytes, int64_t first_write, int ioc_depth) +{ + MPI_Datatype newType = MPI_DATATYPE_NULL; + int64_t stripe_size = context->sf_stripe_size; + int64_t offset_in_stripe = offset % sf_stripe_size; + int64_t depth_in_bytes = sf_stripe_size * ioc_depth; + int64_t next_offset = context->sf_blocksize_per_stripe - offset_in_stripe; + int64_t total_bytes = first_write; + + assert(ioc_depth > 0); + if (stripe_size >= depth_in_bytes) + return MPI_BYTE; + + if (depth_in_bytes) { + int k; + int temp_blocks[64]; + int temp_disps[64]; + int *blocks = temp_blocks; + int *disps = temp_disps; + if (ioc_depth > 64) { + blocks = (int *)calloc((size_t)ioc_depth, sizeof(int)); + disps = (int *)calloc((size_t)ioc_depth, sizeof(int)); + } + blocks[0] = (int)first_write; + disps[0] = (int) 0; + for(k=1; k < ioc_depth; k++) { + disps[k] = (int)next_offset; + blocks[k] = (int)stripe_size; + total_bytes += stripe_size; + next_offset += context->sf_blocksize_per_stripe; + } + if (total_bytes != target_write_bytes) { + printf("Warning 
(%s): total_SUM(%ld) != target_bytes(%ld)\n", + __func__, total_bytes, target_write_bytes); + } + + if (MPI_Type_indexed(ioc_depth, blocks, disps, MPI_BYTE, &newType) != MPI_SUCCESS) { + perror("MPI_Type_indexed failed!"); + return MPI_DATATYPE_NULL; + } + MPI_Type_commit(&newType); + if (ioc_depth > 64) { + if (blocks != temp_blocks) { + free(blocks); + blocks = NULL; + } + if (disps != temp_disps) { + free(disps); + disps = NULL; + } + } + } + return newType; +} + +/* +--------------------------------------------------------------------------------- + The data that we're sending to an IO concentrator (IOC) contains the final + collection of bytes. Other than that detail, this is pretty much like the + typical' case... All chunks sizes are the identical (execpt for the very + last chunk) and all will start at relative stripe offset of 0. More precisely, + the start offset is a multiple of the subfiling "stripe_size". + We can utilize MPI_Type_indexed to represent the new type. +--------------------------------------------------------------------------------- +*/ +static MPI_Datatype H5FD__create_final_mpi_type(subfiling_context_t *context, int64_t target_write_bytes, int64_t last_write, int ioc_depth) +{ + MPI_Datatype newType = MPI_DATATYPE_NULL; + int64_t stripe_size = context->sf_stripe_size; + int64_t depth_in_bytes = (stripe_size * ioc_depth) + last_write; + int64_t total_bytes = last_write; + + assert(ioc_depth > 0); + + if (depth_in_bytes <= stripe_size) + return MPI_BYTE; + + if (depth_in_bytes) { + int k; + int temp_blocks[64]; + int temp_disps[64]; + int *blocks = temp_blocks; + int *disps = temp_disps; + if (ioc_depth > 64) { + blocks = (int *)calloc((size_t)ioc_depth, sizeof(int)); + disps = (int *)calloc((size_t)ioc_depth, sizeof(int)); + } + + for(k=0; k < ioc_depth; k++) { + disps[k] = (int)(k * context->sf_blocksize_per_stripe); + blocks[k] = (int)stripe_size; + total_bytes += stripe_size; + } + blocks[k-1] = (int)last_write; + if (total_bytes != 
target_write_bytes) { + printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", + __func__, total_bytes, target_write_bytes); + } + + if (MPI_Type_indexed(ioc_depth, blocks, disps, MPI_BYTE, &newType) != MPI_SUCCESS) { + return MPI_DATATYPE_NULL; + } + MPI_Type_commit(&newType); + if (ioc_depth > 64) { + if (blocks != temp_blocks) { + free(blocks); + blocks = NULL; + } + if (disps != temp_disps) { + free(disps); + disps = NULL; + } + } + } + return newType; +} + +/* +--------------------------------------------------------------------------------- + Special case where the current IOC has both the first and final write chunks. + This implmentation is a merge of the first_mpi_type and final_mpi_type + functions. +--------------------------------------------------------------------------------- +*/ +static MPI_Datatype H5FD__create_f_l_mpi_type(subfiling_context_t *context, + int64_t target_write_bytes, + int64_t first_write, + int64_t last_write, int ioc_depth) +{ + MPI_Datatype newType = MPI_DATATYPE_NULL; + int64_t stripe_size = context->sf_stripe_size; + int64_t depth_in_bytes = stripe_size * ioc_depth; + int64_t offset_in_stripe = stripe_size - first_write; + int64_t next_offset = context->sf_blocksize_per_stripe - offset_in_stripe; + int64_t total_bytes = first_write + last_write; + + assert(ioc_depth > 0); + if (last_write == 0) { + newType = MPI_BYTE; + } + else if (depth_in_bytes) { + int k; + int temp_blocks[64]; + int temp_disps[64]; + int *blocks = temp_blocks; + int *disps = temp_disps; + if (ioc_depth > 64) { + blocks = (int *)calloc((size_t)ioc_depth, sizeof(int)); + disps = (int *)calloc((size_t)ioc_depth, sizeof(int)); + } + blocks[0] = (int)first_write; + disps[0] = 0; + for(k=1; k < ioc_depth; k++) { + total_bytes += stripe_size; + blocks[k] = (int)stripe_size; + disps[k] = (int)next_offset; + next_offset += context->sf_blocksize_per_stripe; + } + blocks[k-1] = (int)last_write; + + if (total_bytes != target_write_bytes) { + printf("Warning 
(%s): total_SUM(%ld) != target_bytes(%ld)\n", + __func__, total_bytes, target_write_bytes); + } + + if (MPI_Type_indexed(ioc_depth, blocks, disps, MPI_BYTE, &newType) != MPI_SUCCESS) { + perror("MPI_Type_indexed failed!"); + return MPI_DATATYPE_NULL; + } + + MPI_Type_commit(&newType); + if (ioc_depth > 64) { + if (blocks != temp_blocks) { + free(blocks); + blocks = NULL; + } + if (disps != temp_disps) { + free(disps); + disps = NULL; + } + } + } + return newType; +} + +/* +--------------------------------------------------------------------------------- + This is the 'typical' case in which the IOC has neither the first chunck nor + the last. All chunks sizes are the identical and start at offset = 0. + We utilize MPI_Type_indexed to represent the new type. +--------------------------------------------------------------------------------- +*/ +MPI_Datatype H5FD__create_mpi_uniform_type(subfiling_context_t *context, + int64_t offset, + int64_t target_write_bytes, int ioc_depth) +{ + /* Maintain some state between function calls allow reuse of the new datatypes... 
*/ + static MPI_Datatype uniformType = MPI_DATATYPE_NULL; + static int64_t depth_in_bytes = 0; + + MPI_Datatype newType = MPI_DATATYPE_NULL; + int64_t stripe_size = context->sf_stripe_size; + int64_t offset_in_stripe = offset % stripe_size; + int64_t check_depth = stripe_size * ioc_depth; + int64_t total_bytes = 0; + + assert(offset_in_stripe == 0); + assert(ioc_depth > 0); + + if (check_depth == stripe_size) + return MPI_BYTE; + + if (depth_in_bytes) { + if (depth_in_bytes != check_depth) { + MPI_Type_free(&uniformType); + depth_in_bytes = 0; + } + } + if (!depth_in_bytes) { + int k; + int temp_blocks[64]; + int temp_disps[64]; + int *blocks = temp_blocks; + int *disps = temp_disps; + if (ioc_depth > 64) { + blocks = (int *)calloc((size_t)ioc_depth, sizeof(int)); + disps = (int *)calloc((size_t)ioc_depth, sizeof(int)); + } + for(k=0; k < ioc_depth; k++) { + disps[k] = (int)(k * context->sf_blocksize_per_stripe); + blocks[k] = (int)(stripe_size); + total_bytes += stripe_size; + } + + if (total_bytes != target_write_bytes) { + printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", + __func__, total_bytes, target_write_bytes); + } + + if (MPI_Type_indexed(ioc_depth, blocks, disps, MPI_BYTE, &uniformType) != MPI_SUCCESS) { + perror("MPI_Type_indexed failed!"); + return MPI_DATATYPE_NULL; + } + MPI_Type_commit(&uniformType); + if (ioc_depth > 64) { + if (blocks != temp_blocks) { + free(blocks); + blocks = NULL; + } + if (disps != temp_disps) { + free(disps); + disps = NULL; + } + } + depth_in_bytes = check_depth; + } + MPI_Type_dup(uniformType, &newType); + return newType; +} + + +int sf_read_independent(hid_t context_id, int64_t offset, int64_t elements, int dtype_extent, void *data) +{ + static int *acks = NULL; + static int *indices = NULL; + static MPI_Request *ackreqs = NULL; + static MPI_Request *reqs = NULL; + static MPI_Status *stats = NULL; + static int64_t *source_data_offset = NULL; + static int64_t *ioc_read_datasize = NULL; + static int64_t 
*ioc_read_offset = NULL; + static MPI_Datatype *ioc_read_type = NULL; + + subfiling_context_t *sf_context = get_subfiling_object(context_id); + int i, ioc, n_waiting = 0, status = 0; + + assert(sf_context != NULL); + + if (acks == NULL) { + if ((acks = (int *)calloc((size_t)n_io_concentrators*2, sizeof(int))) == NULL) { + perror("calloc"); + return -1; + } + else indices = &acks[n_io_concentrators]; + } + if (reqs == NULL) { + if ((reqs = (MPI_Request *)calloc((size_t)n_io_concentrators, sizeof(MPI_Request))) == NULL) { + perror("calloc"); + return -1; + } + } + if (ackreqs == NULL) { + if ((ackreqs = (MPI_Request *)calloc((size_t)n_io_concentrators, sizeof(MPI_Request))) == NULL) { + perror("calloc"); + return -1; + } + } + if (stats == NULL) { + if ((stats = (MPI_Status *)calloc((size_t)n_io_concentrators, sizeof(MPI_Status))) == NULL) { + perror("calloc"); + return -1; + } + } + + if (init__indep_io(sf_context, &source_data_offset, &ioc_read_datasize, &ioc_read_offset, + &ioc_read_type, offset, elements, dtype_extent) < 0) { + return -1; + } + + if (sf_verbose_flag) { + for(ioc=0; ioc < n_io_concentrators; ioc++) { + int64_t sourceOffset = source_data_offset[ioc]; + printf("[%d %s]: read_source[ioc(%d), sourceOffset=%ld, datasize=%ld, foffset=%ld]\n", + sf_world_rank, __func__, ioc, sourceOffset, ioc_read_datasize[ioc], ioc_read_offset[ioc] ); + } + } + + /* Prepare the IOCs with a message which indicates the length + * and file offset for the actual data to be provided. + */ + for(ioc=0; ioc < n_io_concentrators; ioc++) { + int64_t msg[2] = {ioc_read_datasize[ioc], ioc_read_offset[ioc]}; + char *sourceData = (char *)data; + int64_t sourceOffset = source_data_offset[ioc]; + + /* We may not require data from this IOC... + * or we may read the data directly from the file! + * Check the size to verify! 
+ */ + reqs[ioc] = MPI_REQUEST_NULL; + if (ioc_read_datasize[ioc] == 0) { + continue; + } + + if (sf_verbose_flag ) { + printf("[%d %s] Requesting %ld read bytes from IOC(%d): sourceOffset=%ld\n", + sf_world_rank, __func__, msg[0], io_concentrator[ioc], sourceOffset ); + } + + status = MPI_Ssend(msg, 2, MPI_INT64_T, io_concentrator[ioc], READ_INDEP, sf_context->sf_msg_comm); + if (status != MPI_SUCCESS) { + printf("[%d] MPI_Send failure!", sf_world_rank); + return status; + } + else { + if (ioc_read_type[ioc] == MPI_BYTE) { + int bytes = (int) ioc_read_datasize[ioc]; + status = MPI_Irecv(&sourceData[sourceOffset], bytes, ioc_read_type[ioc], io_concentrator[ioc], + READ_INDEP_DATA, sf_context->sf_data_comm, &reqs[ioc]); + } else { + status = MPI_Irecv(&sourceData[sourceOffset], 1, ioc_read_type[ioc], io_concentrator[ioc], + READ_INDEP_DATA, sf_context->sf_data_comm, &reqs[ioc]); + } + if (status != MPI_SUCCESS) { + int length = 256; + char error_string[length]; + MPI_Error_string(status, error_string, &length); + printf("(%s) MPI_Irecv error: %s\n", __func__, error_string); + return status; + } + n_waiting++; + } + + } + /* We've queued all of the Async READs, now we just need to + * complete them in any order... + */ + while(n_waiting) { + int ready = 0; + status = MPI_Waitsome(n_io_concentrators, reqs, &ready, indices, stats); + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%d %s] MPI_ERROR! 
MPI_Waitsome returned an error(%s)\n", + sf_world_rank, __func__, estring ); + fflush(stdout); + } + + for(i=0; i < ready; i++) { + ioc = io_concentrator[indices[i]]; + if (sf_verbose_flag) { + printf("[%d] READ bytes(%ld) of data from ioc_concentrator %d complete\n", + sf_world_rank, ioc_read_datasize[indices[i]] , ioc); + fflush(stdout); + } + n_waiting--; + } + } + return status; +} + + + +int sf_write_independent(hid_t context_id, int64_t offset, int64_t elements, int dtype_extent, void *data) +{ + static int *acks = NULL; + static int *indices = NULL; + static MPI_Request *reqs = NULL; + static MPI_Status *stats = NULL; + static int64_t *source_data_offset = NULL; + static int64_t *ioc_write_datasize = NULL; + static int64_t *ioc_write_offset = NULL; + static MPI_Datatype *ioc_write_type = NULL; + + subfiling_context_t *sf_context = get_subfiling_object(context_id); + int i, target, ioc, n_waiting = 0, status = 0; + int errors = 0; + if (acks == NULL) { + if ((acks = (int *)calloc((size_t)n_io_concentrators*2, sizeof(int))) == NULL) { + perror("calloc"); + return -1; + } + else indices = &acks[n_io_concentrators]; + } + if (reqs == NULL) { + if ((reqs = (MPI_Request *)calloc((size_t)n_io_concentrators, sizeof(MPI_Request))) == NULL) { + perror("calloc"); + return -1; + } + } + if (stats == NULL) { + if ((stats = (MPI_Status *)calloc((size_t)n_io_concentrators, sizeof(MPI_Status))) == NULL) { + perror("calloc"); + return -1; + } + } + + if (init__indep_io(sf_context, &source_data_offset, &ioc_write_datasize, &ioc_write_offset, + &ioc_write_type, offset, elements, dtype_extent) < 0) { + return -1; + } + + if (sf_verbose_flag) { + for(ioc=0; ioc < n_io_concentrators; ioc++) { + int64_t sourceOffset = source_data_offset[ioc]; + printf("[%d %s]: write_dest[ioc(%d), sourceOffset=%ld, datasize=%ld, foffset=%ld]\n", + sf_world_rank, __func__, ioc, sourceOffset, + ioc_write_datasize[ioc], ioc_write_offset[ioc] ); + } + } + + /* Prepare the IOCs with a message which 
indicates the length + * of the actual data to be written. We also provide the file + * offset so that when the IOC recieves the data (in whatever order) + * they can lseek to the correct offset and write the data. + */ + for(target=0; target < n_io_concentrators; target++) { + int64_t sourceOffset; + int64_t msg[2] = {0,}; + char *sourceData = (char *)data; + ioc = (sf_world_rank + target) % n_io_concentrators; + + sourceOffset = source_data_offset[ioc]; + msg[0] = ioc_write_datasize[ioc]; + msg[1] = ioc_write_offset[ioc]; + acks[ioc] = 0; + reqs[ioc] = MPI_REQUEST_NULL; + + if (ioc_write_datasize[ioc] == 0) { + if (sf_verbose_flag) { + printf("[%d %s] skipping ioc(%d) send datasize = %ld\n", + sf_world_rank,__func__, ioc, ioc_write_datasize[ioc]); + fflush(stdout); + } + continue; + } + if ( sf_verbose_flag ) { + printf("[%d] Datatype(%x) Sending to ioc(%d) %ld bytes of data with file_offset=%ld\n", + sf_world_rank, ioc_write_type[ioc], ioc, ioc_write_datasize[ioc], ioc_write_offset[ioc]); + fflush(stdout); + } + + /* Send the Message HEADER which indicates the requested IO operation + * (via the message TAG) along with the data size and file offset. + */ + + status = MPI_Ssend(msg, 2, MPI_INT64_T, io_concentrator[ioc], + WRITE_INDEP, sf_context->sf_msg_comm); + + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%d] ERROR! 
MPI_Send of %ld bytes to %d returned an error(%s)\n", + sf_world_rank, sizeof(msg), io_concentrator[ioc], estring ); + fflush(stdout); + } + + status = MPI_Recv(&acks[ioc], 1, MPI_INT, io_concentrator[ioc], WRITE_INDEP_ACK, + sf_context->sf_data_comm, &stats[ioc]); + + if (status == MPI_SUCCESS) { + if (sf_verbose_flag) { + printf("[%d] received ack(%d) from ioc(%d)\n",sf_world_rank, acks[ioc], ioc); + fflush(stdout); + } + if (acks[ioc] > 0) { + if (ioc_write_type[ioc] == MPI_BYTE) { + int datasize = (int)(ioc_write_datasize[ioc] & INT32_MASK); + status = MPI_Issend(&sourceData[sourceOffset], datasize, + MPI_BYTE, io_concentrator[ioc], WRITE_INDEP_DATA, + sf_context->sf_data_comm,&reqs[ioc]); + } + else { + status = MPI_Issend(&sourceData[sourceOffset], 1, ioc_write_type[ioc], + io_concentrator[ioc], WRITE_INDEP_DATA, + sf_context->sf_data_comm,&reqs[ioc]); + } + /* Queued another Isend which need to be completed (below) */ + n_waiting++; + } + } else { + errors++; + puts("ACK error!"); + fflush(stdout); + } + if (status != MPI_SUCCESS) { + errors++; + printf("[%d] ERROR! Unable to Send data to ioc(%d)\n", + sf_world_rank, ioc); + fflush(stdout); + } + } + + while(n_waiting) { + int ready = 0; + status = MPI_Waitsome(n_io_concentrators, reqs, &ready, indices, stats); + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%d %s] MPI_ERROR! MPI_Waitsome returned an error(%s)\n", + sf_world_rank, __func__, estring ); + fflush(stdout); + errors++; + } + + for(i=0; i < ready; i++) { + /* One of the Issend calls has completed + * Wait for another ACK to indicate the data as been written + * to the subfile. 
+ */ + acks[indices[i]] = 0; + n_waiting--; + } + } + if (errors) return -1; + return status; +} + +int sf_close_subfiles(hid_t context_id) +{ + int i, status; + int errors = 0; + int n_waiting = 0; + int indices[n_io_concentrators]; + int ioc_acks[n_io_concentrators]; + MPI_Request reqs[n_io_concentrators]; + subfiling_context_t *sf_context = get_subfiling_object(context_id); + + for (i=0; i < n_io_concentrators; i++) { + int64_t msg[2] = {0, 0}; + status = MPI_Ssend(msg, 2, MPI_INT64_T, io_concentrator[i], CLOSE_OP, sf_context->sf_msg_comm); + if (status == MPI_SUCCESS) { + status = MPI_Irecv(&ioc_acks[i], 1, MPI_INT, io_concentrator[i], COMPLETED, sf_context->sf_data_comm, &reqs[i]); + } + if (status != MPI_SUCCESS) { + printf("[%d] MPI close_subfiles failure!", sf_world_rank); + errors++; + } + else n_waiting++; + } + while(n_waiting) { + int ready = 0; + status = MPI_Waitsome(n_io_concentrators, reqs, &ready, indices, MPI_STATUSES_IGNORE); + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%d %s] MPI_ERROR! 
MPI_Waitsome returned an error(%s)\n", + sf_world_rank, __func__, estring ); + fflush(stdout); + errors++; + } + for(i=0; i < ready; i++) { + n_waiting--; + } + } + return errors; +} + +int sf_open_subfiles(hid_t context_id, char *prefix, int flags) +{ + int i, status; + int n_waiting = 0; + int indices[n_io_concentrators]; + int ioc_acks[n_io_concentrators]; + MPI_Request reqs[n_io_concentrators]; + subfiling_context_t *sf_context = get_subfiling_object(context_id); + + sf_stripe_size = sf_context->sf_stripe_size; + + if ((sf_context->subfile_prefix != NULL) && (prefix != NULL)) { + if (strcmp(sf_context->subfile_prefix, prefix) != 0) { + sf_context->subfile_prefix = strdup(prefix); + } + } + + for (i=0; i < n_io_concentrators; i++) { + int64_t msg[2] = {flags, 0}; + if (sf_verbose_flag) { + printf("[%d] file open request (flags = %0lx)\n", sf_world_rank, msg[0]); + } + status = MPI_Ssend(msg, 2, MPI_INT64_T, io_concentrator[i], OPEN_OP, sf_context->sf_msg_comm); + if (status == MPI_SUCCESS) { + status = MPI_Irecv(&ioc_acks[i], 1, MPI_INT, io_concentrator[i], COMPLETED, sf_context->sf_data_comm, &reqs[i]); + } + if (status != MPI_SUCCESS) { + printf("[%d] MPI close_subfiles failure!", sf_world_rank); + } + else n_waiting++; + } + while(n_waiting) { + int ready = 0; + status = MPI_Waitsome(n_io_concentrators, reqs, &ready, indices, MPI_STATUSES_IGNORE); + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%d %s] MPI_ERROR! 
MPI_Waitsome returned an error(%s)\n", + sf_world_rank, __func__, estring ); + fflush(stdout); + } + for(i=0; i < ready; i++) { + n_waiting--; + } + } + + return 0; +} + +int +ioc_main(subfiling_context_t *context) +{ + int subfile_rank; + int flag, ret; + int max_work_depth; + MPI_Status status, msg_status; + sf_work_request_t *incoming_requests = NULL; + useconds_t delay = 20; + + assert(context != NULL); + subfile_rank = context->sf_group_rank; + if (request_count_per_rank == NULL) { + request_count_per_rank = (int *)calloc((size_t)sf_world_size, sizeof(int)); + assert(request_count_per_rank != NULL); + } + + max_work_depth = sf_world_size * MAX_WORK_PER_RANK; + incoming_requests = (sf_work_request_t *)calloc((size_t)max_work_depth, sizeof(sf_work_request_t)); + assert(incoming_requests != NULL); + +#ifdef DEBUG_TRACING + char logname[64]; + sprintf(logname,"ioc_%d.log", subfile_rank); + sf_logfile = fopen(logname, "w+"); +#endif + /* Initialize atomic vars */ + atomic_init(&sf_workinprogress, 0); + atomic_init(&sf_work_pending, 0); + atomic_init(&sf_file_close_count, 0); + atomic_init(&sf_file_refcount, 0); + + sf_msg_comm = context->sf_msg_comm; /* Messages IN */ + sf_data_comm = context->sf_data_comm; /* Messages OUT */ + + while(!sf_shutdown_flag || sf_work_pending) { + flag = 0; + ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, context->sf_msg_comm, &flag, &status); + if ((ret == MPI_SUCCESS) && (flag != 0)) { + sf_work_request_t *msg = NULL; + int count; + int request_size = (int)sizeof(sf_work_request_t); + int source = status.MPI_SOURCE; + int tag = status.MPI_TAG; + + MPI_Get_count(&status, MPI_BYTE, &count); + if (count > request_size) { + msg = (sf_work_request_t *) malloc((size_t)count); + ret = MPI_Recv(msg,count,MPI_BYTE, source, tag, context->sf_msg_comm, &msg_status); + } + else { + ret = MPI_Recv(&incoming_requests[sf_workinprogress],count, MPI_BYTE, + source, tag, context->sf_msg_comm, &msg_status); + } + if (ret == MPI_SUCCESS) { + +#ifdef 
DEBUG_TRACING + printf("[[ioc(%d) msg from %d tag=%x, datasize=%ld, foffset=%ld]]\n", subfile_rank, source, tag, + incoming_requests[sf_workinprogress].header[0], + incoming_requests[sf_workinprogress].header[1]); + fflush(stdout); +#endif + if (msg) { + msg->tag = tag; + msg->source = source; + msg->subfile_rank = subfile_rank; + tpool_add_work(msg); + } + else { + incoming_requests[sf_workinprogress].tag = tag; + incoming_requests[sf_workinprogress].source = source; + incoming_requests[sf_workinprogress].subfile_rank = subfile_rank; + tpool_add_work(&incoming_requests[sf_workinprogress]); + atomic_fetch_add(&sf_workinprogress, 1); // atomic + atomic_compare_exchange_strong(&sf_workinprogress, &max_work_depth, 0); + } + } + } + else usleep(delay); + } + +#ifdef DEBUG_TRACING + fclose(sf_logfile); +#endif + + return 0; +} + +/* +========================================= +Private helper functions +========================================= +*/ + +static int send_ack__(int target, int subfile_rank, int tag, MPI_Comm comm) +{ + int ack = 1; + int ret = MPI_Send(&ack, 1, MPI_INT, target, tag, comm); + if (sf_verbose_flag) { + printf("[ioc(%d): Sending ACK to MPI_rank(%d)\n", subfile_rank, target); + } + return ret; +} + +static int send_nack__(int target, int subfile_rank, int tag, MPI_Comm comm) +{ + int nack = 0; + int ret = MPI_Send(&nack, 1, MPI_INT, target, tag, comm); + if (sf_verbose_flag) { + printf("[ioc(%d): Sending NACK to MPI_rank(%d)\n", subfile_rank, target); + } + return ret; +} + +/* +========================================= +queue_xxx functions that should be run +from the thread pool threads... 
+========================================= +*/ + +int queue_write_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +{ + return 0; +} + +int queue_read_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +{ + return 0; +} + +int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +{ + char *recv_buffer = NULL; + int ret = MPI_SUCCESS; + MPI_Status msg_status; + int64_t data_size = msg->header[0]; + int64_t file_offset = msg->header[1]; + + if (sf_verbose_flag) { + printf("[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld\n", subfile_rank, __func__, source, data_size, file_offset ); + fflush(stdout); + } + if (recv_buffer == NULL) { + if ((recv_buffer = (char *)malloc((size_t)data_size)) == NULL) { + perror("malloc"); + send_nack__(source, subfile_rank, WRITE_INDEP_ACK, comm); + return -1; + } + } + + send_ack__(source, subfile_rank, WRITE_INDEP_ACK, comm); + + ret = MPI_Recv(recv_buffer, (int)data_size, MPI_BYTE, source, WRITE_INDEP_DATA, comm, &msg_status ); + if (ret != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(ret, estring, &len); + printf("[ioc(%d) %s] MPI_ERROR(%d)! MPI_Recv of %ld bytes from %d returned an error(%s)\n", + subfile_rank, __func__, msg_status.MPI_ERROR, data_size, source, estring ); + fflush(stdout); + return ret; + } else if(sf_verbose_flag) { + printf("[ioc(%d) %s] MPI_Recv success. 
Writing %ld bytes from rank %d to disk\n", + subfile_rank, __func__, data_size, source); + fflush(stdout); + } + if (sf_write_data(subfile_fid, file_offset, recv_buffer, data_size, subfile_rank ) < 0) { + free(recv_buffer); + recv_buffer = NULL; + printf("[ioc(%d) %s] sf_write_data returned an error!\n", subfile_rank, __func__); + fflush(stdout); + return -1; + } + // send_ack__(source, WRITE_COMPLETED, tinfo->comm); + if (recv_buffer) { + free(recv_buffer); + } + return 0; +} + +int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +{ + char *send_buffer = NULL; + int ret = MPI_SUCCESS; + int64_t data_size = msg->header[0]; + int64_t file_offset = msg->header[1]; + + + if (sf_verbose_flag) { + printf("[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld\n", subfile_rank, __func__, source, data_size, file_offset ); + fflush(stdout); + } + if ((send_buffer = (char *)malloc((size_t)data_size)) == NULL) { + perror("malloc"); + return -1; + } + + if (sf_read_data(subfile_fid, file_offset, send_buffer, data_size, subfile_rank) < 0) { + printf("[%d] %s - sf_read_data returned an error!\n", subfile_rank, __func__); + fflush(stdout); + return -1; + } + ret = MPI_Send(send_buffer, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, comm); + if (ret != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(ret, estring, &len); + printf("[ioc(%d)] ERROR! 
MPI_Send of %ld bytes to %d returned an error(%s)\n",subfile_rank, data_size, source, estring ); + fflush(stdout); + return ret; + } + + if (send_buffer) free(send_buffer); + + return 0; +} + + +int queue_file_open(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +{ + int ret, req_count, errors=0; + int prev_count; + int flags = (int)(msg->header[0] & 0x0ffffffff); + + atomic_fetch_add(&sf_file_refcount, 1); // atomic + + if (sf_verbose_flag) { + printf("[ioc(%d) %s] file open flags = %0x, source=%d\n", subfile_rank, __func__, flags, source); + fflush(stdout); + } + if (subfile_fid < 0) { + errors = subfiling_open_file(sf_subfile_prefix, subfile_rank, flags); + } + + req_count = COMPLETED; + ret = MPI_Send(&req_count, 1, MPI_INT, source, COMPLETED, comm); + if (ret != MPI_SUCCESS) { + errors++; + } + if (errors) { + printf("[ioc(%d) %s] Error opening file\n", subfile_rank, __func__); + fflush(stdout); + } + return errors; +} + +/* + * The decrement is somewhat of misnomer, i.e. we check the number of file open + * requests to the number of file close requests. When those values match, the + * actual file gets closed via the callback_ftn. The effects a weak collective + * on the file close operation. File opens on the other hand, can occur in + * any random order and no collective semanitics are enforced. 
+ */ +int decrement_file_ref_counts( int subfile_rank, int source, MPI_Comm comm, file_close_cb callback_ftn) +{ + int close_count, open_count; + atomic_fetch_add(&sf_file_close_count, 1); // atomic + close_count = atomic_load(&sf_file_close_count); + open_count = atomic_load(&sf_file_refcount); + + if (close_count == open_count) { + atomic_store(&sf_file_refcount, 0); + atomic_store(&sf_file_close_count, 0); /* Complete the reset to zeros */ + if (callback_ftn(subfile_rank, comm) < 0) { + printf("[ioc(%d) %s] callback_ftn returned an error\n", subfile_rank, __func__ ); + fflush(stdout); + } + } + return 0; +} + +/* Note: This function should be called ONLY when all clients + * have called the CLOSE_OP on this IO Concentrator. + * The IOC API maintains a reference count on subfiles + * so that once that count is decremented to zero, the + * decrement_file_ref_counts function will call here. + */ +int subfiling_close_file(int subfile_rank, MPI_Comm comm) +{ + int ret, source = 0; + int errors = 0, flag = COMPLETED; + if (subfile_fid >= 0) { + close(subfile_fid); + subfile_fid = -1; + } + /* Notify all ranks */ + for (source = 0; source < sf_world_size; source++) { + /* Don't release our local MPI process until all + * other ranks are released. 
+ */ + if (source == sf_world_rank) { + continue; + } + ret = MPI_Send(&flag, 1, MPI_INT, source, COMPLETED, comm); + if (ret != MPI_SUCCESS) errors++; + } + + /* Release the local MPI process */ + ret = MPI_Send(&flag, 1, MPI_INT, sf_world_rank, COMPLETED, comm); + if (ret != MPI_SUCCESS) errors++; + + if (errors) { + printf("[ioc(%d) %s] Errors sending file close replies\n", subfile_rank, __func__); + fflush(stdout); + } + + return errors; +} + +int subfiling_open_file(const char *prefix, int subfile_rank, int flags) +{ + int errors = 0; + /* Only the real IOCs open the subfiles */ + if (subfile_rank >= 0) { + const char *dotconfig = ".subfile_config"; + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + char filepath[PATH_MAX]; + char config[PATH_MAX]; + if (prefix) { + mkdir(prefix, S_IRWXU); + sprintf(filepath, "%s/node_local_temp_%d_of_%d", + prefix, subfile_rank, n_io_concentrators); + sprintf(config, "%s/%s", prefix, dotconfig); + } + else { + sprintf(filepath, "node_local_temp_%d_of_%d", + subfile_rank,n_io_concentrators); + strcpy(config, dotconfig); + } + + if ((subfile_fid = open(filepath, flags, mode)) < 0) { + perror("subfile open"); + errors++; + goto done; + } + else { + /* + * File open and close operations are collective + * in intent. Apart from actual IO operations, we + * initialize the number of references to everyone + * so that we detect when pending IO operations are + * have all completed before we close the actual + * subfiles. + */ + atomic_init((atomic_int *)&sf_workinprogress, sf_world_size); + } + + if ((subfile_rank == 0) && (flags & O_CREAT)) { + size_t bufsize = PATH_MAX + 16; + FILE *f = NULL; + char linebuf[bufsize]; + /* If a config file already exists, AND + * the user wants to truncate subfiles (if they exist), + * then we should also truncate an existing config file. 
+ */ + if (access(config, flags) == 0) { + truncate(config, 0); + } + f = fopen(config, "w+"); + if (f != NULL) { + int k; + char *underscore = strrchr(filepath,'_'); + *underscore=0; + strcpy(config, filepath); + *underscore='_'; + sprintf(linebuf,"stripe_size=%ld\n", sf_stripe_size); + fwrite(linebuf, strlen(linebuf), 1, f); + sprintf(linebuf,"aggregator_count=%d\n",n_io_concentrators); + fwrite(linebuf, strlen(linebuf), 1, f); + + for(k=0; k < n_io_concentrators; k++) { + snprintf(linebuf,bufsize,"%s_%d:%d\n",config, k, io_concentrator[k]); + fwrite(linebuf, strlen(linebuf), 1, f); + } + + fclose(f); + } + else { + perror("fopen(config)"); + errors++; + goto done; + } + } + if (sf_verbose_flag) { + printf("[ioc:%d] Opened subfile %s\n", subfile_rank, filepath); + } + } +done: + return errors; +} + +void +delete_subfiling_context(hid_t context_id) +{ + subfiling_context_t *sf_context = get_subfiling_object(context_id); + MPI_Comm_free(&sf_context->sf_msg_comm); + MPI_Comm_free(&sf_context->sf_data_comm); + sf_msg_comm = MPI_COMM_NULL; + sf_data_comm = MPI_COMM_NULL; + if (n_io_concentrators > 1) { + MPI_Comm_free(&sf_context->sf_group_comm); + MPI_Comm_free(&sf_context->sf_intercomm); + } + + free(sf_context); + usleep(100); + + return; +} diff --git a/src/H5FDsubfile_private.h b/src/H5FDsubfile_private.h new file mode 100644 index 0000000..cacfdb9 --- /dev/null +++ b/src/H5FDsubfile_private.h @@ -0,0 +1,183 @@ +/********************/ +/* Standard Headers */ +/********************/ + +#include +#include +#include +#include +#include +#include +#include +#include + +/**************/ +/* H5 Headers */ +/**************/ +#include "H5private.h" /* Generic Functions */ +#include "H5CXprivate.h" /* API Contexts */ +#include "H5Dprivate.h" /* Datasets */ +#include "H5Eprivate.h" /* Error handling */ +#include "H5Ipublic.h" +#include "H5Iprivate.h" /* IDs */ +#include "H5MMprivate.h" /* Memory management */ +#include "H5Pprivate.h" /* Property lists */ + + +#include 
"mpi.h" + +#ifndef _H5FDsubfile_private_H +#define _H5FDsubfile_private_H + +typedef int (*file_close_cb)(int,MPI_Comm); + +typedef struct { + int64_t sf_stripe_size; + int64_t sf_blocksize_per_stripe; + MPI_Comm sf_msg_comm; + MPI_Comm sf_data_comm; + MPI_Comm sf_group_comm; + MPI_Comm sf_intercomm; + int sf_group_size; + int sf_group_rank; + int sf_intercomm_root; + char *subfile_prefix; +} subfiling_context_t; + +typedef struct { + /* {Datasize, Offset} */ + int64_t header[2]; + int tag; + int source; + int subfile_rank; +} sf_work_request_t; + + + +typedef struct { + long rank; + long hostid; +} layout_t; + +typedef struct { + long hostid; + layout_t *topology; + int *node_ranks; + int node_count; + int node_index; + int local_peers; + int subfile_rank; + int world_rank; + int world_size; + bool rank_is_ioc; +} sf_topology_t; + +#define K(n) ((n)*1024) +#define DEFAULT_STRIPE_SIZE K(4) /* (1024*1024) */ +#define MAX_DEPTH 256 + +typedef enum io_ops { + READ_OP = 1, + WRITE_OP = 2, + OPEN_OP = 3, + CLOSE_OP = 4, + INCR_OP = 8, + DECR_OP = 16, +} io_op_t; + +typedef enum { + SF_BADID = (-1), + SF_TOPOLOGY = 1, + SF_CONTEXT, + SF_NTYPES /* number of subfiling object types, MUST BE LAST */ +} SF_OBJ_TYPE; + + + +/* MPI Tags are 32 bits, we treat them as unsigned + * to allow the use of the available bits for RPC + * selections: + * 0000 + * 0001 READ_OP (Independent) + * 0010 WRITE_OP (Independent) + * 0011 ///////// + * 0100 CLOSE_OP (Independent) + * ----- + * 1000 + * 1001 COLLECTIVE_READ + * 1010 COLLECTIVE_WRITE + * 1011 ///////// + * 1100 COLLECTIVE_CLOSE + * + * 31 28 24 20 16 12 8 4 0| + * +-------+-------+-------+-------+-------+-------+-------+-------+ + * | | | ACKS | OP | + * +-------+-------+-------+-------+-------+-------+-------+-------+ + * + */ + +/* Bit 3 SET indicates collectives */ +#define COLL_FUNC (0x1 << 3) + +#define ACK_PART (0x0acc << 8) +#define DATA_PART (0xd8da << 8) +#define COMPLETED (0xfed1 << 8) + +#define READ_INDEP (READ_OP) 
+#define READ_COLL (COLL_FUNC | READ_OP) +#define WRITE_INDEP (WRITE_OP) +#define WRITE_COLL (COLL_FUNC | WRITE_OP) + +#define WRITE_INDEP_ACK (ACK_PART | WRITE_OP) +#define WRITE_INDEP_DATA (DATA_PART | WRITE_OP) + +#define READ_INDEP_DATA (DATA_PART | READ_OP) + +#define INT32_MASK 0x07FFFFFFFFFFFFFFF + +extern int sf_verbose_flag; +extern int sf_shutdown_flag; + +extern atomic_int sf_workinprogress; +extern atomic_int sf_work_pending; +extern atomic_int sf_file_close_count; +extern atomic_int sf_file_refcount; + +/* +------------- +Messages IN +------------- +*/ +extern MPI_Comm sf_msg_comm; + +/* +------------- +Messages OUT +------------- +*/ +extern MPI_Comm sf_data_comm; + + + +H5_DLL int H5FD__determine_ioc_count(int world_size, int world_rank, sf_topology_t **thisapp); +H5_DLL int H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int world_size, int world_rank, bool rank_is_ioc); +H5_DLL void * get_subfiling_object(int64_t object_id); +H5_DLL hid_t get_subfiling_context(void); +H5_DLL int initialize_ioc_threads(subfiling_context_t *sf_context); +H5_DLL int tpool_add_work(sf_work_request_t *); +H5_DLL int ioc_main(subfiling_context_t *context); +H5_DLL int queue_write_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); +H5_DLL int queue_read_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); +H5_DLL int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); +H5_DLL int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); +H5_DLL int subfiling_close_file(int subfile_rank, MPI_Comm comm); +H5_DLL int subfiling_open_file(const char *prefix, int subfile_rank, MPI_Comm comm); +H5_DLL int queue_file_open(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); +H5_DLL int decrement_file_ref_counts( int subfile_rank, int source, MPI_Comm comm, file_close_cb callback_ftn); +H5_DLL int sf_open_subfiles(hid_t 
context_id, char *prefix, int flags); +H5_DLL int sf_close_subfiles(hid_t context_id); +H5_DLL int sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank); +H5_DLL int sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank); +H5_DLL void delete_subfiling_context(hid_t context_id); + +#endif diff --git a/src/H5FDsubfile_public.h b/src/H5FDsubfile_public.h new file mode 100644 index 0000000..6e4e23c --- /dev/null +++ b/src/H5FDsubfile_public.h @@ -0,0 +1,11 @@ +#ifndef _H5FDsubfile_public_H +#define _H5FDsubfile_public_H + +#include "H5FDsubfile_private.h" + +herr_t H5FDsubfiling_init(void); +herr_t H5FDsubfiling_finalize(void); + + +#endif /* _H5FDsubfile_public_H */ + diff --git a/src/H5FDsubfile_threads.c b/src/H5FDsubfile_threads.c new file mode 100644 index 0000000..bc9e352 --- /dev/null +++ b/src/H5FDsubfile_threads.c @@ -0,0 +1,132 @@ + +#include "H5FDsubfile_private.h" + +#include "mercury/mercury_util_config.h" +#include "mercury/mercury_log.h" +#include "mercury/mercury_log.c" +#include "mercury/mercury_util_error.c" +#include "mercury/mercury_thread.c" +#include "mercury/mercury_thread_mutex.c" +#include "mercury/mercury_thread_condition.h" +#include "mercury/mercury_thread_condition.c" +#include "mercury/mercury_thread_pool.c" +#include "mercury/mercury_thread_spin.c" + +static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER; +static hg_thread_pool_t *ioc_thread_pool = NULL; +static hg_thread_t ioc_thread; + +#define HG_TEST_NUM_THREADS_DEFAULT 4 +#define POOL_CONCURRENT_MAX 64 + +static struct hg_thread_work pool_request[POOL_CONCURRENT_MAX]; + +static HG_THREAD_RETURN_TYPE +ioc_thread_main(void *arg) +{ + hg_thread_ret_t thread_ret = (hg_thread_ret_t) 0; + + /* Pass along the subfiling_context_t */ + ioc_main(arg); + + // hg_thread_exit(thread_ret); + return thread_ret; +} + +int +initialize_ioc_threads(subfiling_context_t *sf_context) +{ + int status; + status 
= hg_thread_mutex_init(&ioc_mutex); + if (status) { + puts("hg_thread_mutex_init failed"); + goto err_exit; + } + status = hg_thread_pool_init(HG_TEST_NUM_THREADS_DEFAULT, &ioc_thread_pool); + if (status) { + puts("hg_thread_pool_init failed"); + goto err_exit; + } + status = hg_thread_create(&ioc_thread, ioc_thread_main, sf_context); + if (status) { + puts("hg_thread_create failed"); + goto err_exit; + } + return 0; + +err_exit: + return -1; +} + +/* Library destructor: destroys the IOC thread pool (if one was created) and then joins the IOC thread. */ +void __attribute__((destructor)) finalize_ioc_threads() +{ + if (ioc_thread_pool != NULL) { + hg_thread_pool_destroy(ioc_thread_pool); + ioc_thread_pool = NULL; + + if (hg_thread_join(ioc_thread) == 0) + puts("thread_join succeeded"); + else puts("thread_join failed"); + } +} + +/* Thread-pool work function: dispatches one sf_work_request_t on msg->tag, counts it in sf_work_pending while it runs, and prints a diagnostic when the handler status is negative. Always returns 0. */ +static HG_THREAD_RETURN_TYPE +handle_work_request(void *arg) +{ + hg_thread_ret_t ret = 0; + sf_work_request_t *msg = (sf_work_request_t *)arg; + int status = 0; + + atomic_fetch_add(&sf_work_pending, 1); // atomic + switch(msg->tag) { + case WRITE_COLL: + status = queue_write_coll( msg, msg->subfile_rank, msg->source, sf_data_comm); + break; + case READ_COLL: + status = queue_read_coll( msg, msg->subfile_rank, msg->source, sf_data_comm); + break; + case WRITE_INDEP: + status = queue_write_indep( msg, msg->subfile_rank, msg->source, sf_data_comm); + break; + case READ_INDEP: + status = queue_read_indep( msg, msg->subfile_rank, msg->source, sf_data_comm); + break; + case CLOSE_OP: + hg_thread_mutex_lock(&ioc_mutex); + status = decrement_file_ref_counts( msg->subfile_rank, msg->source, sf_data_comm, + subfiling_close_file); + hg_thread_mutex_unlock(&ioc_mutex); + break; + case OPEN_OP: + status = queue_file_open( msg, msg->subfile_rank, msg->source, sf_data_comm); + break; + + default: + printf("[ioc(%d)] received message tag(%x)from rank %d\n", msg->subfile_rank, msg->tag, msg->source); + status = -1; + break; + } + + atomic_fetch_sub(&sf_work_pending, 1); // atomic + if (status < 0) { + printf("[ioc(%d) %s]: Error encountered processing 
request(%x) from rank(%d)\n", + msg->subfile_rank, __func__, msg->tag, msg->source); + fflush(stdout); + } + return ret; +} +/* Post one work request to the IOC thread pool under ioc_mutex; returns 0 on success, -1 if the post is rejected. */ +int tpool_add_work(sf_work_request_t *work) +{ + static int work_index = 0; + hg_thread_mutex_lock(&ioc_mutex); + if (work_index == POOL_CONCURRENT_MAX) /* pool_request[] slots are reused round-robin */ + work_index = 0; + pool_request[work_index].func = handle_work_request; + pool_request[work_index].args = work; + int status = hg_thread_pool_post(ioc_thread_pool, &pool_request[work_index++]); /* previously ignored */ + hg_thread_mutex_unlock(&ioc_mutex); + return status ? -1 : 0; +} diff --git a/src/mercury/mercury_atomic.h b/src/mercury/mercury_atomic.h new file mode 100644 index 0000000..7a684b9 --- /dev/null +++ b/src/mercury/mercury_atomic.h @@ -0,0 +1,637 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. 
+ */ + +#ifndef MERCURY_ATOMIC_H +#define MERCURY_ATOMIC_H + +#include "mercury_util_config.h" + +#if defined(_WIN32) +# include <windows.h> +typedef struct { + volatile LONG value; +} hg_atomic_int32_t; +typedef struct { + volatile LONGLONG value; +} hg_atomic_int64_t; +# define HG_ATOMIC_VAR_INIT(x) \ + { \ + (x) \ + } +#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) +# include <opa_primitives.h> +typedef OPA_int_t hg_atomic_int32_t; +typedef OPA_ptr_t hg_atomic_int64_t; /* OPA has only limited 64-bit support */ +# define HG_ATOMIC_VAR_INIT(x) OPA_PTR_T_INITIALIZER(x) +#elif defined(HG_UTIL_HAS_STDATOMIC_H) +# include <stdatomic.h> +typedef atomic_int hg_atomic_int32_t; +# if HG_UTIL_ATOMIC_LONG_WIDTH == 8 +typedef atomic_long hg_atomic_int64_t; +# else +typedef atomic_llong hg_atomic_int64_t; +# endif +# define HG_ATOMIC_VAR_INIT(x) ATOMIC_VAR_INIT(x) +#elif defined(__APPLE__) +# include <libkern/OSAtomic.h> +typedef struct { + volatile hg_util_int32_t value; +} hg_atomic_int32_t; +typedef struct { + volatile hg_util_int64_t value; +} hg_atomic_int64_t; +# define HG_ATOMIC_VAR_INIT(x) \ + { \ + (x) \ + } +#else +# error "Not supported on this platform." +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Init atomic value (32-bit integer). + * + * \param ptr [OUT] pointer to an atomic32 integer + * \param value [IN] value + */ +static HG_UTIL_INLINE void +hg_atomic_init32(hg_atomic_int32_t *ptr, hg_util_int32_t value); + +/** + * Set atomic value (32-bit integer). + * + * \param ptr [OUT] pointer to an atomic32 integer + * \param value [IN] value + */ +static HG_UTIL_INLINE void +hg_atomic_set32(hg_atomic_int32_t *ptr, hg_util_int32_t value); + +/** + * Get atomic value (32-bit integer). + * + * \param ptr [OUT] pointer to an atomic32 integer + * + * \return Value of the atomic integer + */ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_get32(hg_atomic_int32_t *ptr); + +/** + * Increment atomic value (32-bit integer). 
+ * + * \param ptr [IN/OUT] pointer to an atomic32 integer + * + * \return Incremented value + */ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_incr32(hg_atomic_int32_t *ptr); + +/** + * Decrement atomic value (32-bit integer). + * + * \param ptr [IN/OUT] pointer to an atomic32 integer + * + * \return Decremented value + */ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_decr32(hg_atomic_int32_t *ptr); + +/** + * OR atomic value (32-bit integer). + * + * \param ptr [IN/OUT] pointer to an atomic32 integer + * \param value [IN] value to OR with + * + * \return Original value + */ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_or32(hg_atomic_int32_t *ptr, hg_util_int32_t value); + +/** + * XOR atomic value (32-bit integer). + * + * \param ptr [IN/OUT] pointer to an atomic32 integer + * \param value [IN] value to XOR with + * + * \return Original value + */ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_xor32(hg_atomic_int32_t *ptr, hg_util_int32_t value); + +/** + * AND atomic value (32-bit integer). + * + * \param ptr [IN/OUT] pointer to an atomic32 integer + * \param value [IN] value to AND with + * + * \return Original value + */ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_and32(hg_atomic_int32_t *ptr, hg_util_int32_t value); + +/** + * Compare and swap values (32-bit integer). + * + * \param ptr [IN/OUT] pointer to an atomic32 integer + * \param compare_value [IN] value to compare to + * \param swap_value [IN] value to swap with if ptr value is equal to + * compare value + * + * \return HG_UTIL_TRUE if swapped or HG_UTIL_FALSE + */ +static HG_UTIL_INLINE hg_util_bool_t +hg_atomic_cas32(hg_atomic_int32_t *ptr, hg_util_int32_t compare_value, + hg_util_int32_t swap_value); + +/** + * Init atomic value (64-bit integer). + * + * \param ptr [OUT] pointer to an atomic64 integer + * \param value [IN] value + */ +static HG_UTIL_INLINE void +hg_atomic_init64(hg_atomic_int64_t *ptr, hg_util_int64_t value); + +/** + * Set atomic value (64-bit integer). 
+ * + * \param ptr [OUT] pointer to an atomic64 integer + * \param value [IN] value + */ +static HG_UTIL_INLINE void +hg_atomic_set64(hg_atomic_int64_t *ptr, hg_util_int64_t value); + +/** + * Get atomic value (64-bit integer). + * + * \param ptr [OUT] pointer to an atomic64 integer + * + * \return Value of the atomic integer + */ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_get64(hg_atomic_int64_t *ptr); + +/** + * Increment atomic value (64-bit integer). + * + * \param ptr [IN/OUT] pointer to an atomic64 integer + * + * \return Incremented value + */ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_incr64(hg_atomic_int64_t *ptr); + +/** + * Decrement atomic value (64-bit integer). + * + * \param ptr [IN/OUT] pointer to an atomic64 integer + * + * \return Decremented value + */ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_decr64(hg_atomic_int64_t *ptr); + +/** + * OR atomic value (64-bit integer). + * + * \param ptr [IN/OUT] pointer to an atomic64 integer + * \param value [IN] value to OR with + * + * \return Original value + */ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_or64(hg_atomic_int64_t *ptr, hg_util_int64_t value); + +/** + * XOR atomic value (64-bit integer). + * + * \param ptr [IN/OUT] pointer to an atomic64 integer + * \param value [IN] value to XOR with + * + * \return Original value + */ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_xor64(hg_atomic_int64_t *ptr, hg_util_int64_t value); + +/** + * AND atomic value (64-bit integer). + * + * \param ptr [IN/OUT] pointer to an atomic64 integer + * \param value [IN] value to AND with + * + * \return Original value + */ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_and64(hg_atomic_int64_t *ptr, hg_util_int64_t value); + +/** + * Compare and swap values (64-bit integer). 
+ * + * \param ptr [IN/OUT] pointer to an atomic64 integer + * \param compare_value [IN] value to compare to + * \param swap_value [IN] value to swap with if ptr value is equal to + * compare value + * + * \return HG_UTIL_TRUE if swapped or HG_UTIL_FALSE + */ +static HG_UTIL_INLINE hg_util_bool_t +hg_atomic_cas64(hg_atomic_int64_t *ptr, hg_util_int64_t compare_value, + hg_util_int64_t swap_value); + +/** + * Memory barrier. + * + */ +static HG_UTIL_INLINE void +hg_atomic_fence(void); + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE void +hg_atomic_init32(hg_atomic_int32_t *ptr, hg_util_int32_t value) +{ +#if defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + atomic_init(ptr, value); +#else + hg_atomic_set32(ptr, value); +#endif +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE void +hg_atomic_set32(hg_atomic_int32_t *ptr, hg_util_int32_t value) +{ +#if defined(_WIN32) + ptr->value = value; +#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + OPA_store_int(ptr, value); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) + atomic_store_explicit(ptr, value, memory_order_release); +#elif defined(__APPLE__) + ptr->value = value; +#else +# error "Not supported on this platform." +#endif +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_get32(hg_atomic_int32_t *ptr) +{ + hg_util_int32_t ret; + +#if defined(_WIN32) + ret = ptr->value; +#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = OPA_load_int(ptr); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) + ret = atomic_load_explicit(ptr, memory_order_acquire); +#elif defined(__APPLE__) + ret = ptr->value; +#else +# error "Not supported on this platform." 
+#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_incr32(hg_atomic_int32_t *ptr) +{ + hg_util_int32_t ret; + +#if defined(_WIN32) + ret = InterlockedIncrementNoFence(&ptr->value); +#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = OPA_fetch_and_incr_int(ptr) + 1; +#elif defined(HG_UTIL_HAS_STDATOMIC_H) + ret = atomic_fetch_add_explicit(ptr, 1, memory_order_acq_rel) + 1; +#elif defined(__APPLE__) + ret = OSAtomicIncrement32(&ptr->value); +#else +# error "Not supported on this platform." +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_decr32(hg_atomic_int32_t *ptr) +{ + hg_util_int32_t ret; + +#if defined(_WIN32) + ret = InterlockedDecrementNoFence(&ptr->value); +#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = OPA_fetch_and_decr_int(ptr) - 1; +#elif defined(HG_UTIL_HAS_STDATOMIC_H) + ret = atomic_fetch_sub_explicit(ptr, 1, memory_order_acq_rel) - 1; +#elif defined(__APPLE__) + ret = OSAtomicDecrement32(&ptr->value); +#else +# error "Not supported on this platform." 
+#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_or32(hg_atomic_int32_t *ptr, hg_util_int32_t value) +{ + hg_util_int32_t ret; + +#if defined(_WIN32) + ret = InterlockedOrNoFence(&ptr->value, value); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = atomic_fetch_or_explicit(ptr, value, memory_order_acq_rel); +#elif defined(__APPLE__) + ret = OSAtomicOr32Orig((uint32_t) value, (volatile uint32_t *) &ptr->value); +#else + do { + ret = hg_atomic_get32(ptr); + } while (!hg_atomic_cas32(ptr, ret, (ret | value))); +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_xor32(hg_atomic_int32_t *ptr, hg_util_int32_t value) +{ + hg_util_int32_t ret; + +#if defined(_WIN32) + ret = InterlockedXorNoFence(&ptr->value, value); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = atomic_fetch_xor_explicit(ptr, value, memory_order_acq_rel); +#elif defined(__APPLE__) + ret = + OSAtomicXor32Orig((uint32_t) value, (volatile uint32_t *) &ptr->value); +#else + do { + ret = hg_atomic_get32(ptr); + } while (!hg_atomic_cas32(ptr, ret, (ret ^ value))); +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int32_t +hg_atomic_and32(hg_atomic_int32_t *ptr, hg_util_int32_t value) +{ + hg_util_int32_t ret; + +#if defined(_WIN32) + ret = InterlockedAndNoFence(&ptr->value, value); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = atomic_fetch_and_explicit(ptr, value, memory_order_acq_rel); +#elif defined(__APPLE__) + ret = + OSAtomicAnd32Orig((uint32_t) value, (volatile uint32_t *) &ptr->value); +#else + do { + ret = hg_atomic_get32(ptr); + } while (!hg_atomic_cas32(ptr, ret, (ret & 
value))); +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_bool_t +hg_atomic_cas32(hg_atomic_int32_t *ptr, hg_util_int32_t compare_value, + hg_util_int32_t swap_value) +{ + hg_util_bool_t ret; + +#if defined(_WIN32) + ret = (compare_value == InterlockedCompareExchangeNoFence( + &ptr->value, swap_value, compare_value)); +#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = (hg_util_bool_t)( + compare_value == OPA_cas_int(ptr, compare_value, swap_value)); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) + ret = atomic_compare_exchange_strong_explicit(ptr, &compare_value, + swap_value, memory_order_acq_rel, memory_order_acquire); +#elif defined(__APPLE__) + ret = OSAtomicCompareAndSwap32(compare_value, swap_value, &ptr->value); +#else +# error "Not supported on this platform." +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE void +hg_atomic_init64(hg_atomic_int64_t *ptr, hg_util_int64_t value) +{ +#if defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + atomic_init(ptr, value); +#else + hg_atomic_set64(ptr, value); +#endif +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE void +hg_atomic_set64(hg_atomic_int64_t *ptr, hg_util_int64_t value) +{ +#if defined(_WIN32) + ptr->value = value; +#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + OPA_store_ptr(ptr, (void *) value); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) + atomic_store_explicit(ptr, value, memory_order_release); +#elif defined(__APPLE__) + ptr->value = value; +#else +# error "Not supported on this platform." 
+#endif +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_get64(hg_atomic_int64_t *ptr) +{ + hg_util_int64_t ret; + +#if defined(_WIN32) + ret = ptr->value; +#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = (hg_util_int64_t) OPA_load_ptr(ptr); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) + ret = atomic_load_explicit(ptr, memory_order_acquire); +#elif defined(__APPLE__) + ret = ptr->value; /* read the field, as the _WIN32 branch does; this getter has no 'value' parameter to store */ +#else +# error "Not supported on this platform." +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_incr64(hg_atomic_int64_t *ptr) +{ + hg_util_int64_t ret; + +#if defined(_WIN32) + ret = InterlockedIncrementNoFence64(&ptr->value); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = atomic_fetch_add_explicit(ptr, 1, memory_order_acq_rel) + 1; +#elif defined(__APPLE__) + ret = OSAtomicIncrement64(&ptr->value); +#else + do { + ret = hg_atomic_get64(ptr); + } while (!hg_atomic_cas64(ptr, ret, ret + 1)); + ret++; +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_decr64(hg_atomic_int64_t *ptr) +{ + hg_util_int64_t ret; + +#if defined(_WIN32) + ret = InterlockedDecrementNoFence64(&ptr->value); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = atomic_fetch_sub_explicit(ptr, 1, memory_order_acq_rel) - 1; +#elif defined(__APPLE__) + ret = OSAtomicDecrement64(&ptr->value); +#else + do { + ret = hg_atomic_get64(ptr); + } while (!hg_atomic_cas64(ptr, ret, ret - 1)); + ret--; +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_or64(hg_atomic_int64_t *ptr, hg_util_int64_t value) +{ + hg_util_int64_t ret; + +#if 
defined(_WIN32) + ret = InterlockedOr64NoFence(&ptr->value, value); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = atomic_fetch_or_explicit(ptr, value, memory_order_acq_rel); +#else + do { + ret = hg_atomic_get64(ptr); + } while (!hg_atomic_cas64(ptr, ret, (ret | value))); +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_xor64(hg_atomic_int64_t *ptr, hg_util_int64_t value) +{ + hg_util_int64_t ret; + +#if defined(_WIN32) + ret = InterlockedXor64NoFence(&ptr->value, value); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = atomic_fetch_xor_explicit(ptr, value, memory_order_acq_rel); +#else + do { + ret = hg_atomic_get64(ptr); + } while (!hg_atomic_cas64(ptr, ret, (ret ^ value))); +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_int64_t +hg_atomic_and64(hg_atomic_int64_t *ptr, hg_util_int64_t value) +{ + hg_util_int64_t ret; + +#if defined(_WIN32) + ret = InterlockedAnd64NoFence(&ptr->value, value); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = atomic_fetch_and_explicit(ptr, value, memory_order_acq_rel); +#else + do { + ret = hg_atomic_get64(ptr); + } while (!hg_atomic_cas64(ptr, ret, (ret & value))); +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_bool_t +hg_atomic_cas64(hg_atomic_int64_t *ptr, hg_util_int64_t compare_value, + hg_util_int64_t swap_value) +{ + hg_util_bool_t ret; + +#if defined(_WIN32) + ret = (compare_value == InterlockedCompareExchangeNoFence64( + &ptr->value, swap_value, compare_value)); +#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + ret = (hg_util_bool_t)( + compare_value == (hg_util_int64_t) OPA_cas_ptr( + ptr, (void *) 
compare_value, (void *) swap_value)); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) + ret = atomic_compare_exchange_strong_explicit(ptr, &compare_value, + swap_value, memory_order_acq_rel, memory_order_acquire); +#elif defined(__APPLE__) + ret = OSAtomicCompareAndSwap64(compare_value, swap_value, &ptr->value); +#else +# error "Not supported on this platform." +#endif + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE void +hg_atomic_fence() +{ +#if defined(_WIN32) + MemoryBarrier(); +#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H) + OPA_read_write_barrier(); +#elif defined(HG_UTIL_HAS_STDATOMIC_H) + atomic_thread_fence(memory_order_acq_rel); +#elif defined(__APPLE__) + OSMemoryBarrier(); +#else +# error "Not supported on this platform." +#endif +} + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_ATOMIC_H */ diff --git a/src/mercury/mercury_atomic_queue.c b/src/mercury/mercury_atomic_queue.c new file mode 100644 index 0000000..4c6a8e2 --- /dev/null +++ b/src/mercury/mercury_atomic_queue.c @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +/* Implementation derived from: + * https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h + * + * - + * Copyright (c) 2007-2009 Kip Macy + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. 
Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + */ + +#include "mercury_atomic_queue.h" +#include "mercury_util_error.h" + +#include <stdlib.h> + +/****************/ +/* Local Macros */ +/****************/ + +/* From <sys/param.h>; note that it also evaluates true for x == 0 */ +#define powerof2(x) ((((x) -1) & (x)) == 0) + +/*---------------------------------------------------------------------------*/ +struct hg_atomic_queue * +hg_atomic_queue_alloc(unsigned int count) +{ + struct hg_atomic_queue *hg_atomic_queue = NULL; + + HG_UTIL_CHECK_ERROR_NORET( + count == 0 || !powerof2(count), done, "atomic queue size must be power of 2"); + + hg_atomic_queue = malloc( + sizeof(struct hg_atomic_queue) + count * sizeof(hg_atomic_int64_t)); + HG_UTIL_CHECK_ERROR_NORET( + hg_atomic_queue == NULL, done, "Could not allocate atomic queue"); + + hg_atomic_queue->prod_size = hg_atomic_queue->cons_size = count; + hg_atomic_queue->prod_mask = hg_atomic_queue->cons_mask = count - 1; + hg_atomic_init32(&hg_atomic_queue->prod_head, 0); + hg_atomic_init32(&hg_atomic_queue->cons_head, 0); + 
hg_atomic_init32(&hg_atomic_queue->prod_tail, 0); + hg_atomic_init32(&hg_atomic_queue->cons_tail, 0); + +done: + return hg_atomic_queue; +} + +/*---------------------------------------------------------------------------*/ +void +hg_atomic_queue_free(struct hg_atomic_queue *hg_atomic_queue) +{ + free(hg_atomic_queue); +} diff --git a/src/mercury/mercury_atomic_queue.h b/src/mercury/mercury_atomic_queue.h new file mode 100644 index 0000000..73c0b15 --- /dev/null +++ b/src/mercury/mercury_atomic_queue.h @@ -0,0 +1,271 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +/* Implementation derived from: + * https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h + * + * - + * Copyright (c) 2007-2009 Kip Macy + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + */ + +#ifndef MERCURY_ATOMIC_QUEUE_H +#define MERCURY_ATOMIC_QUEUE_H + +#include "mercury_atomic.h" +#include "mercury_mem.h" + +/*************************************/ +/* Public Type and Struct Definition */ +/*************************************/ + +struct hg_atomic_queue { + hg_atomic_int32_t prod_head; + hg_atomic_int32_t prod_tail; + unsigned int prod_size; + unsigned int prod_mask; + hg_util_uint64_t drops; + hg_atomic_int32_t cons_head + __attribute__((aligned(HG_MEM_CACHE_LINE_SIZE))); + hg_atomic_int32_t cons_tail; + unsigned int cons_size; + unsigned int cons_mask; + hg_atomic_int64_t ring[] __attribute__((aligned(HG_MEM_CACHE_LINE_SIZE))); +}; + +/*****************/ +/* Public Macros */ +/*****************/ + +#ifndef cpu_spinwait +# if defined(__x86_64__) || defined(__amd64__) +# define cpu_spinwait() asm volatile("pause\n" : : : "memory"); +# else +# define cpu_spinwait() ; +# endif +#endif + +/*********************/ +/* Public Prototypes */ +/*********************/ + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Allocate a new queue that can hold \count elements. + * + * \param count [IN] maximum number of elements + * + * \return pointer to allocated queue or NULL on failure + */ +HG_UTIL_PUBLIC struct hg_atomic_queue * +hg_atomic_queue_alloc(unsigned int count); + +/** + * Free an existing queue. 
+ * + * \param hg_atomic_queue [IN] pointer to queue + */ +HG_UTIL_PUBLIC void +hg_atomic_queue_free(struct hg_atomic_queue *hg_atomic_queue); + +/** + * Push an entry to the queue. + * + * \param hg_atomic_queue [IN/OUT] pointer to queue + * \param entry [IN] pointer to object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_atomic_queue_push(struct hg_atomic_queue *hg_atomic_queue, void *entry); + +/** + * Pop an entry from the queue (multi-consumer). + * + * \param hg_atomic_queue [IN/OUT] pointer to queue + * + * \return Pointer to popped object or NULL if queue is empty + */ +static HG_UTIL_INLINE void * +hg_atomic_queue_pop_mc(struct hg_atomic_queue *hg_atomic_queue); + +/** + * Pop an entry from the queue (single consumer). + * + * \param hg_atomic_queue [IN/OUT] pointer to queue + * + * \return Pointer to popped object or NULL if queue is empty + */ +static HG_UTIL_INLINE void * +hg_atomic_queue_pop_sc(struct hg_atomic_queue *hg_atomic_queue); + +/** + * Determine whether queue is empty. + * + * \param hg_atomic_queue [IN/OUT] pointer to queue + * + * \return HG_UTIL_TRUE if empty, HG_UTIL_FALSE if not + */ +static HG_UTIL_INLINE hg_util_bool_t +hg_atomic_queue_is_empty(struct hg_atomic_queue *hg_atomic_queue); + +/** + * Determine number of entries in a queue. 
+ * + * \param hg_atomic_queue [IN/OUT] pointer to queue + * + * \return Number of entries queued or 0 if none + */ +static HG_UTIL_INLINE unsigned int +hg_atomic_queue_count(struct hg_atomic_queue *hg_atomic_queue); + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_atomic_queue_push(struct hg_atomic_queue *hg_atomic_queue, void *entry) +{ + hg_util_int32_t prod_head, prod_next, cons_tail; + + do { + prod_head = hg_atomic_get32(&hg_atomic_queue->prod_head); + prod_next = (prod_head + 1) & (int) hg_atomic_queue->prod_mask; + cons_tail = hg_atomic_get32(&hg_atomic_queue->cons_tail); + + if (prod_next == cons_tail) { + hg_atomic_fence(); + if (prod_head == hg_atomic_get32(&hg_atomic_queue->prod_head) && + cons_tail == hg_atomic_get32(&hg_atomic_queue->cons_tail)) { + hg_atomic_queue->drops++; + /* Full */ + return HG_UTIL_FAIL; + } + continue; + } + } while ( + !hg_atomic_cas32(&hg_atomic_queue->prod_head, prod_head, prod_next)); + + hg_atomic_set64(&hg_atomic_queue->ring[prod_head], (hg_util_int64_t) entry); + + /* + * If there are other enqueues in progress + * that preceded us, we need to wait for them + * to complete + */ + while (hg_atomic_get32(&hg_atomic_queue->prod_tail) != prod_head) + cpu_spinwait(); + + hg_atomic_set32(&hg_atomic_queue->prod_tail, prod_next); + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE void * +hg_atomic_queue_pop_mc(struct hg_atomic_queue *hg_atomic_queue) +{ + hg_util_int32_t cons_head, cons_next; + void *entry = NULL; + + do { + cons_head = hg_atomic_get32(&hg_atomic_queue->cons_head); + cons_next = (cons_head + 1) & (int) hg_atomic_queue->cons_mask; + + if (cons_head == hg_atomic_get32(&hg_atomic_queue->prod_tail)) + return NULL; + } while ( + !hg_atomic_cas32(&hg_atomic_queue->cons_head, cons_head, cons_next)); + + entry = (void *) hg_atomic_get64(&hg_atomic_queue->ring[cons_head]); 
+ + /* + * If there are other dequeues in progress + * that preceded us, we need to wait for them + * to complete + */ + while (hg_atomic_get32(&hg_atomic_queue->cons_tail) != cons_head) + cpu_spinwait(); + + hg_atomic_set32(&hg_atomic_queue->cons_tail, cons_next); + + return entry; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE void * +hg_atomic_queue_pop_sc(struct hg_atomic_queue *hg_atomic_queue) +{ + hg_util_int32_t cons_head, cons_next; + hg_util_int32_t prod_tail; + void *entry = NULL; + + cons_head = hg_atomic_get32(&hg_atomic_queue->cons_head); + prod_tail = hg_atomic_get32(&hg_atomic_queue->prod_tail); + cons_next = (cons_head + 1) & (int) hg_atomic_queue->cons_mask; + + if (cons_head == prod_tail) + /* Empty */ + return NULL; + + hg_atomic_set32(&hg_atomic_queue->cons_head, cons_next); + + entry = (void *) hg_atomic_get64(&hg_atomic_queue->ring[cons_head]); + + hg_atomic_set32(&hg_atomic_queue->cons_tail, cons_next); + + return entry; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_bool_t +hg_atomic_queue_is_empty(struct hg_atomic_queue *hg_atomic_queue) +{ + return (hg_atomic_get32(&hg_atomic_queue->cons_head) == + hg_atomic_get32(&hg_atomic_queue->prod_tail)); +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE unsigned int +hg_atomic_queue_count(struct hg_atomic_queue *hg_atomic_queue) +{ + return ((hg_atomic_queue->prod_size + + (unsigned int) hg_atomic_get32(&hg_atomic_queue->prod_tail) - + (unsigned int) hg_atomic_get32(&hg_atomic_queue->cons_tail)) & + hg_atomic_queue->prod_mask); +} + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_ATOMIC_QUEUE_H */ diff --git a/src/mercury/mercury_event.c b/src/mercury/mercury_event.c new file mode 100644 index 0000000..42f4533 --- /dev/null +++ b/src/mercury/mercury_event.c @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2013-2019 
Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#include "mercury_event.h" + +#include "mercury_util_error.h" + +/*---------------------------------------------------------------------------*/ +int +hg_event_create(void) +{ + int fd = -1; /* stays -1 when no branch below provides an implementation (the _WIN32 and final #else branches are empty) */ +#if defined(_WIN32) + +#elif defined(HG_UTIL_HAS_SYSEVENTFD_H) + /* Create local signal event on self address */ + fd = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE); /* counter starts at 0; non-blocking so hg_event_get() can see EAGAIN */ + HG_UTIL_CHECK_ERROR_NORET( + fd == -1, done, "eventfd() failed (%s)", strerror(errno)); +#elif defined(HG_UTIL_HAS_SYSEVENT_H) + struct kevent kev; + struct timespec timeout = {0, 0}; + int rc; + + /* Create kqueue */ + fd = kqueue(); + HG_UTIL_CHECK_ERROR_NORET( + fd == -1, done, "kqueue() failed (%s)", strerror(errno)); + + EV_SET(&kev, HG_EVENT_IDENT, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL); + + /* Add user-defined event to kqueue */ + rc = kevent(fd, &kev, 1, NULL, 0, &timeout); + HG_UTIL_CHECK_ERROR_NORET( + rc == -1, error, "kevent() failed (%s)", strerror(errno)); +#else + +#endif + +done: + return fd; + +#if defined(HG_UTIL_HAS_SYSEVENT_H) +error: + hg_event_destroy(fd); /* the kqueue exists but the user event could not be registered: close it */ + + return -1; +#endif +} + +/*---------------------------------------------------------------------------*/ +int +hg_event_destroy(int fd) +{ + int ret = HG_UTIL_SUCCESS, rc; +#if defined(_WIN32) /* empty branch: nothing is closed, ret stays HG_UTIL_SUCCESS and rc is unused */ + +#else + rc = close(fd); + HG_UTIL_CHECK_ERROR(rc == -1, done, ret, HG_UTIL_FAIL, + "close() failed (%s)", strerror(errno)); +#endif +done: + return ret; +} diff --git a/src/mercury/mercury_event.h b/src/mercury/mercury_event.h new file mode 100644 index 0000000..48175b1 --- /dev/null +++ b/src/mercury/mercury_event.h @@ -0,0 +1,184 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of 
Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#ifndef MERCURY_EVENT_H +#define MERCURY_EVENT_H + +#include "mercury_util_config.h" + +#ifdef _WIN32 + +#else +# include <errno.h> /* errno, EAGAIN */ +# include <string.h> /* strerror() */ +# include <unistd.h> /* read(), write(), close() */ +# if defined(HG_UTIL_HAS_SYSEVENTFD_H) +# include <sys/eventfd.h> +# ifndef HG_UTIL_HAS_EVENTFD_T +typedef uint64_t eventfd_t; +# endif +# elif defined(HG_UTIL_HAS_SYSEVENT_H) +# include <sys/event.h> +# define HG_EVENT_IDENT 42 /* User-defined ident */ +# endif +#endif + +/** + * Purpose: define an event object that can be used as an event + * wait/notify mechanism. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Create a new event object. + * + * \return file descriptor on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_event_create(void); + +/** + * Destroy an event object. + * + * \param fd [IN] event file descriptor + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_event_destroy(int fd); + +/** + * Notify for event. + * + * \param fd [IN] event file descriptor + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_event_set(int fd); + +/** + * Get event notification. + * + * \param fd [IN] event file descriptor + * \param notified [OUT] set to HG_UTIL_TRUE if an event was received, + * HG_UTIL_FALSE if none was pending; not + * written when the call fails + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_event_get(int fd, hg_util_bool_t *notified); + +/*---------------------------------------------------------------------------*/ +#if defined(_WIN32) +/* TODO */ +#elif defined(HG_UTIL_HAS_SYSEVENTFD_H) +# ifdef HG_UTIL_HAS_EVENTFD_T +static HG_UTIL_INLINE int +hg_event_set(int fd) +{ + return (eventfd_write(fd, 1) == 0) ? 
HG_UTIL_SUCCESS : HG_UTIL_FAIL; +} +# else +static HG_UTIL_INLINE int +hg_event_set(int fd) +{ + eventfd_t count = 1; + ssize_t s = write(fd, &count, sizeof(eventfd_t)); /* manual counterpart of the eventfd_write(fd, 1) variant above */ + + return (s == sizeof(eventfd_t)) ? HG_UTIL_SUCCESS : HG_UTIL_FAIL; +} +# endif +#elif defined(HG_UTIL_HAS_SYSEVENT_H) +static HG_UTIL_INLINE int +hg_event_set(int fd) +{ + struct kevent kev; + struct timespec timeout = {0, 0}; /* zero timeout: kevent() is not asked to wait */ + int rc; + + EV_SET(&kev, HG_EVENT_IDENT, EVFILT_USER, 0, NOTE_TRIGGER, 0, NULL); + + /* Trigger user-defined event */ + rc = kevent(fd, &kev, 1, NULL, 0, &timeout); + + return (rc == -1) ? HG_UTIL_FAIL : HG_UTIL_SUCCESS; +} +#else +# error "Not supported on this platform." +#endif + +/*---------------------------------------------------------------------------*/ +#if defined(_WIN32) +#elif defined(HG_UTIL_HAS_SYSEVENTFD_H) +# ifdef HG_UTIL_HAS_EVENTFD_T +static HG_UTIL_INLINE int +hg_event_get(int fd, hg_util_bool_t *signaled) +{ + eventfd_t count = 0; + + if ((eventfd_read(fd, &count) == 0) && count) + *signaled = HG_UTIL_TRUE; + else { + if (errno == EAGAIN) /* nothing pending: reported as success with *signaled false */ + *signaled = HG_UTIL_FALSE; + else + return HG_UTIL_FAIL; /* *signaled is left untouched on failure */ + } + + return HG_UTIL_SUCCESS; +} +# else +static HG_UTIL_INLINE int +hg_event_get(int fd, hg_util_bool_t *signaled) +{ + eventfd_t count = 0; + ssize_t s = read(fd, &count, sizeof(eventfd_t)); + if ((s == sizeof(eventfd_t)) && count) + *signaled = HG_UTIL_TRUE; + else { + if (errno == EAGAIN) /* nothing pending: reported as success with *signaled false */ + *signaled = HG_UTIL_FALSE; + else + return HG_UTIL_FAIL; /* *signaled is left untouched on failure */ + } + + return HG_UTIL_SUCCESS; +} +# endif +#elif defined(HG_UTIL_HAS_SYSEVENT_H) +static HG_UTIL_INLINE int +hg_event_get(int fd, hg_util_bool_t *signaled) +{ + struct kevent kev; + int nfds; + struct timespec timeout = {0, 0}; /* zero timeout: kevent() is not asked to wait */ + + /* Check user-defined event */ + nfds = kevent(fd, NULL, 0, &kev, 1, &timeout); + if (nfds == -1) + return HG_UTIL_FAIL; + + *signaled = ((nfds > 0) && (kev.ident == HG_EVENT_IDENT)) ? 
HG_UTIL_TRUE + : HG_UTIL_FALSE; + + return HG_UTIL_SUCCESS; +} +#else +# error "Not supported on this platform." +#endif + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_EVENT_H */ diff --git a/src/mercury/mercury_hash_string.h b/src/mercury/mercury_hash_string.h new file mode 100644 index 0000000..878fd4f --- /dev/null +++ b/src/mercury/mercury_hash_string.h @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#ifndef MERCURY_HASH_STRING_H +#define MERCURY_HASH_STRING_H + +#include "mercury_util_config.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Hash a string name into an ID to register (djb2 hash). Distinct names + * can map to the same ID. + * + * \param string [IN] NUL-terminated string name; it is + * dereferenced unconditionally, so it + * must not be NULL + * + * \return Non-negative ID that corresponds to string name + */ +static HG_UTIL_INLINE unsigned int +hg_hash_string(const char *string) +{ + /* This is the djb2 string hash function */ + + unsigned int result = 5381; + const unsigned char *p; + + p = (const unsigned char *) string; /* bytes are hashed as unsigned values */ + + while (*p != '\0') { + result = (result << 5) + result + *p; /* result * 33 + byte, wrapping modulo the width of unsigned int */ + ++p; + } + return result; +} + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_HASH_STRING_H */ diff --git a/src/mercury/mercury_hash_table.c b/src/mercury/mercury_hash_table.c new file mode 100644 index 0000000..d7d14df --- /dev/null +++ b/src/mercury/mercury_hash_table.c @@ -0,0 +1,526 @@ +/* + +Copyright (c) 2005-2008, Simon Howard + +Permission to use, copy, modify, and/or distribute this software +for any purpose with or without fee is hereby granted, provided +that the above copyright notice and this permission notice appear +in all copies. 
+ +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL +WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE +AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR +CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, +NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + + */ + +/* Hash table implementation */ + +#include "mercury_hash_table.h" + +#include <stdlib.h> /* malloc(), calloc(), free() */ +#include <string.h> + +struct hg_hash_table_entry { + hg_hash_table_key_t key; + hg_hash_table_value_t value; + hg_hash_table_entry_t *next; /* next entry in the same chain, NULL at the end */ +}; + +struct hg_hash_table { + hg_hash_table_entry_t **table; /* array of table_size chain heads */ + unsigned int table_size; /* number of chains in table */ + hg_hash_table_hash_func_t hash_func; + hg_hash_table_equal_func_t equal_func; + hg_hash_table_key_free_func_t key_free_func; /* may be NULL: keys are then not freed */ + hg_hash_table_value_free_func_t value_free_func; /* may be NULL: values are then not freed */ + unsigned int entries; /* number of entries currently stored */ + unsigned int prime_index; /* index into hash_table_primes for the current size */ +}; + +/* This is a set of good hash table prime numbers, from: + * http://planetmath.org/goodhashtableprimes + * Each prime is roughly double the previous value, and as far as + * possible from the nearest powers of two. */ + +static const unsigned int hash_table_primes[] = { + 193, + 389, + 769, + 1543, + 3079, + 6151, + 12289, + 24593, + 49157, + 98317, + 196613, + 393241, + 786433, + 1572869, + 3145739, + 6291469, + 12582917, + 25165843, + 50331653, + 100663319, + 201326611, + 402653189, + 805306457, + 1610612741, +}; + +/* Element count derived from the array's own element type, so it stays + * correct if the element type is ever changed. */ +static const unsigned int hash_table_num_primes = + sizeof(hash_table_primes) / sizeof(hash_table_primes[0]); + +/* Internal function used to allocate the table on hash table creation + * and when enlarging the table */ + +static int +hash_table_allocate_table(hg_hash_table_t *hash_table) +{ + unsigned int new_table_size; + + /* Determine the table size based on the current prime index. 
+ * An attempt is made here to ensure sensible behavior if the + * maximum prime is exceeded, but in practice other things are + * likely to break long before that happens. */ + + if (hash_table->prime_index < hash_table_num_primes) { + new_table_size = hash_table_primes[hash_table->prime_index]; + } else { + new_table_size = hash_table->entries * 10; /* NOTE(review): unsigned multiply, can wrap for very large entry counts */ + } + + hash_table->table_size = new_table_size; + + /* Allocate the table and initialise to NULL for all entries */ + + hash_table->table = (hg_hash_table_entry_t **) calloc( + hash_table->table_size, sizeof(hg_hash_table_entry_t *)); + + return hash_table->table != NULL; /* non-zero on success, zero if calloc() failed */ +} + +/* Free an entry, calling the free functions if there are any registered */ + +static void +hash_table_free_entry(hg_hash_table_t *hash_table, hg_hash_table_entry_t *entry) +{ + /* If there is a function registered for freeing keys, use it to free + * the key */ + + if (hash_table->key_free_func != NULL) { + hash_table->key_free_func(entry->key); + } + + /* Likewise with the value */ + + if (hash_table->value_free_func != NULL) { + hash_table->value_free_func(entry->value); + } + + /* Free the data structure */ + + free(entry); +} + +hg_hash_table_t * +hg_hash_table_new( + hg_hash_table_hash_func_t hash_func, hg_hash_table_equal_func_t equal_func) +{ + hg_hash_table_t *hash_table; + + /* Allocate a new hash table structure */ + + hash_table = (hg_hash_table_t *) malloc(sizeof(hg_hash_table_t)); + + if (hash_table == NULL) { + return NULL; + } + + hash_table->hash_func = hash_func; + hash_table->equal_func = equal_func; + hash_table->key_free_func = NULL; /* no free functions until hg_hash_table_register_free_functions() is called */ + hash_table->value_free_func = NULL; + hash_table->entries = 0; + hash_table->prime_index = 0; /* start with the first size in hash_table_primes */ + + /* Allocate the table */ + + if (!hash_table_allocate_table(hash_table)) { + free(hash_table); + + return NULL; + } + + return hash_table; +} + +void +hg_hash_table_free(hg_hash_table_t *hash_table) +{ + hg_hash_table_entry_t *rover; + hg_hash_table_entry_t *next; + unsigned int i; + + /* Free 
all entries in all chains */ + + for (i = 0; i < hash_table->table_size; ++i) { /* NOTE(review): hash_table is dereferenced without a NULL check, unlike free() */ + rover = hash_table->table[i]; + while (rover != NULL) { + next = rover->next; /* saved first: hash_table_free_entry() frees rover */ + hash_table_free_entry(hash_table, rover); + rover = next; + } + } + + /* Free the table */ + + free(hash_table->table); + + /* Free the hash table structure */ + + free(hash_table); +} + +void +hg_hash_table_register_free_functions(hg_hash_table_t *hash_table, + hg_hash_table_key_free_func_t key_free_func, + hg_hash_table_value_free_func_t value_free_func) +{ + hash_table->key_free_func = key_free_func; + hash_table->value_free_func = value_free_func; +} + +static int +hash_table_enlarge(hg_hash_table_t *hash_table) +{ /* returns 1 on success, 0 if the larger table could not be allocated (the old table is then kept) */ + hg_hash_table_entry_t **old_table; + unsigned int old_table_size; + unsigned int old_prime_index; + hg_hash_table_entry_t *rover; + hg_hash_table_entry_t *next; + unsigned int entry_index; + unsigned int i; + + /* Store a copy of the old table */ + + old_table = hash_table->table; + old_table_size = hash_table->table_size; + old_prime_index = hash_table->prime_index; + + /* Allocate a new, larger table */ + + ++hash_table->prime_index; + + if (!hash_table_allocate_table(hash_table)) { + + /* Failed to allocate the new table: roll the three fields back + * so the existing table stays usable */ + + hash_table->table = old_table; + hash_table->table_size = old_table_size; + hash_table->prime_index = old_prime_index; + + return 0; + } + + /* Link all entries from all chains into the new table */ + + for (i = 0; i < old_table_size; ++i) { + rover = old_table[i]; + + while (rover != NULL) { + next = rover->next; + + /* Find the index into the new table */ + + entry_index = + hash_table->hash_func(rover->key) % hash_table->table_size; + + /* Link this entry into the chain */ + + rover->next = hash_table->table[entry_index]; + hash_table->table[entry_index] = rover; + + /* Advance to next in the chain */ + + rover = next; + } + } + + /* Free the old table */ + + free(old_table); /* only the array of chain heads: the entries were relinked, not copied */ + + return 1; +} + +int +hg_hash_table_insert(hg_hash_table_t *hash_table, 
hg_hash_table_key_t key, + hg_hash_table_value_t value) +{ + hg_hash_table_entry_t *rover; + hg_hash_table_entry_t *newentry; + unsigned int entry_index; + + /* If there are too many items in the table with respect to the table + * size, the number of hash collisions increases and performance + * decreases. Enlarge the table size to prevent this happening */ + + if ((hash_table->entries * 3) / hash_table->table_size > 0) { + + /* Table is at least 1/3 full (entries * 3 >= table_size) */ + + if (!hash_table_enlarge(hash_table)) { + + /* Failed to enlarge the table */ + + return 0; + } + } + + /* Generate the hash of the key and hence the index into the table */ + + entry_index = hash_table->hash_func(key) % hash_table->table_size; + + /* Traverse the chain at this location and look for an existing + * entry with the same key */ + + rover = hash_table->table[entry_index]; + + while (rover != NULL) { + if (hash_table->equal_func(rover->key, key) != 0) { + + /* Same key: overwrite this entry with new data */ + + /* If there is a value free function, free the old data + * before adding in the new data */ + + if (hash_table->value_free_func != NULL) { + hash_table->value_free_func(rover->value); + } + + /* Same with the key: use the new key value and free + * the old one. + * NOTE(review): if the caller passes the very pointer that is + * already stored, it is freed here and then stored again - + * confirm callers always pass a distinct key (likewise value) */ + + if (hash_table->key_free_func != NULL) { + hash_table->key_free_func(rover->key); + } + + rover->key = key; + rover->value = value; + + /* Finished: the entry count is unchanged */ + + return 1; + } + rover = rover->next; + } + + /* Not in the hash table yet. 
Create a new entry */ + + newentry = (hg_hash_table_entry_t *) malloc(sizeof(hg_hash_table_entry_t)); + + if (newentry == NULL) { + return 0; /* allocation failed: nothing was inserted */ + } + + newentry->key = key; + newentry->value = value; + + /* Link into the list (at the head of the chain) */ + + newentry->next = hash_table->table[entry_index]; + hash_table->table[entry_index] = newentry; + + /* Maintain the count of the number of entries */ + + ++hash_table->entries; + + /* Added successfully */ + + return 1; +} + +hg_hash_table_value_t +hg_hash_table_lookup(hg_hash_table_t *hash_table, hg_hash_table_key_t key) +{ + hg_hash_table_entry_t *rover; + unsigned int entry_index; + + /* Generate the hash of the key and hence the index into the table */ + + entry_index = hash_table->hash_func(key) % hash_table->table_size; + + /* Walk the chain at this index until the corresponding entry is + * found */ + + rover = hash_table->table[entry_index]; + + while (rover != NULL) { + if (hash_table->equal_func(key, rover->key) != 0) { + + /* Found the entry. Return the data. */ + + return rover->value; + } + rover = rover->next; + } + + /* Not found. A stored value equal to HG_HASH_TABLE_NULL cannot be + * told apart from this result. */ + + return HG_HASH_TABLE_NULL; +} + +int +hg_hash_table_remove(hg_hash_table_t *hash_table, hg_hash_table_key_t key) +{ + hg_hash_table_entry_t **rover; + hg_hash_table_entry_t *entry; + unsigned int entry_index; + int result; + + /* Generate the hash of the key and hence the index into the table */ + + entry_index = hash_table->hash_func(key) % hash_table->table_size; + + /* Rover points at the pointer which points at the current entry + * in the chain being inspected. ie. the entry in the table, or + * the "next" pointer of the previous entry in the chain. This + * allows us to unlink the entry when we find it. 
*/ + + result = 0; + rover = &hash_table->table[entry_index]; + + while (*rover != NULL) { + + if (hash_table->equal_func(key, (*rover)->key) != 0) { + + /* This is the entry to remove */ + + entry = *rover; + + /* Unlink from the list */ + + *rover = entry->next; + + /* Destroy the entry structure (this also runs the registered + * key/value free functions, if any) */ + + hash_table_free_entry(hash_table, entry); + + /* Track count of entries */ + + --hash_table->entries; + + result = 1; + + break; + } + + /* Advance to the next entry */ + + rover = &((*rover)->next); + } + + return result; /* 1 if an entry was removed, 0 if the key was not found */ +} + +unsigned int +hg_hash_table_num_entries(hg_hash_table_t *hash_table) +{ + return hash_table->entries; +} + +void +hg_hash_table_iterate( + hg_hash_table_t *hash_table, hg_hash_table_iter_t *iterator) +{ + unsigned int chain; + + iterator->hash_table = hash_table; + + /* Default value of next if no entries are found. next_chain is + * left unset in that case. */ + + iterator->next_entry = NULL; + + /* Find the first entry */ + + for (chain = 0; chain < hash_table->table_size; ++chain) { + + if (hash_table->table[chain] != NULL) { + iterator->next_entry = hash_table->table[chain]; + iterator->next_chain = chain; + break; + } + } +} + +int +hg_hash_table_iter_has_more(hg_hash_table_iter_t *iterator) +{ + return iterator->next_entry != NULL; +} + +hg_hash_table_value_t +hg_hash_table_iter_next(hg_hash_table_iter_t *iterator) +{ + hg_hash_table_entry_t *current_entry; + hg_hash_table_t *hash_table; + hg_hash_table_value_t result; + unsigned int chain; + + hash_table = iterator->hash_table; + + /* No more entries? 
*/ + + if (iterator->next_entry == NULL) { + return HG_HASH_TABLE_NULL; + } + + /* Result is immediately available: it is the value of the entry + * recorded in next_entry */ + + current_entry = iterator->next_entry; + result = current_entry->value; + + /* Find the next entry */ + + if (current_entry->next != NULL) { + + /* Next entry in current chain */ + + iterator->next_entry = current_entry->next; + + } else { + + /* None left in this chain, so advance to the next chain */ + + chain = iterator->next_chain + 1; + + /* Default value if no next chain found */ + + iterator->next_entry = NULL; + + while (chain < hash_table->table_size) { + + /* Is there anything in this chain? */ + + if (hash_table->table[chain] != NULL) { + iterator->next_entry = hash_table->table[chain]; + break; + } + + /* Try the next chain */ + + ++chain; + } + + iterator->next_chain = chain; /* equals table_size when no further non-empty chain was found */ + } + + return result; +} diff --git a/src/mercury/mercury_hash_table.h b/src/mercury/mercury_hash_table.h new file mode 100644 index 0000000..619857b --- /dev/null +++ b/src/mercury/mercury_hash_table.h @@ -0,0 +1,252 @@ +/* + +Copyright (c) 2005-2008, Simon Howard + +Permission to use, copy, modify, and/or distribute this software +for any purpose with or without fee is hereby granted, provided +that the above copyright notice and this permission notice appear +in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL +WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE +AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR +CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, +NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + + */ + +/** + * \file mercury_hash_table.h + * + * \brief Hash table. + * + * A hash table stores a set of values which can be addressed by a + * key. 
Given the key, the corresponding value can be looked up + * quickly. + * + * To create a hash table, use \ref hg_hash_table_new. To destroy a + * hash table, use \ref hg_hash_table_free. + * + * To insert a value into a hash table, use \ref hg_hash_table_insert. + * + * To remove a value from a hash table, use \ref hg_hash_table_remove. + * + * To look up a value by its key, use \ref hg_hash_table_lookup. + * + * To iterate over all values in a hash table, use + * \ref hg_hash_table_iterate to initialize a \ref hg_hash_table_iter + * structure. Each value can then be read in turn using + * \ref hg_hash_table_iter_next and \ref hg_hash_table_iter_has_more. + */ + +#ifndef HG_HASH_TABLE_H +#define HG_HASH_TABLE_H + +#include "mercury_util_config.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * A hash table structure. + */ + +typedef struct hg_hash_table hg_hash_table_t; + +/** + * Structure used to iterate over a hash table. + */ + +typedef struct hg_hash_table_iter hg_hash_table_iter_t; + +/** + * Internal structure representing an entry in a hash table. + */ + +typedef struct hg_hash_table_entry hg_hash_table_entry_t; + +/** + * A key to look up a value in a \ref hg_hash_table_t. + */ + +typedef void *hg_hash_table_key_t; + +/** + * A value stored in a \ref hg_hash_table_t. + */ + +typedef void *hg_hash_table_value_t; + +/** + * Definition of a \ref hg_hash_table_iter. + */ + +struct hg_hash_table_iter { + hg_hash_table_t *hash_table; /* table being iterated */ + hg_hash_table_entry_t *next_entry; /* entry whose value the next call to hg_hash_table_iter_next returns; NULL when there is none */ + unsigned int next_chain; /* chain index used to resume the search for the following entry */ +}; + +/** + * A null \ref hg_hash_table_value_t. + */ + +#define HG_HASH_TABLE_NULL ((void *) 0) + +/** + * Hash function used to generate hash values for keys used in a hash + * table. + * + * \param value The key to generate a hash value for. + * \return The hash value. + */ + +typedef unsigned int (*hg_hash_table_hash_func_t)(hg_hash_table_key_t value); + +/** + * Function used to compare two keys for equality. 
+ * + * \return Non-zero if the two keys are equal, zero if the keys are + * not equal. + */ + +typedef int (*hg_hash_table_equal_func_t)( + hg_hash_table_key_t value1, hg_hash_table_key_t value2); + +/** + * Type of function used to free keys when entries are removed from a + * hash table. + */ + +typedef void (*hg_hash_table_key_free_func_t)(hg_hash_table_key_t value); + +/** + * Type of function used to free values when entries are removed from a + * hash table. + */ + +typedef void (*hg_hash_table_value_free_func_t)(hg_hash_table_value_t value); + +/** + * Create a new hash table. + * + * \param hash_func Function used to generate hash values for the + * keys used in the table. + * \param equal_func Function used to test keys used in the table + * for equality. + * \return A new hash table structure, or NULL if it + * was not possible to allocate the new hash + * table. + */ +HG_UTIL_PUBLIC hg_hash_table_t * +hg_hash_table_new( + hg_hash_table_hash_func_t hash_func, hg_hash_table_equal_func_t equal_func); + +/** + * Destroy a hash table. Every entry still stored is released, calling + * the registered key and value free functions if there are any. + * + * \param hash_table The hash table to destroy. + */ +HG_UTIL_PUBLIC void +hg_hash_table_free(hg_hash_table_t *hash_table); + +/** + * Register functions used to free the key and value when an entry is + * removed from a hash table. Either function may be NULL, in which + * case the keys (respectively the values) are not freed. + * + * \param hash_table The hash table. + * \param key_free_func Function used to free keys. + * \param value_free_func Function used to free values. + */ +HG_UTIL_PUBLIC void +hg_hash_table_register_free_functions(hg_hash_table_t *hash_table, + hg_hash_table_key_free_func_t key_free_func, + hg_hash_table_value_free_func_t value_free_func); + +/** + * Insert a value into a hash table, overwriting any existing entry + * using the same key. When an entry is overwritten, its old key and + * value are passed to the registered free functions, if any. + * + * \param hash_table The hash table. + * \param key The key for the new value. + * \param value The value to insert. + * \return Non-zero if the value was added successfully, + * or zero if it was not possible to allocate + * memory for the new entry. 
+ */ +HG_UTIL_PUBLIC int +hg_hash_table_insert(hg_hash_table_t *hash_table, hg_hash_table_key_t key, + hg_hash_table_value_t value); + +/** + * Look up a value in a hash table by key. + * + * \param hash_table The hash table. + * \param key The key of the value to look up. + * \return The value, or \ref HG_HASH_TABLE_NULL if there + * is no value with that key in the hash table. + */ +HG_UTIL_PUBLIC hg_hash_table_value_t +hg_hash_table_lookup(hg_hash_table_t *hash_table, hg_hash_table_key_t key); + +/** + * Remove a value from a hash table. + * + * \param hash_table The hash table. + * \param key The key of the value to remove. + * \return Non-zero if a key was removed, or zero if the + * specified key was not found in the hash table. + */ +HG_UTIL_PUBLIC int +hg_hash_table_remove(hg_hash_table_t *hash_table, hg_hash_table_key_t key); + +/** + * Retrieve the number of entries in a hash table. + * + * \param hash_table The hash table. + * \return The number of entries in the hash table. + */ +HG_UTIL_PUBLIC unsigned int +hg_hash_table_num_entries(hg_hash_table_t *hash_table); + +/** + * Initialise a \ref hg_hash_table_iter_t to iterate over a hash table. + * + * \param hash_table The hash table. + * \param iter Pointer to an iterator structure to + * initialise. + */ +HG_UTIL_PUBLIC void +hg_hash_table_iterate(hg_hash_table_t *hash_table, hg_hash_table_iter_t *iter); + +/** + * Determine if there are more values in the hash table to iterate over. + * + * \param iterator The hash table iterator. + * \return Zero if there are no more values to iterate + * over, non-zero if there are more values to + * iterate over. + */ +HG_UTIL_PUBLIC int +hg_hash_table_iter_has_more(hg_hash_table_iter_t *iterator); + +/** + * Using a hash table iterator, retrieve the next value. + * + * \param iterator The hash table iterator. + * \return The next value from the hash table, or + * \ref HG_HASH_TABLE_NULL if there are no more + * values to iterate over. 
+ */ +HG_UTIL_PUBLIC hg_hash_table_value_t +hg_hash_table_iter_next(hg_hash_table_iter_t *iterator); + +#ifdef __cplusplus +} +#endif + +#endif /* HG_HASH_TABLE_H */ diff --git a/src/mercury/mercury_list.h b/src/mercury/mercury_list.h new file mode 100644 index 0000000..5a29e4a --- /dev/null +++ b/src/mercury/mercury_list.h @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +/* Code below is derived from sys/queue.h which follows the below notice: + * + * Copyright (c) 1991, 1993 + * The Regents of the University of California. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * @(#)queue.h 8.5 (Berkeley) 8/20/94 + */ + +#ifndef MERCURY_LIST_H +#define MERCURY_LIST_H + +#define HG_LIST_HEAD_INITIALIZER(name) \ + { \ + NULL \ + } + +#define HG_LIST_HEAD_INIT(struct_head_name, var_name) \ + struct struct_head_name var_name = HG_LIST_HEAD_INITIALIZER(var_name) + +#define HG_LIST_HEAD_DECL(struct_head_name, struct_entry_name) \ + struct struct_head_name { \ + struct struct_entry_name *head; \ + } + +#define HG_LIST_HEAD(struct_entry_name) \ + struct { \ + struct struct_entry_name *head; \ + } + +#define HG_LIST_ENTRY(struct_entry_name) \ + struct { \ + struct struct_entry_name *next; /* following entry, NULL for the last one */ \ + struct struct_entry_name **prev; /* address of the pointer that refers to this entry: the list head or the previous entry's next */ \ + } + +#define HG_LIST_INIT(head_ptr) \ + do { \ + (head_ptr)->head = NULL; \ + } while (/*CONSTCOND*/ 0) + +#define HG_LIST_IS_EMPTY(head_ptr) ((head_ptr)->head == NULL) + +#define HG_LIST_FIRST(head_ptr) ((head_ptr)->head) + +#define HG_LIST_NEXT(entry_ptr, entry_field_name) \ + ((entry_ptr)->entry_field_name.next) + +#define HG_LIST_INSERT_AFTER(list_entry_ptr, entry_ptr, entry_field_name) \ + do { \ + if (((entry_ptr)->entry_field_name.next = \ + (list_entry_ptr)->entry_field_name.next) != NULL) \ + (list_entry_ptr)->entry_field_name.next->entry_field_name.prev = \ + &(entry_ptr)->entry_field_name.next; \ + (list_entry_ptr)->entry_field_name.next = (entry_ptr); \ + (entry_ptr)->entry_field_name.prev = \ + &(list_entry_ptr)->entry_field_name.next; \ + } while (/*CONSTCOND*/ 0) + +#define 
HG_LIST_INSERT_BEFORE(list_entry_ptr, entry_ptr, entry_field_name) \ + do { \ + (entry_ptr)->entry_field_name.prev = \ + (list_entry_ptr)->entry_field_name.prev; \ + (entry_ptr)->entry_field_name.next = (list_entry_ptr); \ + *(list_entry_ptr)->entry_field_name.prev = (entry_ptr); \ + (list_entry_ptr)->entry_field_name.prev = \ + &(entry_ptr)->entry_field_name.next; \ + } while (/*CONSTCOND*/ 0) + +#define HG_LIST_INSERT_HEAD(head_ptr, entry_ptr, entry_field_name) \ + do { \ + if (((entry_ptr)->entry_field_name.next = (head_ptr)->head) != NULL) \ + (head_ptr)->head->entry_field_name.prev = \ + &(entry_ptr)->entry_field_name.next; \ + (head_ptr)->head = (entry_ptr); \ + (entry_ptr)->entry_field_name.prev = &(head_ptr)->head; \ + } while (/*CONSTCOND*/ 0) + +/* TODO would be nice to not have any condition. + * The removed entry's own next/prev fields are left unchanged, and no + * list head argument is needed because prev already addresses the + * pointer that refers to the entry. */ +#define HG_LIST_REMOVE(entry_ptr, entry_field_name) \ + do { \ + if ((entry_ptr)->entry_field_name.next != NULL) \ + (entry_ptr)->entry_field_name.next->entry_field_name.prev = \ + (entry_ptr)->entry_field_name.prev; \ + *(entry_ptr)->entry_field_name.prev = \ + (entry_ptr)->entry_field_name.next; \ + } while (/*CONSTCOND*/ 0) + +#define HG_LIST_FOREACH(var, head_ptr, entry_field_name) \ + for ((var) = ((head_ptr)->head); (var); \ + (var) = ((var)->entry_field_name.next)) /* var's next field is read after each pass, so the loop body must not free var */ + +#endif /* MERCURY_LIST_H */ diff --git a/src/mercury/mercury_log.c b/src/mercury/mercury_log.c new file mode 100644 index 0000000..84c468a --- /dev/null +++ b/src/mercury/mercury_log.c @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. 
+ */ + +#include "mercury_log.h" + +#include <stdarg.h> /* va_list, va_start(), va_end() */ + +/****************/ +/* Local Macros */ +/****************/ + +#define HG_LOG_MAX_BUF 256 /* size of the buffer hg_log_write() formats each message into */ + +#ifdef HG_UTIL_HAS_LOG_COLOR +# define HG_LOG_ESC "\033" +# define HG_LOG_RESET HG_LOG_ESC "[0m" +# define HG_LOG_REG HG_LOG_ESC "[0;" +# define HG_LOG_BOLD HG_LOG_ESC "[1;" +# define HG_LOG_RED "31m" +# define HG_LOG_GREEN "32m" +# define HG_LOG_YELLOW "33m" +# define HG_LOG_BLUE "34m" +# define HG_LOG_MAGENTA "35m" +# define HG_LOG_CYAN "36m" +#endif + +/*******************/ +/* Local Variables */ +/*******************/ + +static int (*hg_log_func_g)(FILE *stream, const char *format, ...) = fprintf; /* fprintf unless replaced through hg_log_set_func() */ +static FILE *hg_log_stream_debug_g = NULL; /* NULL: hg_log_write() picks its default stream */ +static FILE *hg_log_stream_warning_g = NULL; +static FILE *hg_log_stream_error_g = NULL; + +/*---------------------------------------------------------------------------*/ +void +hg_log_set_func(int (*log_func)(FILE *stream, const char *format, ...)) +{ + hg_log_func_g = log_func; /* NOTE(review): stored as-is; a NULL log_func would later be called by hg_log_write() */ +} + +/*---------------------------------------------------------------------------*/ +void +hg_log_set_stream_debug(FILE *stream) +{ + hg_log_stream_debug_g = stream; +} + +/*---------------------------------------------------------------------------*/ +void +hg_log_set_stream_warning(FILE *stream) +{ + hg_log_stream_warning_g = stream; +} + +/*---------------------------------------------------------------------------*/ +void +hg_log_set_stream_error(FILE *stream) +{ + hg_log_stream_error_g = stream; +} + +/*---------------------------------------------------------------------------*/ +void +hg_log_write(unsigned int log_type, const char *module, const char *file, + unsigned int line, const char *func, const char *format, ...) 
+{ + char buf[HG_LOG_MAX_BUF]; + FILE *stream = NULL; + const char *msg_type = NULL; +#ifdef HG_UTIL_HAS_LOG_COLOR + const char *color = ""; +#endif + va_list ap; + + switch (log_type) { + case HG_LOG_TYPE_DEBUG: +#ifdef HG_UTIL_HAS_LOG_COLOR + color = HG_LOG_BLUE; +#endif + stream = hg_log_stream_debug_g ? hg_log_stream_debug_g : stdout; + msg_type = "Debug"; + break; + case HG_LOG_TYPE_WARNING: +#ifdef HG_UTIL_HAS_LOG_COLOR + color = HG_LOG_MAGENTA; +#endif + stream = hg_log_stream_warning_g ? hg_log_stream_warning_g : stdout; + msg_type = "Warning"; + break; + case HG_LOG_TYPE_ERROR: +#ifdef HG_UTIL_HAS_LOG_COLOR + color = HG_LOG_RED; +#endif + stream = hg_log_stream_error_g ? hg_log_stream_error_g : stderr; + msg_type = "Error"; + break; + default: + return; + }; + + va_start(ap, format); + vsnprintf(buf, HG_LOG_MAX_BUF, format, ap); + va_end(ap); + +/* Print using logging function */ +#ifdef HG_UTIL_HAS_LOG_COLOR + hg_log_func_g(stream, + "# %s%s[%s -- %s%s%s%s%s -- %s:%d]%s\n" + "## %s%s%s()%s: %s\n", + HG_LOG_REG, color, module, HG_LOG_BOLD, color, msg_type, HG_LOG_REG, + color, file, line, HG_LOG_RESET, HG_LOG_REG, HG_LOG_YELLOW, func, + HG_LOG_RESET, buf); +#else + hg_log_func_g(stream, + "# %s -- %s -- %s:%d\n" + " # %s(): %s\n", + module, msg_type, file, line, func, buf); +#endif +} diff --git a/src/mercury/mercury_log.h b/src/mercury/mercury_log.h new file mode 100644 index 0000000..9ee48f2 --- /dev/null +++ b/src/mercury/mercury_log.h @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. 
+ */ + +#ifndef MERCURY_LOG_H +#define MERCURY_LOG_H + +#include "mercury_util_config.h" + +#include + +#define HG_LOG_TYPE_NONE 0 +#define HG_LOG_TYPE_DEBUG 0x01 +#define HG_LOG_TYPE_WARNING 0x02 +#define HG_LOG_TYPE_ERROR 0x04 + +/* For compatibility: provide __func__ on pre-C99 compilers and on Windows */ +#if defined(__STDC_VERSION__) && (__STDC_VERSION__ < 199901L) +# if defined(__GNUC__) && (__GNUC__ >= 2) +# define __func__ __FUNCTION__ +# else +# define __func__ "" +# endif +#elif defined(_WIN32) +# define __func__ __FUNCTION__ +#endif + +/* Convenience wrappers that pass the call site (__FILE__, __LINE__, __func__) to hg_log_write() */ +#define HG_LOG_WRITE_ERROR(HG_LOG_MODULE_NAME, ...) \ + do { \ + hg_log_write(HG_LOG_TYPE_ERROR, HG_LOG_MODULE_NAME, __FILE__, \ + __LINE__, __func__, __VA_ARGS__); \ + } while (0) +#define HG_LOG_WRITE_DEBUG(HG_LOG_MODULE_NAME, ...) \ + do { \ + hg_log_write(HG_LOG_TYPE_DEBUG, HG_LOG_MODULE_NAME, __FILE__, \ + __LINE__, __func__, __VA_ARGS__); \ + } while (0) +#define HG_LOG_WRITE_WARNING(HG_LOG_MODULE_NAME, ...) \ + do { \ + hg_log_write(HG_LOG_TYPE_WARNING, HG_LOG_MODULE_NAME, __FILE__, \ + __LINE__, __func__, __VA_ARGS__); \ + } while (0) + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Set the logging function (fprintf-compatible signature). + * + * \param log_func [IN] pointer to function + */ +HG_UTIL_PUBLIC void +hg_log_set_func(int (*log_func)(FILE *stream, const char *format, ...)); + +/** + * Set the stream for debug output (stdout is used while unset or NULL). + * + * \param stream [IN/OUT] pointer to stream + */ +HG_UTIL_PUBLIC void +hg_log_set_stream_debug(FILE *stream); + +/** + * Set the stream for warning output (stdout is used while unset or NULL). + * + * \param stream [IN/OUT] pointer to stream + */ +HG_UTIL_PUBLIC void +hg_log_set_stream_warning(FILE *stream); + +/** + * Set the stream for error output (stderr is used while unset or NULL). + * + * \param stream [IN/OUT] pointer to stream + */ +HG_UTIL_PUBLIC void +hg_log_set_stream_error(FILE *stream); + +/** + * Write log.
+ * + * \param log_type [IN] log type (HG_LOG_TYPE_DEBUG, etc) + * \param module [IN] module name + * \param file [IN] file name + * \param line [IN] line number + * \param func [IN] function name + * \param format [IN] string format + */ +HG_UTIL_PUBLIC void +hg_log_write(unsigned int log_type, const char *module, const char *file, + unsigned int line, const char *func, const char *format, ...); + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_LOG_H */ diff --git a/src/mercury/mercury_mem.c b/src/mercury/mercury_mem.c new file mode 100644 index 0000000..59a0acb --- /dev/null +++ b/src/mercury/mercury_mem.c @@ -0,0 +1,177 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#include "mercury_mem.h" + +#include "mercury_util_error.h" + +#ifdef _WIN32 +# include +#else +# include +# include /* For O_* constants */ +# include +# include +# include /* For mode constants */ +# include +# include +#endif +#include + +/*---------------------------------------------------------------------------*/ +/* Query the page size from the OS (GetSystemInfo() on Windows, sysconf() elsewhere) */ +long +hg_mem_get_page_size(void) +{ + long page_size; + +#ifdef _WIN32 + SYSTEM_INFO system_info; + GetSystemInfo(&system_info); + page_size = system_info.dwPageSize; +#else + page_size = sysconf(_SC_PAGE_SIZE); +#endif + + return page_size; +} + +/*---------------------------------------------------------------------------*/ +/* Allocate size bytes aligned on alignment using the platform allocator; NULL on failure */ +void * +hg_mem_aligned_alloc(size_t alignment, size_t size) +{ + void *mem_ptr = NULL; + +#ifdef _WIN32 + mem_ptr = _aligned_malloc(size, alignment); +#else +# ifdef _ISOC11_SOURCE + mem_ptr = aligned_alloc(alignment, size); +# else + /* posix_memalign() signals failure through its return value */ + int rc = posix_memalign(&mem_ptr, alignment, size); + if (rc != 0) + return NULL; +# endif +#endif + + 
return mem_ptr; +} + +/*---------------------------------------------------------------------------*/ +void +hg_mem_aligned_free(void *mem_ptr) +{ +#ifdef _WIN32 + _aligned_free(mem_ptr); +#else + free(mem_ptr); +#endif +} + +/*---------------------------------------------------------------------------*/ +void * +hg_mem_shm_map(const char *name, size_t size, hg_util_bool_t create) +{ + void *mem_ptr = NULL; +#ifdef _WIN32 + HANDLE fd = INVALID_HANDLE_VALUE; + LARGE_INTEGER large = {.QuadPart = size}; + DWORD access = FILE_MAP_READ | FILE_MAP_WRITE; + BOOL rc; + + if (create) { + fd = CreateFileMappingA(INVALID_HANDLE_VALUE, 0, PAGE_READWRITE, + large.HighPart, large.LowPart, name); + HG_UTIL_CHECK_ERROR_NORET(!fd, error, "CreateFileMappingA() failed"); + } else { + fd = OpenFileMappingA(access, FALSE, name); + HG_UTIL_CHECK_ERROR_NORET(!fd, error, "OpenFileMappingA() failed"); + } + + mem_ptr = MapViewOfFile(fd, access, 0, 0, size); + HG_UTIL_CHECK_ERROR_NORET(!mem_ptr, error, "MapViewOfFile() failed"); + + /* The handle can be closed without affecting the memory mapping */ + rc = CloseHandle(fd); + HG_UTIL_CHECK_ERROR_NORET(!rc, error, "CloseHandle() failed"); +#else + int fd = 0; + int flags = O_RDWR | (create ? 
O_CREAT : 0); + struct stat shm_stat; + int rc; + + fd = shm_open(name, flags, S_IRUSR | S_IWUSR); + HG_UTIL_CHECK_ERROR_NORET( + fd < 0, error, "shm_open() failed (%s)", strerror(errno)); + + rc = fstat(fd, &shm_stat); + HG_UTIL_CHECK_ERROR_NORET( + rc != 0, error, "fstat() failed (%s)", strerror(errno)); + + if (shm_stat.st_size == 0) { + rc = ftruncate(fd, (off_t) size); + HG_UTIL_CHECK_ERROR_NORET( + rc != 0, error, "ftruncate() failed (%s)", strerror(errno)); + } else + HG_UTIL_CHECK_ERROR_NORET( + shm_stat.st_size < (off_t) size, error, "shm file size too small"); + + mem_ptr = mmap(NULL, size, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0); + HG_UTIL_CHECK_ERROR_NORET( + mem_ptr == MAP_FAILED, error, "mmap() failed (%s)", strerror(errno)); + + /* The file descriptor can be closed without affecting the memory mapping */ + rc = close(fd); + HG_UTIL_CHECK_ERROR_NORET( + rc != 0, error, "close() failed (%s)", strerror(errno)); +#endif + + return mem_ptr; + +error: +#ifdef _WIN32 + if (fd) + CloseHandle(fd); +#else + if (fd > 0) + close(fd); +#endif + + return NULL; +} + +/*---------------------------------------------------------------------------*/ +int +hg_mem_shm_unmap(const char *name, void *mem_ptr, size_t size) +{ + int ret = HG_UTIL_SUCCESS; + +#ifdef _WIN32 + if (mem_ptr) { + BOOL rc = UnmapViewOfFile(mem_ptr); + HG_UTIL_CHECK_ERROR( + !rc, done, ret, HG_UTIL_FAIL, "UnmapViewOfFile() failed"); + } +#else + if (mem_ptr && mem_ptr != MAP_FAILED) { + int rc = munmap(mem_ptr, size); + HG_UTIL_CHECK_ERROR(rc != 0, done, ret, HG_UTIL_FAIL, + "munmap() failed (%s)", strerror(errno)); + } + + if (name) { + int rc = shm_unlink(name); + HG_UTIL_CHECK_ERROR(rc != 0, done, ret, HG_UTIL_FAIL, + "shm_unlink() failed (%s)", strerror(errno)); + } +#endif + +done: + return ret; +} diff --git a/src/mercury/mercury_mem.h b/src/mercury/mercury_mem.h new file mode 100644 index 0000000..c1ff53d --- /dev/null +++ b/src/mercury/mercury_mem.h @@ -0,0 +1,92 @@ +/* + * Copyright (C) 
2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#ifndef MERCURY_MEM_H +#define MERCURY_MEM_H + +#include "mercury_util_config.h" + +/*************************************/ +/* Public Type and Struct Definition */ +/*************************************/ + +/*****************/ +/* Public Macros */ +/*****************/ + +#define HG_MEM_CACHE_LINE_SIZE 64 +#define HG_MEM_PAGE_SIZE 4096 + +/*********************/ +/* Public Prototypes */ +/*********************/ + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Get system default page size. + * + * \return page size on success or negative on failure + */ +HG_UTIL_PUBLIC long +hg_mem_get_page_size(void); + +/** + * Allocate size bytes and return a pointer to the allocated memory. + * The memory address will be a multiple of alignment, which must be a power of + * two, and size should be a multiple of alignment. + * + * \param alignment [IN] alignment size + * \param size [IN] total requested size + * + * \return a pointer to the allocated memory, or NULL in case of failure + */ +HG_UTIL_PUBLIC void * +hg_mem_aligned_alloc(size_t alignment, size_t size); + +/** + * Free memory allocated from hg_mem_aligned_alloc(). + * + * \param mem_ptr [IN] pointer to allocated memory + */ +HG_UTIL_PUBLIC void +hg_mem_aligned_free(void *mem_ptr); + +/** + * Create/open a shared-memory mapped file of size \size with name \name.
+ * + * \param name [IN] name of mapped file + * \param size [IN] total requested size + * \param create [IN] create file if not existing + * + * \return a pointer to the mapped memory region, or NULL in case of failure + */ +HG_UTIL_PUBLIC void * +hg_mem_shm_map(const char *name, size_t size, hg_util_bool_t create); + +/** + * Unmap a previously mapped region and, on non-Windows platforms, unlink the + * shared-memory object \name when \name is non-NULL. + * + * \param name [IN] name of mapped file + * \param mem_ptr [IN] pointer to mapped memory region + * \param size [IN] size range of the mapped region + * + * \return non-negative on success, or negative in case of failure + */ +HG_UTIL_PUBLIC int +hg_mem_shm_unmap(const char *name, void *mem_ptr, size_t size); + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_MEM_H */ diff --git a/src/mercury/mercury_poll.c b/src/mercury/mercury_poll.c new file mode 100644 index 0000000..57f7e3e --- /dev/null +++ b/src/mercury/mercury_poll.c @@ -0,0 +1,531 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#include "mercury_poll.h" +#include "mercury_atomic.h" +#include "mercury_event.h" +#include "mercury_list.h" +#include "mercury_thread_spin.h" +#include "mercury_util_error.h" + +#include + +#if defined(_WIN32) +/* TODO */ +#else +# include +# include +# include +# if defined(HG_UTIL_HAS_SYSEPOLL_H) +# include +# elif defined(HG_UTIL_HAS_SYSEVENT_H) +# include +# include +# else +# include +# endif +#endif /* defined(_WIN32) */ + +/****************/ +/* Local Macros */ +/****************/ + +/* Upper bound on the number of events handled by one hg_poll_wait() call */ +#define HG_POLL_MAX_EVENTS 1024 + +#ifndef MIN +# define MIN(a, b) (((a) < (b)) ?
(a) : (b)) +#endif + +/************************************/ +/* Local Type and Struct Definition */ +/************************************/ + +struct hg_poll_data { +#if defined(HG_UTIL_HAS_SYSEPOLL_H) + int fd; +#elif defined(HG_UTIL_HAS_SYSEVENT_H) + struct kevent kev; +#else + struct pollfd pollfd; +#endif + hg_poll_cb_t poll_cb; + void *poll_arg; + HG_LIST_ENTRY(hg_poll_data) entry; +}; + +struct hg_poll_set { + int fd; + hg_atomic_int32_t nfds; + hg_poll_try_wait_cb_t try_wait_cb; + void *try_wait_arg; + HG_LIST_HEAD(hg_poll_data) poll_data_list; + hg_thread_spin_t poll_data_list_lock; +}; + +/********************/ +/* Local Prototypes */ +/********************/ + +/*******************/ +/* Local Variables */ +/*******************/ + +/*---------------------------------------------------------------------------*/ +hg_poll_set_t * +hg_poll_create(void) +{ + struct hg_poll_set *hg_poll_set = NULL; + + hg_poll_set = malloc(sizeof(struct hg_poll_set)); + HG_UTIL_CHECK_ERROR_NORET( + hg_poll_set == NULL, error, "malloc() failed (%s)"); +#if defined(_WIN32) + /* TODO */ +#else + HG_LIST_INIT(&hg_poll_set->poll_data_list); + hg_thread_spin_init(&hg_poll_set->poll_data_list_lock); + hg_atomic_init32(&hg_poll_set->nfds, 0); + hg_poll_set->try_wait_cb = NULL; + +# if defined(HG_UTIL_HAS_SYSEPOLL_H) + hg_poll_set->fd = epoll_create1(0); + HG_UTIL_CHECK_ERROR_NORET(hg_poll_set->fd == -1, error, + "epoll_create1() failed (%s)", strerror(errno)); +# elif defined(HG_UTIL_HAS_SYSEVENT_H) + hg_poll_set->fd = kqueue(); + HG_UTIL_CHECK_ERROR_NORET( + hg_poll_set->fd == -1, error, "kqueue() failed (%s)", strerror(errno)); +# else + hg_poll_set->fd = hg_event_create(); + HG_UTIL_CHECK_ERROR_NORET(hg_poll_set->fd == -1, error, + "hg_event_create() failed (%s)", strerror(errno)); +# endif +#endif /* defined(_WIN32) */ + + return hg_poll_set; + +error: + if (hg_poll_set) { + hg_thread_spin_destroy(&hg_poll_set->poll_data_list_lock); + free(hg_poll_set); + } + return NULL; +} + 
+/*---------------------------------------------------------------------------*/ +/* Release a poll set; a NULL set is accepted and reported as success */ +int +hg_poll_destroy(hg_poll_set_t *poll_set) +{ + int ret = HG_UTIL_SUCCESS; + int rc; + + if (!poll_set) + goto done; + +#if defined(_WIN32) + /* TODO */ +#else + /* Refuse to destroy a set that still has registered entries */ + HG_UTIL_CHECK_ERROR(hg_atomic_get32(&poll_set->nfds), done, ret, + HG_UTIL_FAIL, "Poll set non empty"); + +# if defined(HG_UTIL_HAS_SYSEPOLL_H) || defined(HG_UTIL_HAS_SYSEVENT_H) + /* Close poll descriptor; on failure the poll set itself is not freed */ + rc = close(poll_set->fd); + HG_UTIL_CHECK_ERROR(rc == -1, done, ret, HG_UTIL_FAIL, + "close() failed (%s)", strerror(errno)); +# else + rc = hg_event_destroy(poll_set->fd); + HG_UTIL_CHECK_ERROR(rc == HG_UTIL_FAIL, done, ret, HG_UTIL_FAIL, + "hg_event_destroy() failed (%s)", strerror(errno)); +# endif + + hg_thread_spin_destroy(&poll_set->poll_data_list_lock); +#endif /* defined(_WIN32) */ + + free(poll_set); + +done: + return ret; +} + +/*---------------------------------------------------------------------------*/ +/* Return the backend descriptor of the set, or -1 when there is none to return */ +int +hg_poll_get_fd(hg_poll_set_t *poll_set) +{ + int fd = -1; + + HG_UTIL_CHECK_ERROR_NORET(!poll_set, done, "NULL poll set"); + +#if defined(_WIN32) + /* TODO */ +#else + fd = poll_set->fd; +#endif + +done: + return fd; +} + +/*---------------------------------------------------------------------------*/ +/* Store the callback (and its argument) that hg_poll_wait() consults before blocking */ +int +hg_poll_set_try_wait( + hg_poll_set_t *poll_set, hg_poll_try_wait_cb_t try_wait_cb, void *arg) +{ + int ret = HG_UTIL_SUCCESS; + + HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set"); + + poll_set->try_wait_cb = try_wait_cb; + poll_set->try_wait_arg = arg; + +done: + return ret; +} + +/*---------------------------------------------------------------------------*/ +int +hg_poll_add(hg_poll_set_t *poll_set, int fd, unsigned int flags, + hg_poll_cb_t poll_cb, void *poll_arg) +{ + struct hg_poll_data *hg_poll_data = NULL; + int ret = HG_UTIL_SUCCESS; + + HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set"); + + /* Allocate poll data that can hold user data
and callback */ + hg_poll_data = malloc(sizeof(struct hg_poll_data)); + HG_UTIL_CHECK_ERROR( + !hg_poll_data, done, ret, HG_UTIL_FAIL, "malloc() failed (%s)"); + memset(hg_poll_data, 0, sizeof(struct hg_poll_data)); + hg_poll_data->poll_cb = poll_cb; + hg_poll_data->poll_arg = poll_arg; + + if (fd > 0) { +#if defined(_WIN32) + /* TODO */ +#elif defined(HG_UTIL_HAS_SYSEPOLL_H) + struct epoll_event ev; + uint32_t poll_flags; + int rc; + + /* Translate flags */ + switch (flags) { + case HG_POLLIN: + poll_flags = EPOLLIN; + break; + case HG_POLLOUT: + poll_flags = EPOLLOUT; + break; + default: + HG_UTIL_GOTO_ERROR(error, ret, HG_UTIL_FAIL, "Invalid flag"); + } + + hg_poll_data->fd = fd; + ev.events = poll_flags; + ev.data.ptr = hg_poll_data; + + rc = epoll_ctl(poll_set->fd, EPOLL_CTL_ADD, fd, &ev); + HG_UTIL_CHECK_ERROR(rc != 0, error, ret, HG_UTIL_FAIL, + "epoll_ctl() failed (%s)", strerror(errno)); +#elif defined(HG_UTIL_HAS_SYSEVENT_H) + struct timespec timeout = {0, 0}; + int16_t poll_flags; + int rc; + + /* Translate flags */ + switch (flags) { + case HG_POLLIN: + poll_flags = EVFILT_READ; + break; + case HG_POLLOUT: + poll_flags = EVFILT_WRITE; + break; + default: + HG_UTIL_GOTO_ERROR(error, ret, HG_UTIL_FAIL, "Invalid flag"); + } + + EV_SET(&hg_poll_data->kev, (uintptr_t) fd, poll_flags, EV_ADD, 0, 0, + hg_poll_data); + + rc = kevent(poll_set->fd, &hg_poll_data->kev, 1, NULL, 0, &timeout); + HG_UTIL_CHECK_ERROR(rc == -1, error, ret, HG_UTIL_FAIL, + "kevent() failed (%s)", strerror(errno)); +#else + short int poll_flags; + + /* Translate flags */ + switch (flags) { + case HG_POLLIN: + poll_flags = POLLIN; + break; + case HG_POLLOUT: + poll_flags = POLLOUT; + break; + default: + HG_UTIL_GOTO_ERROR(error, ret, HG_UTIL_FAIL, "Invalid flag"); + } + + hg_poll_data->pollfd.fd = fd; + hg_poll_data->pollfd.events = poll_flags; + hg_poll_data->pollfd.revents = 0; +#endif /* defined(_WIN32) */ + } + hg_atomic_incr32(&poll_set->nfds); + + 
hg_thread_spin_lock(&poll_set->poll_data_list_lock); + HG_LIST_INSERT_HEAD(&poll_set->poll_data_list, hg_poll_data, entry); + hg_thread_spin_unlock(&poll_set->poll_data_list_lock); + +done: + return ret; + +error: + free(hg_poll_data); + + return HG_UTIL_FAIL; +} + +/*---------------------------------------------------------------------------*/ +int +hg_poll_remove(hg_poll_set_t *poll_set, int fd) +{ + struct hg_poll_data *hg_poll_data; + hg_util_bool_t found = HG_UTIL_FALSE; + int ret = HG_UTIL_SUCCESS; + + HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set"); + + hg_thread_spin_lock(&poll_set->poll_data_list_lock); + HG_LIST_FOREACH (hg_poll_data, &poll_set->poll_data_list, entry) { +#if defined(_WIN32) + /* TODO */ +#elif defined(HG_UTIL_HAS_SYSEPOLL_H) + if (hg_poll_data->fd == fd) { + HG_LIST_REMOVE(hg_poll_data, entry); + + if (fd > 0) { + int rc = epoll_ctl(poll_set->fd, EPOLL_CTL_DEL, fd, NULL); + HG_UTIL_CHECK_ERROR(rc != 0, error, ret, HG_UTIL_FAIL, + "epoll_ctl() failed (%s)", strerror(errno)); + } + free(hg_poll_data); + found = HG_UTIL_TRUE; + break; + } +#elif defined(HG_UTIL_HAS_SYSEVENT_H) + /* Events which are attached to file descriptors are automatically + * deleted on the last close of the descriptor. 
*/ + if ((int) hg_poll_data->kev.ident == fd) { + HG_LIST_REMOVE(hg_poll_data, entry); + + if (fd > 0) { + struct timespec timeout = {0, 0}; + int rc; + + EV_SET(&hg_poll_data->kev, (uintptr_t) fd, EVFILT_READ, + EV_DELETE, 0, 0, NULL); + rc = kevent( + poll_set->fd, &hg_poll_data->kev, 1, NULL, 0, &timeout); + HG_UTIL_CHECK_ERROR(rc == -1, error, ret, HG_UTIL_FAIL, + "kevent() failed (%s)", strerror(errno)); + } + free(hg_poll_data); + found = HG_UTIL_TRUE; + break; + } +#else + if (hg_poll_data->pollfd.fd == fd) { + HG_LIST_REMOVE(hg_poll_data, entry); + free(hg_poll_data); + found = HG_UTIL_TRUE; + break; + } +#endif + } + hg_thread_spin_unlock(&poll_set->poll_data_list_lock); + + HG_UTIL_CHECK_ERROR( + !found, done, ret, HG_UTIL_FAIL, "Could not find fd in poll_set"); + hg_atomic_decr32(&poll_set->nfds); + +done: + return ret; + +#if defined(HG_UTIL_HAS_SYSEPOLL_H) || defined(HG_UTIL_HAS_SYSEVENT_H) +error: + hg_thread_spin_unlock(&poll_set->poll_data_list_lock); + + return ret; +#endif +} + +/*---------------------------------------------------------------------------*/ +int +hg_poll_wait(hg_poll_set_t *poll_set, unsigned int timeout, + unsigned int max_events, struct hg_poll_event *events, + unsigned int *actual_events) +{ + int max_poll_events = (int) MIN(max_events, HG_POLL_MAX_EVENTS); + int nfds = 0, i; + int ret = HG_UTIL_SUCCESS; + + HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set"); + + if (timeout && (!poll_set->try_wait_cb || + (poll_set->try_wait_cb && + poll_set->try_wait_cb(poll_set->try_wait_arg)))) { +#if defined(_WIN32) + +#elif defined(HG_UTIL_HAS_SYSEPOLL_H) + struct epoll_event poll_events[HG_POLL_MAX_EVENTS]; + + nfds = epoll_wait( + poll_set->fd, poll_events, max_poll_events, (int) timeout); + HG_UTIL_CHECK_ERROR(nfds == -1 && errno != EINTR, done, ret, + HG_UTIL_FAIL, "epoll_wait() failed (%s)", strerror(errno)); + + for (i = 0; i < nfds; ++i) { + struct hg_poll_data *hg_poll_data = + (struct hg_poll_data *) 
poll_events[i].data.ptr; + int error = 0, rc; + + HG_UTIL_CHECK_ERROR(hg_poll_data == NULL, done, ret, HG_UTIL_FAIL, + "NULL poll data"); + + /* Don't change the if/else order */ + if (poll_events[i].events & EPOLLERR) + error = EPOLLERR; + else if (poll_events[i].events & EPOLLHUP) + error = EPOLLHUP; + else if (poll_events[i].events & EPOLLRDHUP) + error = EPOLLRDHUP; + + HG_UTIL_CHECK_ERROR(!(poll_events[i].events & (EPOLLIN | EPOLLOUT)), + done, ret, HG_UTIL_FAIL, "Unsupported events"); + + if (!hg_poll_data->poll_cb) + continue; + + rc = hg_poll_data->poll_cb( + hg_poll_data->poll_arg, error, &events[i]); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL, + "poll cb failed"); + } +#elif defined(HG_UTIL_HAS_SYSEVENT_H) + struct kevent poll_events[HG_POLL_MAX_EVENTS]; + struct timespec timeout_spec; + ldiv_t ld; + + /* Get sec / nsec */ + ld = ldiv(timeout, 1000L); + timeout_spec.tv_sec = ld.quot; + timeout_spec.tv_nsec = ld.rem * 1000000L; + + nfds = kevent( + poll_set->fd, NULL, 0, poll_events, max_events, &timeout_spec); + HG_UTIL_CHECK_ERROR(nfds == -1 && errno != EINTR, done, ret, + HG_UTIL_FAIL, "kevent() failed (%s)", strerror(errno)); + + for (i = 0; i < nfds; ++i) { + struct hg_poll_data *hg_poll_data = + (struct hg_poll_data *) poll_events[i].udata; + int rc; + + HG_UTIL_CHECK_ERROR(hg_poll_data == NULL, done, ret, HG_UTIL_FAIL, + "NULL poll data"); + + if (!hg_poll_data->poll_cb) + continue; + + rc = hg_poll_data->poll_cb(hg_poll_data->poll_arg, 0, &events[i]); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL, + "poll cb failed"); + } +#else + struct pollfd poll_events[HG_POLL_MAX_EVENTS] = {0}; + struct hg_poll_data *poll_data_events[HG_POLL_MAX_EVENTS] = {NULL}; + struct hg_poll_data *hg_poll_data = NULL; + int nevents = 0; + + /* Reset revents */ + hg_thread_spin_lock(&poll_set->poll_data_list_lock); + for (hg_poll_data = HG_LIST_FIRST(&poll_set->poll_data_list); + hg_poll_data && (nevents < max_poll_events); 
+ hg_poll_data = HG_LIST_NEXT(hg_poll_data, entry), nevents++) { + poll_events[nevents] = hg_poll_data->pollfd; + poll_data_events[nevents] = hg_poll_data; + } + hg_thread_spin_unlock(&poll_set->poll_data_list_lock); + + nfds = poll(poll_events, nevents, (int) timeout); + HG_UTIL_CHECK_ERROR(nfds == -1 && errno != EINTR, done, ret, + HG_UTIL_FAIL, "poll() failed (%s)", strerror(errno)); + + /* An event on one of the fds has occurred. */ + for (i = 0; i < nfds; ++i) { + int rc; + + if (!(poll_events[i].revents & poll_events[i].events)) + continue; + + /* TODO check POLLHUP | POLLERR | POLLNVAL */ + if (!poll_data_events[i]->poll_cb) + continue; + + rc = poll_data_events[i]->poll_cb( + poll_data_events[i]->poll_arg, 0, &events[i]); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL, + "poll cb failed"); + } + + if (nfds) { + /* TODO should figure where to call hg_event_get() */ + int rc = hg_event_set(poll_set->fd); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL, + "hg_event_set() failed (%s)", strerror(errno)); + } +#endif + } else { +#ifdef _WIN32 + +#else + struct hg_poll_data *poll_data_events[HG_POLL_MAX_EVENTS] = {NULL}; + struct hg_poll_data *hg_poll_data; + int nevents = 0; + + /* Reset revents */ + hg_thread_spin_lock(&poll_set->poll_data_list_lock); + for (hg_poll_data = HG_LIST_FIRST(&poll_set->poll_data_list); + hg_poll_data && (nevents < max_poll_events); + hg_poll_data = HG_LIST_NEXT(hg_poll_data, entry), nevents++) + poll_data_events[nevents] = hg_poll_data; + hg_thread_spin_unlock(&poll_set->poll_data_list_lock); + + nfds = nevents; + for (i = 0; i < nfds; ++i) { + int rc; + + if (!poll_data_events[i]->poll_cb) + continue; + + rc = poll_data_events[i]->poll_cb( + poll_data_events[i]->poll_arg, 0, &events[i]); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL, + "poll cb failed"); + } +#endif + } + + if (actual_events) + *actual_events = (unsigned int) nfds; + +done: + return ret; +} diff --git 
a/src/mercury/mercury_poll.h b/src/mercury/mercury_poll.h new file mode 100644 index 0000000..8922f37 --- /dev/null +++ b/src/mercury/mercury_poll.h @@ -0,0 +1,164 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#ifndef MERCURY_POLL_H +#define MERCURY_POLL_H + +#include "mercury_util_config.h" + +/** + * Purpose: define an interface that either polls or allows busy wait + * without entering system calls. + */ + +/*************************************/ +/* Public Type and Struct Definition */ +/*************************************/ + +typedef struct hg_poll_set hg_poll_set_t; + +struct hg_poll_event { + hg_util_bool_t progressed; /* Indicates progress */ + void *ptr; /* Pointer to user data */ +}; + +/** + * Callback that can be used to signal when it is safe to block on the + * poll set or if blocking could hang the application. + * + * \param arg [IN] function argument + * + * \return HG_UTIL_TRUE if it is safe to block or HG_UTIL_FALSE otherwise + */ +typedef hg_util_bool_t (*hg_poll_try_wait_cb_t)(void *arg); + +/** + * Polling callback, arg can be used to pass user arguments, event can be used + * to return user arguments back to hg_poll_wait. + * + * \param arg [IN] pointer to user data + * \param error [IN] any error event has occurred + * \param event [OUT] event data output + * + * \return HG_UTIL_SUCCESS on success; hg_poll_wait() treats any other value as + * a failure + */ +typedef int (*hg_poll_cb_t)(void *arg, int error, struct hg_poll_event *event); + +/*****************/ +/* Public Macros */ +/*****************/ + +/** + * Polling events. + */ +#define HG_POLLIN 0x001 /* Ready to read. */ +#define HG_POLLOUT 0x004 /* Ready to write.
*/ + +/*********************/ +/* Public Prototypes */ +/*********************/ + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Create a new poll set. + * + * \return Pointer to poll set or NULL in case of failure + */ +HG_UTIL_PUBLIC hg_poll_set_t * +hg_poll_create(void); + +/** + * Destroy a poll set. A poll set that still holds registered entries is not + * destroyed and failure is returned. + * + * \param poll_set [IN] pointer to poll set + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_poll_destroy(hg_poll_set_t *poll_set); + +/** + * Get a file descriptor from an existing poll set. + * + * \param poll_set [IN] pointer to poll set + * + * \return File descriptor of the poll set, or -1 on failure + */ +HG_UTIL_PUBLIC int +hg_poll_get_fd(hg_poll_set_t *poll_set); + +/** + * Set a callback that can be used to signal when it is safe to block on the + * poll set or if blocking could hang the application, in which case behavior + * is the same as passing a timeout of 0. + * + * \param poll_set [IN] pointer to poll set + * \param try_wait_cb [IN] function pointer + * \param try_wait_arg [IN] function pointer argument + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_poll_set_try_wait(hg_poll_set_t *poll_set, hg_poll_try_wait_cb_t try_wait_cb, + void *try_wait_arg); + +/** + * Add file descriptor to poll set. + * + * \param poll_set [IN] pointer to poll set + * \param fd [IN] file descriptor + * \param flags [IN] polling flags (exactly one of HG_POLLIN, + * HG_POLLOUT) + * \param poll_cb [IN] function pointer + * \param poll_cb_arg [IN] function pointer argument + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_poll_add(hg_poll_set_t *poll_set, int fd, unsigned int flags, + hg_poll_cb_t poll_cb, void *poll_cb_arg); + +/** + * Remove file descriptor from poll set.
+ * + * \param poll_set [IN] pointer to poll set + * \param fd [IN] file descriptor + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_poll_remove(hg_poll_set_t *poll_set, int fd); + +/** + * Wait on a poll set for timeout ms. Polling callbacks are handed an entry of + * the events array in which they can report progress and return user data. + * If timeout is 0 (or the try-wait callback reports that blocking is not + * safe), the registered polling callbacks are invoked without blocking. + * If timeout is non 0, the system dependent polling function + * call is entered and progress is performed on the list of file descriptors + * for which an event has occurred. + * + * \param poll_set [IN] pointer to poll set + * \param timeout [IN] timeout (in milliseconds) + * \param max_events [IN] number of entries available in events + * \param events [OUT] array of entries passed to the callbacks + * \param actual_events [OUT] number of entries of events used by this + * call (may be NULL) + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_poll_wait(hg_poll_set_t *poll_set, unsigned int timeout, + unsigned int max_events, struct hg_poll_event events[], + unsigned int *actual_events); + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_POLL_H */ diff --git a/src/mercury/mercury_queue.h b/src/mercury/mercury_queue.h new file mode 100644 index 0000000..2133383 --- /dev/null +++ b/src/mercury/mercury_queue.h @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +/* Code below is derived from sys/queue.h which follows the below notice: + * + * Copyright (c) 1991, 1993 + * The Regents of the University of California. All rights reserved.
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ * + * @(#)queue.h 8.5 (Berkeley) 8/20/94 + */ + +#ifndef MERCURY_QUEUE_H +#define MERCURY_QUEUE_H + +#define HG_QUEUE_HEAD_INITIALIZER(name) \ + { \ + NULL, &(name).head \ + } + +#define HG_QUEUE_HEAD_INIT(struct_head_name, var_name) \ + struct struct_head_name var_name = HG_QUEUE_HEAD_INITIALIZER(var_name) + +#define HG_QUEUE_HEAD_DECL(struct_head_name, struct_entry_name) \ + struct struct_head_name { \ + struct struct_entry_name *head; \ + struct struct_entry_name **tail; \ + } + +#define HG_QUEUE_HEAD(struct_entry_name) \ + struct { \ + struct struct_entry_name *head; \ + struct struct_entry_name **tail; \ + } + +#define HG_QUEUE_ENTRY(struct_entry_name) \ + struct { \ + struct struct_entry_name *next; \ + } + +#define HG_QUEUE_INIT(head_ptr) \ + do { \ + (head_ptr)->head = NULL; \ + (head_ptr)->tail = &(head_ptr)->head; \ + } while (/*CONSTCOND*/ 0) + +#define HG_QUEUE_IS_EMPTY(head_ptr) ((head_ptr)->head == NULL) + +#define HG_QUEUE_FIRST(head_ptr) ((head_ptr)->head) + +#define HG_QUEUE_NEXT(entry_ptr, entry_field_name) \ + ((entry_ptr)->entry_field_name.next) + +#define HG_QUEUE_PUSH_TAIL(head_ptr, entry_ptr, entry_field_name) \ + do { \ + (entry_ptr)->entry_field_name.next = NULL; \ + *(head_ptr)->tail = (entry_ptr); \ + (head_ptr)->tail = &(entry_ptr)->entry_field_name.next; \ + } while (/*CONSTCOND*/ 0) + +/* TODO would be nice to not have any condition */ +#define HG_QUEUE_POP_HEAD(head_ptr, entry_field_name) \ + do { \ + if ((head_ptr)->head && \ + ((head_ptr)->head = (head_ptr)->head->entry_field_name.next) == \ + NULL) \ + (head_ptr)->tail = &(head_ptr)->head; \ + } while (/*CONSTCOND*/ 0) + +#define HG_QUEUE_FOREACH(var, head_ptr, entry_field_name) \ + for ((var) = ((head_ptr)->head); (var); \ + (var) = ((var)->entry_field_name.next)) + +/** + * Avoid using those for performance reasons or use mercury_list.h instead + */ + +#define HG_QUEUE_REMOVE(head_ptr, entry_ptr, type, entry_field_name) \ + do { \ + if ((head_ptr)->head == (entry_ptr)) { \ + 
HG_QUEUE_POP_HEAD((head_ptr), entry_field_name); \ + } else { \ + struct type *curelm = (head_ptr)->head; \ + while (curelm->entry_field_name.next != (entry_ptr)) \ + curelm = curelm->entry_field_name.next; \ + if ((curelm->entry_field_name.next = \ + curelm->entry_field_name.next->entry_field_name \ + .next) == NULL) \ + (head_ptr)->tail = &(curelm)->entry_field_name.next; \ + } \ + } while (/*CONSTCOND*/ 0) + +#endif /* MERCURY_QUEUE_H */ diff --git a/src/mercury/mercury_request.c b/src/mercury/mercury_request.c new file mode 100644 index 0000000..ae4fb2a --- /dev/null +++ b/src/mercury/mercury_request.c @@ -0,0 +1,224 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#include "mercury_request.h" +#include "mercury_thread_condition.h" +#include "mercury_thread_mutex.h" +#include "mercury_time.h" +#include "mercury_util_error.h" + +#include + +/****************/ +/* Local Macros */ +/****************/ + +/************************************/ +/* Local Type and Struct Definition */ +/************************************/ + +struct hg_request_class { + hg_request_progress_func_t progress_func; + hg_request_trigger_func_t trigger_func; + void *arg; + hg_util_bool_t progressing; + hg_thread_mutex_t progress_mutex; + hg_thread_cond_t progress_cond; +}; + +/********************/ +/* Local Prototypes */ +/********************/ + +/*******************/ +/* Local Variables */ +/*******************/ + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_util_bool_t +hg_request_check(hg_request_t *request) +{ + int trigger_ret; + unsigned int trigger_flag = 0; + hg_util_bool_t ret = HG_UTIL_FALSE; + + do 
{ + trigger_ret = request->request_class->trigger_func( + 0, &trigger_flag, request->request_class->arg); + } while ((trigger_ret == HG_UTIL_SUCCESS) && trigger_flag); + + if (hg_atomic_cas32(&request->completed, HG_UTIL_TRUE, HG_UTIL_FALSE)) + ret = HG_UTIL_TRUE; + + return ret; +} + +/*---------------------------------------------------------------------------*/ +hg_request_class_t * +hg_request_init(hg_request_progress_func_t progress_func, + hg_request_trigger_func_t trigger_func, void *arg) +{ + struct hg_request_class *hg_request_class = NULL; + + hg_request_class = + (struct hg_request_class *) malloc(sizeof(struct hg_request_class)); + HG_UTIL_CHECK_ERROR_NORET( + hg_request_class == NULL, done, "Could not allocate hg_request_class"); + + hg_request_class->progress_func = progress_func; + hg_request_class->trigger_func = trigger_func; + hg_request_class->arg = arg; + hg_request_class->progressing = HG_UTIL_FALSE; + hg_thread_mutex_init(&hg_request_class->progress_mutex); + hg_thread_cond_init(&hg_request_class->progress_cond); + +done: + return hg_request_class; +} + +/*---------------------------------------------------------------------------*/ +int +hg_request_finalize(hg_request_class_t *request_class, void **arg) +{ + if (!request_class) + goto done; + + if (arg) + *arg = request_class->arg; + hg_thread_mutex_destroy(&request_class->progress_mutex); + hg_thread_cond_destroy(&request_class->progress_cond); + free(request_class); + +done: + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +hg_request_t * +hg_request_create(hg_request_class_t *request_class) +{ + struct hg_request *hg_request = NULL; + + hg_request = (struct hg_request *) malloc(sizeof(struct hg_request)); + HG_UTIL_CHECK_ERROR_NORET( + hg_request == NULL, done, "Could not allocate hg_request"); + + hg_request->data = NULL; + hg_atomic_set32(&hg_request->completed, HG_UTIL_FALSE); + hg_request->request_class = request_class; + 
+done: + return hg_request; +} + +/*---------------------------------------------------------------------------*/ +int +hg_request_destroy(hg_request_t *request) +{ + int ret = HG_UTIL_SUCCESS; + + free(request); + + return ret; +} + +/*---------------------------------------------------------------------------*/ +/* + * lock(progress_mutex) + * while (!completed) { + * check_request + * if (completed) { + * unlock(progress_mutex); + * return; + * } + * if (in_progress) { + * wait_cond(progress_cond); + * continue; + * } + * in_progress = true; + * unlock(progress_mutex); + * trigger; + * progress; + * lock(progress); + * in_progress = false; + * signal(progress_cond); + * } + * unlock(progress_mutex); + */ + +/*---------------------------------------------------------------------------*/ +int +hg_request_wait(hg_request_t *request, unsigned int timeout, unsigned int *flag) +{ + double remaining = + timeout / 1000.0; /* Convert timeout in ms into seconds */ + hg_util_bool_t completed = HG_UTIL_FALSE; + int ret = HG_UTIL_SUCCESS; + + hg_thread_mutex_lock(&request->request_class->progress_mutex); + + do { + hg_time_t t3, t4; + + completed = hg_request_check(request); + if (completed) + break; + + if (request->request_class->progressing) { + hg_time_t t1, t2; + + if (remaining <= 0) { + /* Timeout occurred so leave */ + break; + } + + hg_time_get_current(&t1); + if (hg_thread_cond_timedwait(&request->request_class->progress_cond, + &request->request_class->progress_mutex, + (unsigned int) (remaining * 1000.0)) != HG_UTIL_SUCCESS) { + /* Timeout occurred so leave */ + break; + } + hg_time_get_current(&t2); + remaining -= hg_time_to_double(hg_time_subtract(t2, t1)); + if (remaining < 0) + break; + /* Continue as request may have completed in the meantime */ + continue; + } + + request->request_class->progressing = HG_UTIL_TRUE; + + hg_thread_mutex_unlock(&request->request_class->progress_mutex); + + if (timeout) + hg_time_get_current(&t3); + + 
request->request_class->progress_func( + (unsigned int) (remaining * 1000.0), request->request_class->arg); + + if (timeout) { + hg_time_get_current(&t4); + remaining -= hg_time_to_double(hg_time_subtract(t4, t3)); + } + + hg_thread_mutex_lock(&request->request_class->progress_mutex); + request->request_class->progressing = HG_UTIL_FALSE; + hg_thread_cond_broadcast(&request->request_class->progress_cond); + + } while (!completed && (remaining > 0)); + + hg_thread_mutex_unlock(&request->request_class->progress_mutex); + + if (flag) + *flag = completed; + + return ret; +} diff --git a/src/mercury/mercury_request.h b/src/mercury/mercury_request.h new file mode 100644 index 0000000..e84bea3 --- /dev/null +++ b/src/mercury/mercury_request.h @@ -0,0 +1,242 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#ifndef MERCURY_REQUEST_H +#define MERCURY_REQUEST_H + +#include "mercury_util_config.h" + +#include "mercury_atomic.h" + +/** + * Purpose: define a request emulation library on top of the callback model + * that uses progress/trigger functions. Note that this library can not be + * safely used within RPCs in most cases - calling hg_request_wait causes + * deadlock when the caller function was triggered by HG_Trigger + * (or HG_Bulk_trigger). + */ + +typedef struct hg_request_class hg_request_class_t; /* Opaque request class */ +typedef struct hg_request hg_request_t; /* Opaque request object */ + +struct hg_request { + void *data; + hg_atomic_int32_t completed; + hg_request_class_t *request_class; +}; + +/** + * Progress callback, arg can be used to pass extra parameters required by + * underlying API. 
+ * + * \param timeout [IN] timeout (in milliseconds) + * \param arg [IN] pointer to data passed to callback + * + * \return HG_UTIL_SUCCESS if any completion has occurred / error code otherwise + */ +typedef int (*hg_request_progress_func_t)(unsigned int timeout, void *arg); + +/** + * Trigger callback, arg can be used to pass extra parameters required by + * underlying API. + * + * \param timeout [IN] timeout (in milliseconds) + * \param flag [OUT] 1 if callback has been triggered, 0 otherwise + * \param arg [IN] pointer to data passed to callback + * + * \return HG_UTIL_SUCCESS or corresponding error code + */ +typedef int (*hg_request_trigger_func_t)( + unsigned int timeout, unsigned int *flag, void *arg); + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Initialize the request class with the specific progress/trigger functions + * that will be called on hg_request_wait(). + * arg can be used to pass extra parameters required by underlying API. + * + * \param progress [IN] progress function + * \param trigger [IN] trigger function + * \param arg [IN] pointer to data passed to callback + * + * \return Pointer to request class or NULL in case of failure + */ +HG_UTIL_PUBLIC hg_request_class_t * +hg_request_init(hg_request_progress_func_t progress, + hg_request_trigger_func_t trigger, void *arg); + +/** + * Finalize the request class. User args that were passed through + * hg_request_init() can be retrieved through the \a arg parameter. + * + * \param request_class [IN] pointer to request class + * \param arg [IN/OUT] pointer to init args + */ +HG_UTIL_PUBLIC int +hg_request_finalize(hg_request_class_t *request_class, void **arg); + +/** + * Create a new request from a specified request class. The progress function + * explicitly makes progress and may insert the completed operation into a + * completion queue. The operation gets triggered after a call to the trigger + * function. 
+ *
+ * \param request_class [IN]    pointer to request class
+ *
+ * \return Pointer to request or NULL in case of failure
+ */
+HG_UTIL_PUBLIC hg_request_t *
+hg_request_create(hg_request_class_t *request_class);
+
+/**
+ * Destroy the request, freeing the resources.
+ *
+ * \param request [IN/OUT]      pointer to request
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_request_destroy(hg_request_t *request);
+
+/**
+ * Reset an existing request so that it can be safely re-used.
+ *
+ * \param request [IN/OUT]      pointer to request
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_request_reset(hg_request_t *request);
+
+/**
+ * Mark the request as completed. (most likely called by a callback triggered
+ * after a call to trigger)
+ *
+ * \param request [IN/OUT]      pointer to request
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_request_complete(hg_request_t *request);
+
+/**
+ * Wait timeout ms for the specified request to complete.
+ *
+ * \param request [IN/OUT]      pointer to request
+ * \param timeout [IN]          timeout (in milliseconds)
+ * \param flag [OUT]            1 if request has completed, 0 otherwise
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_request_wait(
+    hg_request_t *request, unsigned int timeout, unsigned int *flag);
+
+/**
+ * Wait timeout ms for all the specified requests to complete.
+ *
+ * \param count [IN]            number of requests
+ * \param request [IN/OUT]      array of requests
+ * \param timeout [IN]          timeout (in milliseconds)
+ * \param flag [OUT]            1 if all requests have completed, 0 otherwise
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_request_waitall(int count, hg_request_t *request[], unsigned int timeout,
+    unsigned int *flag);
+
+/**
+ * Attach user data to a specified request.
+ * + * \param request [IN/OUT] pointer to request + * \param data [IN] pointer to data + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_request_set_data(hg_request_t *request, void *data); + +/** + * Get user data from a specified request. + * + * \param request [IN/OUT] pointer to request + * + * \return Pointer to data or NULL if nothing was attached by user + */ +static HG_UTIL_INLINE void * +hg_request_get_data(hg_request_t *request); + +/** + * Cancel the request. + * + * \param request [IN] request object + * + * \return Non-negative on success or negative on failure + * +HG_UTIL_PUBLIC int +hg_request_cancel(hg_request_t *request); + */ + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_request_reset(hg_request_t *request) +{ + hg_atomic_set32(&request->completed, HG_UTIL_FALSE); + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_request_complete(hg_request_t *request) +{ + hg_atomic_incr32(&request->completed); + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_request_waitall(int count, hg_request_t *request[], unsigned int timeout, + unsigned int *flag) +{ + int i; + + for (i = 0; i < count; i++) + hg_request_wait(request[i], timeout, flag); + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_request_set_data(hg_request_t *request, void *data) +{ + request->data = data; + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE void * +hg_request_get_data(hg_request_t *request) +{ + return request->data; +} + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_REQUEST_H */ diff --git 
a/src/mercury/mercury_thread.c b/src/mercury/mercury_thread.c new file mode 100644 index 0000000..1c0e976 --- /dev/null +++ b/src/mercury/mercury_thread.c @@ -0,0 +1,162 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#include "mercury_thread.h" + +/*---------------------------------------------------------------------------*/ +void +hg_thread_init(hg_thread_t *thread) +{ +#ifdef _WIN32 + *thread = NULL; +#else + *thread = 0; +#endif +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_create(hg_thread_t *thread, hg_thread_func_t f, void *data) +{ +#ifdef _WIN32 + *thread = CreateThread(NULL, 0, f, data, 0, NULL); + if (*thread == NULL) + return HG_UTIL_FAIL; +#else + if (pthread_create(thread, NULL, f, data)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +void +hg_thread_exit(hg_thread_ret_t ret) +{ +#ifdef _WIN32 + ExitThread(ret); +#else + pthread_exit(ret); +#endif +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_join(hg_thread_t thread) +{ +#ifdef _WIN32 + WaitForSingleObject(thread, INFINITE); + CloseHandle(thread); +#else + if (pthread_join(thread, NULL)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_cancel(hg_thread_t thread) +{ +#ifdef _WIN32 + WaitForSingleObject(thread, 0); + CloseHandle(thread); +#else + if (pthread_cancel(thread)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + 
+/*---------------------------------------------------------------------------*/ +int +hg_thread_yield(void) +{ +#ifdef _WIN32 + SwitchToThread(); +#elif defined(__APPLE__) + pthread_yield_np(); +#else + pthread_yield(); +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_key_create(hg_thread_key_t *key) +{ + if (!key) + return HG_UTIL_FAIL; + +#ifdef _WIN32 + if ((*key = TlsAlloc()) == TLS_OUT_OF_INDEXES) + return HG_UTIL_FAIL; +#else + if (pthread_key_create(key, NULL)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_key_delete(hg_thread_key_t key) +{ +#ifdef _WIN32 + if (!TlsFree(key)) + return HG_UTIL_FAIL; +#else + if (pthread_key_delete(key)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_getaffinity(hg_thread_t thread, hg_cpu_set_t *cpu_mask) +{ +#if defined(_WIN32) + return HG_UTIL_FAIL; +#elif defined(__APPLE__) + (void) thread; + (void) cpu_mask; + return HG_UTIL_FAIL; +#else + if (pthread_getaffinity_np(thread, sizeof(hg_cpu_set_t), cpu_mask)) + return HG_UTIL_FAIL; + return HG_UTIL_SUCCESS; +#endif +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_setaffinity(hg_thread_t thread, const hg_cpu_set_t *cpu_mask) +{ +#if defined(_WIN32) + if (!SetThreadAffinityMask(thread, *cpu_mask)) + return HG_UTIL_FAIL; +#elif defined(__APPLE__) + (void) thread; + (void) cpu_mask; + return HG_UTIL_FAIL; +#else + if (pthread_setaffinity_np(thread, sizeof(hg_cpu_set_t), cpu_mask)) + return HG_UTIL_FAIL; + return HG_UTIL_SUCCESS; +#endif +} diff --git a/src/mercury/mercury_thread.h b/src/mercury/mercury_thread.h new file mode 100644 index 0000000..cc4bbf1 --- /dev/null +++ b/src/mercury/mercury_thread.h @@ -0,0 
+1,242 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#ifndef MERCURY_THREAD_H +#define MERCURY_THREAD_H + +#if !defined(_WIN32) && !defined(_GNU_SOURCE) +# define _GNU_SOURCE +#endif +#include "mercury_util_config.h" + +#ifdef _WIN32 +# include +typedef HANDLE hg_thread_t; +typedef LPTHREAD_START_ROUTINE hg_thread_func_t; +typedef DWORD hg_thread_ret_t; +# define HG_THREAD_RETURN_TYPE hg_thread_ret_t WINAPI +typedef DWORD hg_thread_key_t; +typedef DWORD_PTR hg_cpu_set_t; +#else +# include +typedef pthread_t hg_thread_t; +typedef void *(*hg_thread_func_t)(void *); +typedef void *hg_thread_ret_t; +# define HG_THREAD_RETURN_TYPE hg_thread_ret_t +typedef pthread_key_t hg_thread_key_t; +# ifdef __APPLE__ +/* Size definition for CPU sets. */ +# define HG_CPU_SETSIZE 1024 +# define HG_NCPUBITS (8 * sizeof(hg_cpu_mask_t)) +/* Type for array elements in 'cpu_set_t'. */ +typedef hg_util_uint64_t hg_cpu_mask_t; +typedef struct { + hg_cpu_mask_t bits[HG_CPU_SETSIZE / HG_NCPUBITS]; +} hg_cpu_set_t; +# else +typedef cpu_set_t hg_cpu_set_t; +# endif +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Initialize the thread. + * + * \param thread [IN/OUT] pointer to thread object + */ +HG_UTIL_PUBLIC void +hg_thread_init(hg_thread_t *thread); + +/** + * Create a new thread for the given function. + * + * \param thread [IN/OUT] pointer to thread object + * \param f [IN] pointer to function + * \param data [IN] pointer to data than be passed to function f + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_create(hg_thread_t *thread, hg_thread_func_t f, void *data); + +/** + * Ends the calling thread. 
+ *
+ * \param ret [IN]              exit code for the thread
+ *
+ * This function does not return a value.
+ */
+HG_UTIL_PUBLIC void
+hg_thread_exit(hg_thread_ret_t ret);
+
+/**
+ * Wait for thread completion.
+ *
+ * \param thread [IN]           thread object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_join(hg_thread_t thread);
+
+/**
+ * Terminate the thread.
+ *
+ * \param thread [IN]           thread object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_cancel(hg_thread_t thread);
+
+/**
+ * Yield the processor.
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_yield(void);
+
+/**
+ * Obtain handle of the calling thread.
+ *
+ * \return Handle of the calling thread
+ */
+static HG_UTIL_INLINE hg_thread_t
+hg_thread_self(void);
+
+/**
+ * Compare thread IDs.
+ *
+ * \return Non-zero if equal, zero if not equal
+ */
+static HG_UTIL_INLINE int
+hg_thread_equal(hg_thread_t t1, hg_thread_t t2);
+
+/**
+ * Create a thread-specific data key visible to all threads in the process.
+ *
+ * \param key [OUT]             pointer to thread key object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_key_create(hg_thread_key_t *key);
+
+/**
+ * Delete a thread-specific data key previously returned by
+ * hg_thread_key_create().
+ *
+ * \param key [IN]              thread key object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+HG_UTIL_PUBLIC int
+hg_thread_key_delete(hg_thread_key_t key);
+
+/**
+ * Get value from specified key.
+ *
+ * \param key [IN]              thread key object
+ *
+ * \return Pointer to data associated to the key
+ */
+static HG_UTIL_INLINE void *
+hg_thread_getspecific(hg_thread_key_t key);
+
+/**
+ * Set value to specified key.
+ * + * \param key [IN] thread key object + * \param value [IN] pointer to data that will be associated + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_setspecific(hg_thread_key_t key, const void *value); + +/** + * Get affinity mask. + * + * \param thread [IN] thread object + * \param cpu_mask [IN/OUT] cpu mask + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_getaffinity(hg_thread_t thread, hg_cpu_set_t *cpu_mask); + +/** + * Set affinity mask. + * + * \param thread [IN] thread object + * \param cpu_mask [IN] cpu mask + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_setaffinity(hg_thread_t thread, const hg_cpu_set_t *cpu_mask); + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_thread_t +hg_thread_self(void) +{ +#ifdef _WIN32 + return GetCurrentThread(); +#else + return pthread_self(); +#endif +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_equal(hg_thread_t t1, hg_thread_t t2) +{ +#ifdef _WIN32 + return GetThreadId(t1) == GetThreadId(t2); +#else + return pthread_equal(t1, t2); +#endif +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE void * +hg_thread_getspecific(hg_thread_key_t key) +{ +#ifdef _WIN32 + return TlsGetValue(key); +#else + return pthread_getspecific(key); +#endif +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_setspecific(hg_thread_key_t key, const void *value) +{ +#ifdef _WIN32 + if (!TlsSetValue(key, (LPVOID) value)) + return HG_UTIL_FAIL; +#else + if (pthread_setspecific(key, value)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_THREAD_H */ diff --git 
a/src/mercury/mercury_thread_condition.c b/src/mercury/mercury_thread_condition.c new file mode 100644 index 0000000..76e4fef --- /dev/null +++ b/src/mercury/mercury_thread_condition.c @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#include "mercury_thread_condition.h" + +/*---------------------------------------------------------------------------*/ +int +hg_thread_cond_init(hg_thread_cond_t *cond) +{ +#ifdef _WIN32 + InitializeConditionVariable(cond); +#else + pthread_condattr_t attr; + + pthread_condattr_init(&attr); +# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK) && \ + defined(HG_UTIL_HAS_CLOCK_MONOTONIC) + /* Must set clock ID if using different clock */ + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); +# endif + if (pthread_cond_init(cond, &attr)) + return HG_UTIL_FAIL; + pthread_condattr_destroy(&attr); +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_cond_destroy(hg_thread_cond_t *cond) +{ +#ifndef _WIN32 + if (pthread_cond_destroy(cond)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} diff --git a/src/mercury/mercury_thread_condition.h b/src/mercury/mercury_thread_condition.h new file mode 100644 index 0000000..70e9748 --- /dev/null +++ b/src/mercury/mercury_thread_condition.h @@ -0,0 +1,182 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. 
+ * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#ifndef MERCURY_THREAD_CONDITION_H +#define MERCURY_THREAD_CONDITION_H + +#include "mercury_thread_mutex.h" + +#ifdef _WIN32 +typedef CONDITION_VARIABLE hg_thread_cond_t; +#else +# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK) +# include "mercury_time.h" +# elif defined(HG_UTIL_HAS_SYSTIME_H) +# include +# endif +# include +typedef pthread_cond_t hg_thread_cond_t; +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Initialize the condition. + * + * \param cond [IN/OUT] pointer to condition object + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_cond_init(hg_thread_cond_t *cond); + +/** + * Destroy the condition. + * + * \param cond [IN/OUT] pointer to condition object + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_cond_destroy(hg_thread_cond_t *cond); + +/** + * Wake one thread waiting for the condition to change. + * + * \param cond [IN/OUT] pointer to condition object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_cond_signal(hg_thread_cond_t *cond); + +/** + * Wake all the threads waiting for the condition to change. + * + * \param cond [IN/OUT] pointer to condition object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_cond_broadcast(hg_thread_cond_t *cond); + +/** + * Wait for the condition to change. 
+ * + * \param cond [IN/OUT] pointer to condition object + * \param mutex [IN/OUT] pointer to mutex object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_cond_wait(hg_thread_cond_t *cond, hg_thread_mutex_t *mutex); + +/** + * Wait timeout ms for the condition to change. + * + * \param cond [IN/OUT] pointer to condition object + * \param mutex [IN/OUT] pointer to mutex object + * \param timeout [IN] timeout (in milliseconds) + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_cond_timedwait( + hg_thread_cond_t *cond, hg_thread_mutex_t *mutex, unsigned int timeout); + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_cond_signal(hg_thread_cond_t *cond) +{ +#ifdef _WIN32 + WakeConditionVariable(cond); +#else + if (pthread_cond_signal(cond)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_cond_broadcast(hg_thread_cond_t *cond) +{ +#ifdef _WIN32 + WakeAllConditionVariable(cond); +#else + if (pthread_cond_broadcast(cond)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_cond_wait(hg_thread_cond_t *cond, hg_thread_mutex_t *mutex) +{ +#ifdef _WIN32 + if (!SleepConditionVariableCS(cond, mutex, INFINITE)) + return HG_UTIL_FAIL; +#else + if (pthread_cond_wait(cond, mutex)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_cond_timedwait( + hg_thread_cond_t *cond, hg_thread_mutex_t *mutex, unsigned int timeout) +{ +#ifdef _WIN32 + if (!SleepConditionVariableCS(cond, mutex, timeout)) + return 
HG_UTIL_FAIL; +#else +# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK) + hg_time_t now; +# elif defined(HG_UTIL_HAS_SYSTIME_H) + struct timeval now; +# endif + struct timespec abs_timeout; + long int abs_timeout_us; + ldiv_t ld; + + /* Need to convert timeout (ms) to absolute time */ +# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK) + if (hg_time_get_current(&now) != HG_UTIL_SUCCESS) + return HG_UTIL_FAIL; +# elif defined(HG_UTIL_HAS_SYSTIME_H) + if (gettimeofday(&now, NULL) != 0) + return HG_UTIL_FAIL; +# endif + abs_timeout_us = now.tv_usec + timeout * 1000L; + /* Get sec / nsec */ + ld = ldiv(abs_timeout_us, 1000000L); + abs_timeout.tv_sec = now.tv_sec + ld.quot; + abs_timeout.tv_nsec = ld.rem * 1000L; + + if (pthread_cond_timedwait(cond, mutex, &abs_timeout)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_THREAD_CONDITION_H */ diff --git a/src/mercury/mercury_thread_mutex.c b/src/mercury/mercury_thread_mutex.c new file mode 100644 index 0000000..d5fcc7c --- /dev/null +++ b/src/mercury/mercury_thread_mutex.c @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. 
+ */ + +#include "mercury_thread_mutex.h" + +/*---------------------------------------------------------------------------*/ +int +hg_thread_mutex_init(hg_thread_mutex_t *mutex) +{ +#ifdef _WIN32 + InitializeCriticalSection(mutex); +#else + pthread_mutexattr_t mutex_attr; + + pthread_mutexattr_init(&mutex_attr); +# ifdef HG_UTIL_HAS_PTHREAD_MUTEX_ADAPTIVE_NP + /* Set type to PTHREAD_MUTEX_ADAPTIVE_NP to improve performance */ + pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ADAPTIVE_NP); +# else + pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_DEFAULT); +# endif + if (pthread_mutex_init(mutex, &mutex_attr)) + return HG_UTIL_FAIL; + + pthread_mutexattr_destroy(&mutex_attr); +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_mutex_destroy(hg_thread_mutex_t *mutex) +{ +#ifdef _WIN32 + DeleteCriticalSection(mutex); +#else + if (pthread_mutex_destroy(mutex)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} diff --git a/src/mercury/mercury_thread_mutex.h b/src/mercury/mercury_thread_mutex.h new file mode 100644 index 0000000..fe54a0c --- /dev/null +++ b/src/mercury/mercury_thread_mutex.h @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. 
+ */ + +#ifndef MERCURY_THREAD_MUTEX_H +#define MERCURY_THREAD_MUTEX_H + +#include "mercury_util_config.h" + +#ifdef _WIN32 +# include +# define HG_THREAD_MUTEX_INITIALIZER NULL +typedef CRITICAL_SECTION hg_thread_mutex_t; +#else +# include +# define HG_THREAD_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER +typedef pthread_mutex_t hg_thread_mutex_t; +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Initialize the mutex. + * + * \param mutex [IN/OUT] pointer to mutex object + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_mutex_init(hg_thread_mutex_t *mutex); + +/** + * Destroy the mutex. + * + * \param mutex [IN/OUT] pointer to mutex object + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_mutex_destroy(hg_thread_mutex_t *mutex); + +/** + * Lock the mutex. + * + * \param mutex [IN/OUT] pointer to mutex object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_mutex_lock(hg_thread_mutex_t *mutex); + +/** + * Try locking the mutex. + * + * \param mutex [IN/OUT] pointer to mutex object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_mutex_try_lock(hg_thread_mutex_t *mutex); + +/** + * Unlock the mutex. 
+ * + * \param mutex [IN/OUT] pointer to mutex object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_mutex_unlock(hg_thread_mutex_t *mutex); + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_mutex_lock(hg_thread_mutex_t *mutex) +{ +#ifdef _WIN32 + EnterCriticalSection(mutex); +#else + if (pthread_mutex_lock(mutex)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_mutex_try_lock(hg_thread_mutex_t *mutex) +{ +#ifdef _WIN32 + if (!TryEnterCriticalSection(mutex)) + return HG_UTIL_FAIL; +#else + if (pthread_mutex_trylock(mutex)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_mutex_unlock(hg_thread_mutex_t *mutex) +{ +#ifdef _WIN32 + LeaveCriticalSection(mutex); +#else + if (pthread_mutex_unlock(mutex)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_THREAD_MUTEX_H */ diff --git a/src/mercury/mercury_thread_pool.c b/src/mercury/mercury_thread_pool.c new file mode 100644 index 0000000..48e3b6c --- /dev/null +++ b/src/mercury/mercury_thread_pool.c @@ -0,0 +1,191 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. 
+ */ + +#include "mercury_thread_pool.h" + +#include "mercury_util_error.h" + +#include + +/****************/ +/* Local Macros */ +/****************/ + +/************************************/ +/* Local Type and Struct Definition */ +/************************************/ + +struct hg_thread_pool_private { + struct hg_thread_pool pool; + unsigned int thread_count; + hg_thread_t *threads; +}; + +/********************/ +/* Local Prototypes */ +/********************/ + +/** + * Worker thread run by the thread pool + */ +static HG_THREAD_RETURN_TYPE +hg_thread_pool_worker(void *args); + +/*******************/ +/* Local Variables */ +/*******************/ + +/*---------------------------------------------------------------------------*/ +static HG_THREAD_RETURN_TYPE +hg_thread_pool_worker(void *args) +{ + hg_thread_ret_t ret = 0; + hg_thread_pool_t *pool = (hg_thread_pool_t *) args; + struct hg_thread_work *work; + + while (1) { + hg_thread_mutex_lock(&pool->mutex); + + /* If not shutting down and nothing to do, worker sleeps */ + while (!pool->shutdown && HG_QUEUE_IS_EMPTY(&pool->queue)) { + int rc; + + pool->sleeping_worker_count++; + + rc = hg_thread_cond_wait(&pool->cond, &pool->mutex); + HG_UTIL_CHECK_ERROR_NORET(rc != HG_UTIL_SUCCESS, unlock, + "Thread cannot wait on condition variable"); + + pool->sleeping_worker_count--; + } + + if (pool->shutdown && HG_QUEUE_IS_EMPTY(&pool->queue)) + goto unlock; + + /* Grab our task */ + work = HG_QUEUE_FIRST(&pool->queue); + HG_QUEUE_POP_HEAD(&pool->queue, entry); + + /* Unlock */ + hg_thread_mutex_unlock(&pool->mutex); + + /* Get to work */ + (*work->func)(work->args); + } + +unlock: + hg_thread_mutex_unlock(&pool->mutex); + + return ret; +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_pool_init(unsigned int thread_count, hg_thread_pool_t **pool_ptr) +{ + int ret = HG_UTIL_SUCCESS, rc; + struct hg_thread_pool_private *priv_pool = NULL; + unsigned int i; + + 
HG_UTIL_CHECK_ERROR( + pool_ptr == NULL, error, ret, HG_UTIL_FAIL, "NULL pointer"); + + priv_pool = (struct hg_thread_pool_private *) malloc( + sizeof(struct hg_thread_pool_private)); + HG_UTIL_CHECK_ERROR(priv_pool == NULL, error, ret, HG_UTIL_FAIL, + "Could not allocate thread pool"); + + priv_pool->pool.sleeping_worker_count = 0; + priv_pool->thread_count = thread_count; + priv_pool->threads = NULL; + HG_QUEUE_INIT(&priv_pool->pool.queue); + priv_pool->pool.shutdown = 0; + + rc = hg_thread_mutex_init(&priv_pool->pool.mutex); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, error, ret, HG_UTIL_FAIL, + "Could not initialize mutex"); + + rc = hg_thread_cond_init(&priv_pool->pool.cond); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, error, ret, HG_UTIL_FAIL, + "Could not initialize thread condition"); + + priv_pool->threads = + (hg_thread_t *) malloc(thread_count * sizeof(hg_thread_t)); + HG_UTIL_CHECK_ERROR(!priv_pool->threads, error, ret, HG_UTIL_FAIL, + "Could not allocate thread pool array"); + + /* Start worker threads */ + for (i = 0; i < thread_count; i++) { + rc = hg_thread_create( + &priv_pool->threads[i], hg_thread_pool_worker, (void *) priv_pool); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, error, ret, HG_UTIL_FAIL, + "Could not create thread"); + } + + *pool_ptr = (struct hg_thread_pool *) priv_pool; + + return ret; + +error: + if (priv_pool) + hg_thread_pool_destroy((struct hg_thread_pool *) priv_pool); + + return ret; +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_pool_destroy(hg_thread_pool_t *pool) +{ + struct hg_thread_pool_private *priv_pool = + (struct hg_thread_pool_private *) pool; + int ret = HG_UTIL_SUCCESS, rc; + unsigned int i; + + if (!priv_pool) + goto done; + + if (priv_pool->threads) { + hg_thread_mutex_lock(&priv_pool->pool.mutex); + + priv_pool->pool.shutdown = 1; + + rc = hg_thread_cond_broadcast(&priv_pool->pool.cond); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, error, ret, 
HG_UTIL_FAIL, + "Could not broadcast condition signal"); + + hg_thread_mutex_unlock(&priv_pool->pool.mutex); + + for (i = 0; i < priv_pool->thread_count; i++) { + rc = hg_thread_join(priv_pool->threads[i]); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL, + "Could not join thread"); + } + } + + rc = hg_thread_mutex_destroy(&priv_pool->pool.mutex); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL, + "Could not destroy mutex"); + + rc = hg_thread_cond_destroy(&priv_pool->pool.cond); + HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL, + "Could not destroy thread condition"); + + free(priv_pool->threads); + free(priv_pool); + +done: + return ret; + +error: + hg_thread_mutex_unlock(&priv_pool->pool.mutex); + + return ret; +} diff --git a/src/mercury/mercury_thread_pool.h b/src/mercury/mercury_thread_pool.h new file mode 100644 index 0000000..8ad501a --- /dev/null +++ b/src/mercury/mercury_thread_pool.h @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. 
+ */ + +#ifndef MERCURY_THREAD_POOL_H +#define MERCURY_THREAD_POOL_H + +#include "mercury_queue.h" +#include "mercury_thread.h" +#include "mercury_thread_condition.h" + + + +/*************************************/ +/* Public Type and Struct Definition */ +/*************************************/ + +typedef struct hg_thread_pool hg_thread_pool_t; + +struct hg_thread_pool { + unsigned int sleeping_worker_count; + HG_QUEUE_HEAD(hg_thread_work) queue; + int shutdown; + hg_thread_mutex_t mutex; + hg_thread_cond_t cond; +}; + +struct hg_thread_work { + hg_thread_func_t func; + void *args; + HG_QUEUE_ENTRY(hg_thread_work) entry; /* Internal */ +}; + +/*****************/ +/* Public Macros */ +/*****************/ + +/*********************/ +/* Public Prototypes */ +/*********************/ + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Initialize the thread pool. + * + * \param thread_count [IN] number of threads that will be created at + * initialization + * \param pool [OUT] pointer to pool object + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_pool_init(unsigned int thread_count, hg_thread_pool_t **pool); + +/** + * Destroy the thread pool. + * + * \param pool [IN/OUT] pointer to pool object + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_pool_destroy(hg_thread_pool_t *pool); + +/** + * Post work to the pool. Note that the operation may be queued depending on + * the number of threads and number of tasks already running. 
+ * + * \param pool [IN/OUT] pointer to pool object + * \param work [IN] pointer to work struct + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_pool_post(hg_thread_pool_t *pool, struct hg_thread_work *work); + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_pool_post(hg_thread_pool_t *pool, struct hg_thread_work *work) +{ + int ret = HG_UTIL_SUCCESS; + + if (!pool || !work) + return HG_UTIL_FAIL; + + if (!work->func) + return HG_UTIL_FAIL; + + hg_thread_mutex_lock(&pool->mutex); + + /* Are we shutting down ? */ + if (pool->shutdown) { + ret = HG_UTIL_FAIL; + goto unlock; + } + + /* Add task to task queue */ + HG_QUEUE_PUSH_TAIL(&pool->queue, work, entry); + + /* Wake up sleeping worker */ + if (pool->sleeping_worker_count && + (hg_thread_cond_signal(&pool->cond) != HG_UTIL_SUCCESS)) + ret = HG_UTIL_FAIL; + +unlock: + hg_thread_mutex_unlock(&pool->mutex); + + return ret; +} + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_THREAD_POOL_H */ diff --git a/src/mercury/mercury_thread_rwlock.c b/src/mercury/mercury_thread_rwlock.c new file mode 100644 index 0000000..b7ffde4 --- /dev/null +++ b/src/mercury/mercury_thread_rwlock.c @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +/* Copyright (C) 2017 Intel Corporation + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted for any purpose (including commercial purposes) + * provided that the following conditions are met: + * + * 1. 
Redistributions of source code must retain the above copyright notice, + * this list of conditions, and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions, and the following disclaimer in the + * documentation and/or materials provided with the distribution. + * + * 3. In addition, redistributions of modified forms of the source or binary + * code must carry prominent notices stating that the original code was + * changed and the date of the change. + * + * 4. All publications or advertising materials mentioning features or use of + * this software are asked, but not required, to acknowledge that it was + * developed by Intel Corporation and credit the contributors. + * + * 5. Neither the name of Intel Corporation, nor the name of any Contributor + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include "mercury_thread_rwlock.h" + +/*---------------------------------------------------------------------------*/ +int +hg_thread_rwlock_init(hg_thread_rwlock_t *rwlock) +{ +#ifdef _WIN32 + InitializeSRWLock(rwlock); +#else + if (pthread_rwlock_init(rwlock, NULL)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_rwlock_destroy(hg_thread_rwlock_t *rwlock) +{ +#ifdef _WIN32 + /* nothing to do */ +#else + if (pthread_rwlock_destroy(rwlock)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} diff --git a/src/mercury/mercury_thread_rwlock.h b/src/mercury/mercury_thread_rwlock.h new file mode 100644 index 0000000..22985c8 --- /dev/null +++ b/src/mercury/mercury_thread_rwlock.h @@ -0,0 +1,236 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +/* Copyright (C) 2017 Intel Corporation + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted for any purpose (including commercial purposes) + * provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions, and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions, and the following disclaimer in the + * documentation and/or materials provided with the distribution. + * + * 3. 
In addition, redistributions of modified forms of the source or binary + * code must carry prominent notices stating that the original code was + * changed and the date of the change. + * + * 4. All publications or advertising materials mentioning features or use of + * this software are asked, but not required, to acknowledge that it was + * developed by Intel Corporation and credit the contributors. + * + * 5. Neither the name of Intel Corporation, nor the name of any Contributor + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef MERCURY_THREAD_RWLOCK_H +#define MERCURY_THREAD_RWLOCK_H + +#include "mercury_util_config.h" + +#ifdef _WIN32 +# include +typedef PSRWLOCK hg_thread_rwlock_t; +#else +# include +typedef pthread_rwlock_t hg_thread_rwlock_t; +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Initialize the rwlock. + * + * \param rwlock [IN/OUT] pointer to rwlock object + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_rwlock_init(hg_thread_rwlock_t *rwlock); + +/** + * Destroy the rwlock. 
+ * + * \param rwlock [IN/OUT] pointer to rwlock object + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_rwlock_destroy(hg_thread_rwlock_t *rwlock); + +/** + * Take a read lock for the rwlock. + * + * \param rwlock [IN/OUT] pointer to rwlock object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_rwlock_rdlock(hg_thread_rwlock_t *rwlock); + +/** + * Try to take a read lock for the rwlock. + * + * \param rwlock [IN/OUT] pointer to rwlock object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_rwlock_try_rdlock(hg_thread_rwlock_t *rwlock); + +/** + * Release the read lock of the rwlock. + * + * \param rwlock [IN/OUT] pointer to rwlock object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_rwlock_release_rdlock(hg_thread_rwlock_t *rwlock); + +/** + * Take a write lock for the rwlock. + * + * \param rwlock [IN/OUT] pointer to rwlock object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_rwlock_wrlock(hg_thread_rwlock_t *rwlock); + +/** + * Try to take a write lock for the rwlock. + * + * \param rwlock [IN/OUT] pointer to rwlock object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_rwlock_try_wrlock(hg_thread_rwlock_t *rwlock); + +/** + * Release the write lock of the rwlock. 
+ * + * \param rwlock [IN/OUT] pointer to rwlock object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_rwlock_release_wrlock(hg_thread_rwlock_t *rwlock); + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_rwlock_rdlock(hg_thread_rwlock_t *rwlock) +{ +#ifdef _WIN32 + AcquireSRWLockShared(rwlock); +#else + if (pthread_rwlock_rdlock(rwlock)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_rwlock_try_rdlock(hg_thread_rwlock_t *rwlock) +{ +#ifdef _WIN32 + if (TryAcquireSRWLockShared(rwlock) == 0) + return HG_UTIL_FAIL; +#else + if (pthread_rwlock_tryrdlock(rwlock)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_rwlock_release_rdlock(hg_thread_rwlock_t *rwlock) +{ +#ifdef _WIN32 + ReleaseSRWLockShared(rwlock); +#else + if (pthread_rwlock_unlock(rwlock)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_rwlock_wrlock(hg_thread_rwlock_t *rwlock) +{ +#ifdef _WIN32 + ReleaseSRWLockExclusive(rwlock); +#else + if (pthread_rwlock_wrlock(rwlock)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_rwlock_try_wrlock(hg_thread_rwlock_t *rwlock) +{ +#ifdef _WIN32 + if (TryAcquireSRWLockExclusive(rwlock) == 0) + return HG_UTIL_FAIL; +#else + if (pthread_rwlock_trywrlock(rwlock)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static 
HG_UTIL_INLINE int +hg_thread_rwlock_release_wrlock(hg_thread_rwlock_t *rwlock) +{ +#ifdef _WIN32 + ReleaseSRWLockExclusive(rwlock); +#else + if (pthread_rwlock_unlock(rwlock)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_THREAD_RWLOCK_H */ diff --git a/src/mercury/mercury_thread_spin.c b/src/mercury/mercury_thread_spin.c new file mode 100644 index 0000000..60ef1f6 --- /dev/null +++ b/src/mercury/mercury_thread_spin.c @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#include "mercury_thread_spin.h" + +/*---------------------------------------------------------------------------*/ +int +hg_thread_spin_init(hg_thread_spin_t *lock) +{ +#if defined(_WIN32) + *lock = 0; + + return HG_UTIL_SUCCESS; +#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T) + if (pthread_spin_init(lock, 0)) + return HG_UTIL_FAIL; + + return HG_UTIL_SUCCESS; +#else + return hg_thread_mutex_init(lock); +#endif +} + +/*---------------------------------------------------------------------------*/ +int +hg_thread_spin_destroy(hg_thread_spin_t *lock) +{ +#if defined(_WIN32) + (void) lock; + + return HG_UTIL_SUCCESS; +#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T) + if (pthread_spin_destroy(lock)) + return HG_UTIL_FAIL; + + return HG_UTIL_SUCCESS; +#else + return hg_thread_mutex_destroy(lock); +#endif +} diff --git a/src/mercury/mercury_thread_spin.h b/src/mercury/mercury_thread_spin.h new file mode 100644 index 0000000..661d084 --- /dev/null +++ b/src/mercury/mercury_thread_spin.h @@ -0,0 +1,146 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC 
and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#ifndef MERCURY_THREAD_SPIN_H +#define MERCURY_THREAD_SPIN_H + +#include "mercury_util_config.h" + +#if defined(_WIN32) +# include +typedef volatile LONG hg_thread_spin_t; +#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T) +# include +typedef pthread_spinlock_t hg_thread_spin_t; +#else +/* Default to hg_thread_mutex_t if pthread_spinlock_t is not supported */ +# include "mercury_thread_mutex.h" +typedef hg_thread_mutex_t hg_thread_spin_t; +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Initialize the spin lock. + * + * \param lock [IN/OUT] pointer to lock object + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_spin_init(hg_thread_spin_t *lock); + +/** + * Destroy the spin lock. + * + * \param lock [IN/OUT] pointer to lock object + * + * \return Non-negative on success or negative on failure + */ +HG_UTIL_PUBLIC int +hg_thread_spin_destroy(hg_thread_spin_t *lock); + +/** + * Lock the spin lock. + * + * \param lock [IN/OUT] pointer to lock object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_spin_lock(hg_thread_spin_t *lock); + +/** + * Try locking the spin lock. + * + * \param mutex [IN/OUT] pointer to lock object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_spin_try_lock(hg_thread_spin_t *lock); + +/** + * Unlock the spin lock. 
+ * + * \param mutex [IN/OUT] pointer to lock object + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_thread_spin_unlock(hg_thread_spin_t *lock); + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_spin_lock(hg_thread_spin_t *lock) +{ +#if defined(_WIN32) + while (InterlockedExchange(lock, EBUSY)) { + /* Don't lock while waiting */ + while (*lock) { + YieldProcessor(); + + /* Compiler barrier. Prevent caching of *lock */ + MemoryBarrier(); + } + } + return HG_UTIL_SUCCESS; +#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T) + if (pthread_spin_lock(lock)) + return HG_UTIL_FAIL; + + return HG_UTIL_SUCCESS; +#else + return hg_thread_mutex_lock(lock); +#endif +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_spin_try_lock(hg_thread_spin_t *lock) +{ +#if defined(_WIN32) + return InterlockedExchange(lock, EBUSY); +#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T) + if (pthread_spin_trylock(lock)) + return HG_UTIL_FAIL; + + return HG_UTIL_SUCCESS; +#else + return hg_thread_mutex_try_lock(lock); +#endif +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_thread_spin_unlock(hg_thread_spin_t *lock) +{ +#if defined(_WIN32) + /* Compiler barrier. 
The store below acts with release semantics */ + MemoryBarrier(); + *lock = 0; + + return HG_UTIL_SUCCESS; +#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T) + if (pthread_spin_unlock(lock)) + return HG_UTIL_FAIL; + return HG_UTIL_SUCCESS; +#else + return hg_thread_mutex_unlock(lock); +#endif +} + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_THREAD_SPIN_H */ diff --git a/src/mercury/mercury_time.h b/src/mercury/mercury_time.h new file mode 100644 index 0000000..3493a9f --- /dev/null +++ b/src/mercury/mercury_time.h @@ -0,0 +1,402 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#ifndef MERCURY_TIME_H +#define MERCURY_TIME_H + +#include "mercury_util_config.h" + +#if defined(_WIN32) +# include +#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC) +# if defined(HG_UTIL_HAS_TIME_H) && defined(HG_UTIL_HAS_CLOCK_GETTIME) +# include +# elif defined(__APPLE__) && defined(HG_UTIL_HAS_SYSTIME_H) +# include +# include +# else +# error "Not supported on this platform." +# endif +#else +# include +# include +# if defined(HG_UTIL_HAS_SYSTIME_H) +# include +# else +# error "Not supported on this platform." +# endif +#endif + +/*************************************/ +/* Public Type and Struct Definition */ +/*************************************/ + +typedef struct hg_time hg_time_t; + +struct hg_time { + long tv_sec; + long tv_usec; +}; + +/*****************/ +/* Public Macros */ +/*****************/ + +/*********************/ +/* Public Prototypes */ +/*********************/ + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Get an elapsed time on the calling processor. 
+ * + * \param tv [OUT] pointer to returned time structure + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_time_get_current(hg_time_t *tv); + +/** + * Convert hg_time_t to double. + * + * \param tv [IN] time structure + * + * \return Converted time in seconds + */ +static HG_UTIL_INLINE double +hg_time_to_double(hg_time_t tv); + +/** + * Convert double to hg_time_t. + * + * \param d [IN] time in seconds + * + * \return Converted time structure + */ +static HG_UTIL_INLINE hg_time_t +hg_time_from_double(double d); + +/** + * Compare time values. + * + * \param in1 [IN] time structure + * \param in2 [IN] time structure + * + * \return 1 if in1 < in2, 0 otherwise + */ +static HG_UTIL_INLINE int +hg_time_less(hg_time_t in1, hg_time_t in2); + +/** + * Add time values. + * + * \param in1 [IN] time structure + * \param in2 [IN] time structure + * + * \return Summed time structure + */ +static HG_UTIL_INLINE hg_time_t +hg_time_add(hg_time_t in1, hg_time_t in2); + +/** + * Subtract time values. + * + * \param in1 [IN] time structure + * \param in2 [IN] time structure + * + * \return Subtracted time structure + */ +static HG_UTIL_INLINE hg_time_t +hg_time_subtract(hg_time_t in1, hg_time_t in2); + +/** + * Sleep until the time specified in rqt has elapsed. + * + * \param reqt [IN] time structure + * + * \return Non-negative on success or negative on failure + */ +static HG_UTIL_INLINE int +hg_time_sleep(const hg_time_t rqt); + +/** + * Get a string containing current time/date stamp. 
+ * + * \return Valid string or NULL on failure + */ +static HG_UTIL_INLINE char * +hg_time_stamp(void); + +/*---------------------------------------------------------------------------*/ +#ifdef _WIN32 +static HG_UTIL_INLINE LARGE_INTEGER +get_FILETIME_offset(void) +{ + SYSTEMTIME s; + FILETIME f; + LARGE_INTEGER t; + + s.wYear = 1970; + s.wMonth = 1; + s.wDay = 1; + s.wHour = 0; + s.wMinute = 0; + s.wSecond = 0; + s.wMilliseconds = 0; + SystemTimeToFileTime(&s, &f); + t.QuadPart = f.dwHighDateTime; + t.QuadPart <<= 32; + t.QuadPart |= f.dwLowDateTime; + + return t; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_time_get_current(hg_time_t *tv) +{ + LARGE_INTEGER t; + FILETIME f; + double t_usec; + static LARGE_INTEGER offset; + static double freq_to_usec; + static int initialized = 0; + static BOOL use_perf_counter = 0; + + if (!tv) + return HG_UTIL_FAIL; + + if (!initialized) { + LARGE_INTEGER perf_freq; + initialized = 1; + use_perf_counter = QueryPerformanceFrequency(&perf_freq); + if (use_perf_counter) { + QueryPerformanceCounter(&offset); + freq_to_usec = (double) perf_freq.QuadPart / 1000000.; + } else { + offset = get_FILETIME_offset(); + freq_to_usec = 10.; + } + } + if (use_perf_counter) { + QueryPerformanceCounter(&t); + } else { + GetSystemTimeAsFileTime(&f); + t.QuadPart = f.dwHighDateTime; + t.QuadPart <<= 32; + t.QuadPart |= f.dwLowDateTime; + } + + t.QuadPart -= offset.QuadPart; + t_usec = (double) t.QuadPart / freq_to_usec; + t.QuadPart = t_usec; + tv->tv_sec = t.QuadPart / 1000000; + tv->tv_usec = t.QuadPart % 1000000; + + return HG_UTIL_SUCCESS; +} + +#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC) +/*---------------------------------------------------------------------------*/ +# if defined(HG_UTIL_HAS_TIME_H) && defined(HG_UTIL_HAS_CLOCK_GETTIME) +static HG_UTIL_INLINE int +hg_time_get_current(hg_time_t *tv) +{ + struct timespec tp = {0, 0}; + /* NB. 
CLOCK_MONOTONIC_RAW is not explicitly supported in the vdso */ + clockid_t clock_id = CLOCK_MONOTONIC; + + if (!tv) + return HG_UTIL_FAIL; + + clock_gettime(clock_id, &tp); /* NOTE(review): return value is ignored; tp stays {0, 0} if the call fails */ + tv->tv_sec = tp.tv_sec; + tv->tv_usec = tp.tv_nsec / 1000; + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +# elif defined(__APPLE__) && defined(HG_UTIL_HAS_SYSTIME_H) +static HG_UTIL_INLINE int +hg_time_get_current(hg_time_t *tv) +{ + static uint64_t monotonic_timebase_factor = 0; + uint64_t monotonic_nsec; + + if (!tv) + return HG_UTIL_FAIL; + + if (monotonic_timebase_factor == 0) { + mach_timebase_info_data_t timebase_info; + + (void) mach_timebase_info(&timebase_info); + monotonic_timebase_factor = timebase_info.numer / timebase_info.denom; /* NOTE(review): integer division truncates when numer is not a multiple of denom (0 if numer < denom) -- confirm */ + } + monotonic_nsec = (mach_absolute_time() * monotonic_timebase_factor); + tv->tv_sec = (long) (monotonic_nsec / 1000000000); + tv->tv_usec = (long) ((monotonic_nsec % 1000000000) / 1000); /* sub-second remainder in ns, converted to us */ + + return HG_UTIL_SUCCESS; +} + +# endif +#else +/*---------------------------------------------------------------------------*/ +# if defined(HG_UTIL_HAS_SYSTIME_H) +static HG_UTIL_INLINE int +hg_time_get_current(hg_time_t *tv) +{ + if (!tv) + return HG_UTIL_FAIL; + + gettimeofday((struct timeval *) tv, NULL); /* NOTE(review): assumes hg_time_t has the same layout as struct timeval -- confirm */ + + return HG_UTIL_SUCCESS; +} + +# endif +#endif +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE double +hg_time_to_double(hg_time_t tv) +{ + return (double) tv.tv_sec + (double) (tv.tv_usec) * 0.000001; /* seconds plus microseconds scaled to seconds */ +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_time_t +hg_time_from_double(double d) +{ + hg_time_t tv; + + tv.tv_sec = (long) d; /* whole seconds (truncated toward zero) */ + tv.tv_usec = (long) ((d - (double) (tv.tv_sec)) * 1000000); /* remaining fraction as microseconds */ + + return tv; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_time_less(hg_time_t in1, hg_time_t in2)
+{ + return ((in1.tv_sec < in2.tv_sec) || + ((in1.tv_sec == in2.tv_sec) && (in1.tv_usec < in2.tv_usec))); +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_time_t +hg_time_add(hg_time_t in1, hg_time_t in2) +{ + hg_time_t out; + + out.tv_sec = in1.tv_sec + in2.tv_sec; + out.tv_usec = in1.tv_usec + in2.tv_usec; + if (out.tv_usec >= 1000000) { /* >= so that exactly 1000000 usec also carries into tv_sec */ + out.tv_usec -= 1000000; + out.tv_sec += 1; + } + + return out; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE hg_time_t +hg_time_subtract(hg_time_t in1, hg_time_t in2) +{ + hg_time_t out; + + out.tv_sec = in1.tv_sec - in2.tv_sec; + out.tv_usec = in1.tv_usec - in2.tv_usec; + if (out.tv_usec < 0) { /* borrow one second when the microsecond difference is negative */ + out.tv_usec += 1000000; + out.tv_sec -= 1; + } + + return out; +} + +/*---------------------------------------------------------------------------*/ +static HG_UTIL_INLINE int +hg_time_sleep(const hg_time_t rqt) +{ +#ifdef _WIN32 + DWORD dwMilliseconds = (DWORD)(hg_time_to_double(rqt) * 1000); /* seconds -> milliseconds, the unit Sleep() expects */ + + Sleep(dwMilliseconds); +#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC) + struct timespec rqtp; + + rqtp.tv_sec = rqt.tv_sec; + rqtp.tv_nsec = rqt.tv_usec * 1000; + + if (nanosleep(&rqtp, NULL)) + return HG_UTIL_FAIL; +#else + useconds_t usec = + (useconds_t) rqt.tv_sec * 1000000 + (useconds_t) rqt.tv_usec; /* NOTE(review): this arithmetic can wrap for long sleeps -- confirm callers pass short intervals */ + + if (usleep(usec)) + return HG_UTIL_FAIL; +#endif + + return HG_UTIL_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +#define HG_UTIL_STAMP_MAX 128 +static HG_UTIL_INLINE char * +hg_time_stamp(void) +{ + static char buf[HG_UTIL_STAMP_MAX] = {'\0'}; /* static: the returned pointer outlives the call, but the buffer is shared between calls */ + +#if defined(_WIN32) + /* TODO not implemented */ +#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC) + struct tm *local_time; + time_t t; + + t = time(NULL); + local_time = localtime(&t); + if (local_time == NULL) + return NULL; + + if (strftime(buf, HG_UTIL_STAMP_MAX, "%a, %d %b %Y %T %Z", local_time) == 0) + return NULL; +#else +
struct timeval tv; + struct timezone tz; + unsigned long days, hours, minutes, seconds; + + gettimeofday(&tv, &tz); + days = (unsigned long) tv.tv_sec / (3600 * 24); /* split seconds since the epoch into days / hours / minutes / seconds */ + hours = ((unsigned long) tv.tv_sec - days * 24 * 3600) / 3600; + minutes = + ((unsigned long) tv.tv_sec - days * 24 * 3600 - hours * 3600) / 60; + seconds = (unsigned long) tv.tv_sec - days * 24 * 3600 - hours * 3600 - + minutes * 60; + hours = (unsigned long) ((((long) hours - tz.tz_minuteswest / 60) % 24 + 24) % 24); /* apply the zone offset with signed math and wrap into 0..23 instead of underflowing */ + + snprintf(buf, HG_UTIL_STAMP_MAX, "%02lu:%02lu:%02lu (GMT-%d)", hours, + minutes, seconds, tz.tz_minuteswest / 60); +#endif + + return buf; +} + +#ifdef __cplusplus +} +#endif + +#endif /* MERCURY_TIME_H */ diff --git a/src/mercury/mercury_util_config.h b/src/mercury/mercury_util_config.h new file mode 100644 index 0000000..3a3a9d8 --- /dev/null +++ b/src/mercury/mercury_util_config.h @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +/* Generated file. Only edit mercury_util_config.h.in.
*/ + +#ifndef MERCURY_UTIL_CONFIG_H +#define MERCURY_UTIL_CONFIG_H + +/*************************************/ +/* Public Type and Struct Definition */ +/*************************************/ + +/* Type definitions: fixed-width integer aliases (MSVC built-ins on Windows, <stdint.h> types elsewhere) */ +#ifdef _WIN32 +typedef signed __int64 hg_util_int64_t; +typedef signed __int32 hg_util_int32_t; +typedef signed __int16 hg_util_int16_t; +typedef signed __int8 hg_util_int8_t; +typedef unsigned __int64 hg_util_uint64_t; +typedef unsigned __int32 hg_util_uint32_t; +typedef unsigned __int16 hg_util_uint16_t; +typedef unsigned __int8 hg_util_uint8_t; +#else +# include <stdint.h> +# include <stddef.h> +typedef int64_t hg_util_int64_t; +typedef int32_t hg_util_int32_t; +typedef int16_t hg_util_int16_t; +typedef int8_t hg_util_int8_t; +typedef uint64_t hg_util_uint64_t; +typedef uint32_t hg_util_uint32_t; +typedef uint16_t hg_util_uint16_t; +typedef uint8_t hg_util_uint8_t; +#endif +typedef hg_util_uint8_t hg_util_bool_t; +typedef hg_util_uint64_t hg_util_ptr_t; + +/* True / false */ +#define HG_UTIL_TRUE 1 +#define HG_UTIL_FALSE 0 + +/* Return codes */ +#define HG_UTIL_SUCCESS 0 +#define HG_UTIL_FAIL (-1) + +/*****************/ +/* Public Macros */ +/*****************/ + +/* Visibility of symbols */ +#if defined(_WIN32) +# define HG_UTIL_ABI_IMPORT __declspec(dllimport) +# define HG_UTIL_ABI_EXPORT __declspec(dllexport) +# define HG_UTIL_ABI_HIDDEN +#elif defined(__GNUC__) && (__GNUC__ >= 4) +# define HG_UTIL_ABI_IMPORT __attribute__((visibility("default"))) +# define HG_UTIL_ABI_EXPORT __attribute__((visibility("default"))) +# define HG_UTIL_ABI_HIDDEN __attribute__((visibility("hidden"))) +#else +# define HG_UTIL_ABI_IMPORT +# define HG_UTIL_ABI_EXPORT +# define HG_UTIL_ABI_HIDDEN +#endif + +/* Inline macro */ +#ifdef _WIN32 +# define HG_UTIL_INLINE __inline +#else +# define HG_UTIL_INLINE __inline__ +#endif + +/* Shared libraries */ +#define HG_UTIL_BUILD_SHARED_LIBS +#ifdef HG_UTIL_BUILD_SHARED_LIBS +# ifdef mercury_util_EXPORTS +# define HG_UTIL_PUBLIC HG_UTIL_ABI_EXPORT +#
else +# define HG_UTIL_PUBLIC HG_UTIL_ABI_IMPORT +# endif +# define HG_UTIL_PRIVATE HG_UTIL_ABI_HIDDEN +#else +# define HG_UTIL_PUBLIC +# define HG_UTIL_PRIVATE +#endif + +/* Define if has 'clock_gettime()' */ +#define HG_UTIL_HAS_CLOCK_GETTIME + +/* Define if has CLOCK_MONOTONIC */ +/* #undef HG_UTIL_HAS_CLOCK_MONOTONIC */ + +/* Define if has eventfd_t type */ +#define HG_UTIL_HAS_EVENTFD_T + +/* Define if has colored output */ +/* #undef HG_UTIL_HAS_LOG_COLOR */ + +/* Define if has <opa_primitives.h> */ +/* #undef HG_UTIL_HAS_OPA_PRIMITIVES_H */ + +/* Define if has 'pthread_condattr_setclock()' */ +#define HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK + +/* Define if has PTHREAD_MUTEX_ADAPTIVE_NP */ +#define HG_UTIL_HAS_PTHREAD_MUTEX_ADAPTIVE_NP + +/* Define if has pthread_spinlock_t type */ +#define HG_UTIL_HAS_PTHREAD_SPINLOCK_T + +/* Define if has <stdatomic.h> */ +#define HG_UTIL_HAS_STDATOMIC_H + +/* Define type size of atomic_long */ +#define HG_UTIL_ATOMIC_LONG_WIDTH 8 + +/* Define if has <sys/epoll.h> */ +#define HG_UTIL_HAS_SYSEPOLL_H + +/* Define if has <sys/event.h> */ +/* #undef HG_UTIL_HAS_SYSEVENT_H */ + +/* Define if has <sys/eventfd.h> */ +#define HG_UTIL_HAS_SYSEVENTFD_H + +/* Define if has <sys/time.h> */ +#define HG_UTIL_HAS_SYSTIME_H + +/* Define if has <time.h> */ +#define HG_UTIL_HAS_TIME_H + +/* Define if has verbose error */ +#define HG_UTIL_HAS_VERBOSE_ERROR + +#endif /* MERCURY_UTIL_CONFIG_H */ diff --git a/src/mercury/mercury_util_error.c b/src/mercury/mercury_util_error.c new file mode 100644 index 0000000..0280c88 --- /dev/null +++ b/src/mercury/mercury_util_error.c @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree.
+ */ + +#include "mercury_util_error.h" + +/*******************/ +/* Local Variables */ +/*******************/ + +/* Default error log mask */ +#ifdef HG_UTIL_HAS_VERBOSE_ERROR +unsigned int HG_UTIL_LOG_MASK = HG_LOG_TYPE_ERROR | HG_LOG_TYPE_WARNING; +#endif \ No newline at end of file diff --git a/src/mercury/mercury_util_error.h b/src/mercury/mercury_util_error.h new file mode 100644 index 0000000..b03d7bf --- /dev/null +++ b/src/mercury/mercury_util_error.h @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy, + * UChicago Argonne, LLC and The HDF Group. + * All rights reserved. + * + * The full copyright notice, including terms governing use, modification, + * and redistribution, is contained in the COPYING file that can be + * found at the root of the source code distribution tree. + */ + +#ifndef MERCURY_UTIL_ERROR_H +#define MERCURY_UTIL_ERROR_H + +#include "mercury_util_config.h" + +/* Default error macro */ +#ifdef HG_UTIL_HAS_VERBOSE_ERROR +# include +# define HG_UTIL_LOG_MASK hg_util_log_mask +/* Log mask will be initialized in init routine */ +extern HG_UTIL_PRIVATE unsigned int HG_UTIL_LOG_MASK; +# define HG_UTIL_LOG_MODULE_NAME "HG Util" +# define HG_UTIL_LOG_ERROR(...) \ + do { \ + if (HG_UTIL_LOG_MASK & HG_LOG_TYPE_ERROR) \ + HG_LOG_WRITE_ERROR(HG_UTIL_LOG_MODULE_NAME, __VA_ARGS__); \ + } while (0) +# ifdef HG_UTIL_HAS_DEBUG +# define HG_UTIL_LOG_DEBUG(...) \ + do { \ + if (HG_UTIL_LOG_MASK & HG_LOG_TYPE_DEBUG) \ + HG_LOG_WRITE_DEBUG(HG_UTIL_LOG_MODULE_NAME, __VA_ARGS__); \ + } while (0) +# else +# define HG_UTIL_LOG_DEBUG(...) (void) 0 +# endif +# define HG_UTIL_LOG_WARNING(...) \ + do { \ + if (HG_UTIL_LOG_MASK & HG_LOG_TYPE_WARNING) \ + HG_LOG_WRITE_WARNING(HG_UTIL_LOG_MODULE_NAME, __VA_ARGS__); \ + } while (0) +#else +# define HG_UTIL_LOG_ERROR(...) (void) 0 +# define HG_UTIL_LOG_DEBUG(...) (void) 0 +# define HG_UTIL_LOG_WARNING(...) 
(void) 0 +#endif + +/* Branch predictor hints (__builtin_expect unless _WIN32 is defined, where they pass x through) */ +#ifndef _WIN32 +# define likely(x) __builtin_expect(!!(x), 1) +# define unlikely(x) __builtin_expect(!!(x), 0) +#else +# define likely(x) (x) +# define unlikely(x) (x) +#endif + +/* Error macros: assign ret and jump to label; HG_UTIL_GOTO_ERROR also logs via HG_UTIL_LOG_ERROR */ +#define HG_UTIL_GOTO_DONE(label, ret, ret_val) \ + do { \ + ret = ret_val; \ + goto label; \ + } while (0) + +#define HG_UTIL_GOTO_ERROR(label, ret, err_val, ...) \ + do { \ + HG_UTIL_LOG_ERROR(__VA_ARGS__); \ + ret = err_val; \ + goto label; \ + } while (0) + +/* Check for cond; if true, log an error, set ret to err_val and goto label */ +#define HG_UTIL_CHECK_ERROR(cond, label, ret, err_val, ...) \ + do { \ + if (unlikely(cond)) { \ + HG_UTIL_LOG_ERROR(__VA_ARGS__); \ + ret = err_val; \ + goto label; \ + } \ + } while (0) + +#define HG_UTIL_CHECK_ERROR_NORET(cond, label, ...) \ + do { \ + if (unlikely(cond)) { \ + HG_UTIL_LOG_ERROR(__VA_ARGS__); \ + goto label; \ + } \ + } while (0) + +#define HG_UTIL_CHECK_ERROR_DONE(cond, ...) \ + do { \ + if (unlikely(cond)) { \ + HG_UTIL_LOG_ERROR(__VA_ARGS__); \ + } \ + } while (0) + +/* Check for cond and print warning */ +#define HG_UTIL_CHECK_WARNING(cond, ...)
\ + do { \ + if (unlikely(cond)) { \ + HG_UTIL_LOG_WARNING(__VA_ARGS__); \ + } \ + } while (0) + +#endif /* MERCURY_UTIL_ERROR_H */ diff --git a/testpar/CMakeLists.txt b/testpar/CMakeLists.txt index a9f45d5..3d0fe7e 100644 --- a/testpar/CMakeLists.txt +++ b/testpar/CMakeLists.txt @@ -75,6 +75,7 @@ set (H5P_TESTS t_shapesame t_filters_parallel t_2Gio + t_subfile_openclose ) foreach (h5_testp ${H5P_TESTS}) diff --git a/testpar/t_subfile_openclose.c b/testpar/t_subfile_openclose.c new file mode 100644 index 0000000..9f31b6e --- /dev/null +++ b/testpar/t_subfile_openclose.c @@ -0,0 +1,42 @@ +#include +#include "hdf5.h" +#include "H5FDsubfile_public.h" + +#include "mpi.h" + +int +main(int argc, char **argv) +{ + + int i, mpi_size, mpi_rank; + int mpi_provides, require = MPI_THREAD_MULTIPLE; + hid_t subfile_id; + + MPI_Init_thread(&argc, &argv, require, &mpi_provides); + MPI_Comm_size(MPI_COMM_WORLD, &mpi_size); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); + + H5open(); + + if (H5FDsubfiling_init() == SUCCEED) { + subfile_id = get_subfiling_context(); + printf("[%d] subfile_id = %lx\n", mpi_rank, subfile_id); + } + else if (mpi_rank == 0) { + puts("Error: Unable to initialize subfiling!"); + } + + for(i=0; i < 10; i++) { + sf_open_subfiles(subfile_id, NULL, O_CREAT|O_TRUNC|O_RDWR); + sf_close_subfiles(subfile_id); + } + + MPI_Barrier(MPI_COMM_WORLD); + + H5FDsubfiling_finalize(); + + H5close(); + + MPI_Finalize(); + return 0; +} diff --git a/tools/lib/h5diff.c b/tools/lib/h5diff.c index 8b6ace9..e297c8f 100644 --- a/tools/lib/h5diff.c +++ b/tools/lib/h5diff.c @@ -899,7 +899,7 @@ h5diff(const char *fname1, const char *fname2, const char *objname1, const char H5TOOLS_DEBUG("groups traversed - errstat:%d", opts->err_stat); #ifdef H5_HAVE_PARALLEL - if(g_Parallel) { + if(g_Parallel && !g_CollectInfoOnly) { int i; if((HDstrlen(fname1) > MAX_FILENAME) || (HDstrlen(fname2) > MAX_FILENAME)) { @@ -914,6 +914,11 @@ h5diff(const char *fname1, const char *fname2, const char 
*objname1, const char for(i = 1; i < g_nTasks; i++) MPI_Send(filenames, (MAX_FILENAME * 2), MPI_CHAR, i, MPI_TAG_PARALLEL, MPI_COMM_WORLD); } /* end if */ + else if (g_CollectInfoOnly) { + build_match_list (obj1fullname, info1_lp, obj2fullname, info2_lp, &match_list, opts); + + } + #endif H5TOOLS_DEBUG("build_match_list next - errstat:%d", opts->err_stat); diff --git a/tools/lib/h5tools_utils.c b/tools/lib/h5tools_utils.c index 1a1c2db..2d53030 100644 --- a/tools/lib/h5tools_utils.c +++ b/tools/lib/h5tools_utils.c @@ -48,6 +48,7 @@ hsize_t H5TOOLS_BUFSIZE = ( 32 * 1024 * 1024); /* 32 MB */ /* ``parallel_print'' variables */ unsigned char g_Parallel = 0; /*0 for serial, 1 for parallel */ +unsigned char g_CollectInfoOnly = 0; char outBuff[OUTBUFF_SIZE]; unsigned outBuffOffset; FILE* overflow_file = NULL; diff --git a/tools/lib/h5tools_utils.h b/tools/lib/h5tools_utils.h index 42144cc..a05f883 100644 --- a/tools/lib/h5tools_utils.h +++ b/tools/lib/h5tools_utils.h @@ -32,6 +32,7 @@ extern "C" { H5TOOLS_DLLVAR int g_nTasks; H5TOOLS_DLLVAR unsigned char g_Parallel; +H5TOOLS_DLLVAR unsigned char g_CollectInfoOnly; H5TOOLS_DLLVAR char outBuff[]; H5TOOLS_DLLVAR unsigned outBuffOffset; H5TOOLS_DLLVAR FILE *overflow_file; diff --git a/tools/lib/h5trav.c b/tools/lib/h5trav.c index dc7e27d..a9b5b75 100644 --- a/tools/lib/h5trav.c +++ b/tools/lib/h5trav.c @@ -15,6 +15,9 @@ #include "h5trav.h" #include "h5tools.h" #include "H5private.h" +#ifdef H5_HAVE_PARALLEL +#include "h5tools_utils.h" +#endif /*------------------------------------------------------------------------- * local typedefs @@ -179,8 +182,10 @@ static herr_t traverse_cb(hid_t loc_id, const char *path, const H5L_info2_t *linfo, void *_udata) { + herr_t ret_value = SUCCEED; trav_ud_traverse_t *udata = (trav_ud_traverse_t *)_udata; /* User data */ char *new_name = NULL; + const char *full_name; const char *already_visited = NULL; /* Whether the link/object was already visited */ @@ -201,6 +206,18 @@ traverse_cb(hid_t 
loc_id, const char *path, const H5L_info2_t *linfo, else full_name = path; +#ifdef H5_HAVE_PARALLEL + if(linfo->type == H5L_TYPE_EXTERNAL) { + h5tool_link_info_t lnk_info; + if ((ret_value = H5tools_get_symlink_info(loc_id, path, &lnk_info, FALSE)) < 0) { + puts("H5tools_get_symlink_info failed!"); + } + else if (ret_value == 0) { + puts("Dangling link?"); + } + printf("Visiting external link: %s\n", path); + } +#endif /* Perform the correct action for different types of links */ if(linfo->type == H5L_TYPE_HARD) { H5O_info2_t oinfo; -- cgit v0.12