Diffstat (limited to 'src/mercury/mercury_atomic_queue.h')
-rw-r--r--  src/mercury/mercury_atomic_queue.h | 271
1 file changed, 271 insertions(+), 0 deletions(-)
diff --git a/src/mercury/mercury_atomic_queue.h b/src/mercury/mercury_atomic_queue.h
new file mode 100644
index 0000000..73c0b15
--- /dev/null
+++ b/src/mercury/mercury_atomic_queue.h
@@ -0,0 +1,271 @@
+/*
+ * Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
+ * UChicago Argonne, LLC and The HDF Group.
+ * All rights reserved.
+ *
+ * The full copyright notice, including terms governing use, modification,
+ * and redistribution, is contained in the COPYING file that can be
+ * found at the root of the source code distribution tree.
+ */
+
+/* Implementation derived from:
+ * https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h
+ *
+ *
+ * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ */
+
+#ifndef MERCURY_ATOMIC_QUEUE_H
+#define MERCURY_ATOMIC_QUEUE_H
+
+#include "mercury_atomic.h"
+#include "mercury_mem.h"
+
+/*************************************/
+/* Public Type and Struct Definition */
+/*************************************/
+
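+/* Bounded multi-producer/multi-consumer queue derived from FreeBSD's
+ * buf_ring (see license above). Producer and consumer index pairs are kept
+ * on separate cache lines to avoid false sharing: each side reserves a slot
+ * by advancing its head (via compare-and-swap, except in the single-consumer
+ * pop), accesses the slot, then publishes the operation by advancing its
+ * tail. Entries are stored as 64-bit integers so that object pointers can be
+ * pushed directly. */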
+struct hg_atomic_queue {
+ hg_atomic_int32_t prod_head;
+ hg_atomic_int32_t prod_tail;
+ unsigned int prod_size;
+ unsigned int prod_mask;
+ hg_util_uint64_t drops;
+ hg_atomic_int32_t cons_head
+ __attribute__((aligned(HG_MEM_CACHE_LINE_SIZE)));
+ hg_atomic_int32_t cons_tail;
+ unsigned int cons_size;
+ unsigned int cons_mask;
+ hg_atomic_int64_t ring[] __attribute__((aligned(HG_MEM_CACHE_LINE_SIZE)));
+};
+
+/*****************/
+/* Public Macros */
+/*****************/
+
+#ifndef cpu_spinwait
+/* Hint to the CPU that this is a busy-wait loop (x86 PAUSE instruction) */
+# if defined(__x86_64__) || defined(__amd64__)
+# define cpu_spinwait() asm volatile("pause\n" : : : "memory")
+# else
+# define cpu_spinwait() ((void) 0)
+# endif
+#endif
+
+/*********************/
+/* Public Prototypes */
+/*********************/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Allocate a new queue that can hold up to \p count entries.
+ *
+ * \param count [IN] maximum number of elements; expected to be a power of
+ * two so that ring indices can be masked (one slot is kept empty to
+ * distinguish a full ring from an empty one)
+ *
+ * \return Pointer to allocated queue or NULL on failure
+ */
+HG_UTIL_PUBLIC struct hg_atomic_queue *
+hg_atomic_queue_alloc(unsigned int count);
+
+/**
+ * Free an existing queue.
+ *
+ * \param hg_atomic_queue [IN] pointer to queue
+ */
+HG_UTIL_PUBLIC void
+hg_atomic_queue_free(struct hg_atomic_queue *hg_atomic_queue);
+
+/**
+ * Push an entry to the queue.
+ *
+ * \param hg_atomic_queue [IN/OUT] pointer to queue
+ * \param entry [IN] pointer to object
+ *
+ * \return Non-negative on success or negative on failure
+ */
+static HG_UTIL_INLINE int
+hg_atomic_queue_push(struct hg_atomic_queue *hg_atomic_queue, void *entry);
+
+/**
+ * Pop an entry from the queue (multi-consumer); safe when multiple threads
+ * pop concurrently.
+ *
+ * \param hg_atomic_queue [IN/OUT] pointer to queue
+ *
+ * \return Pointer to popped object or NULL if queue is empty
+ */
+static HG_UTIL_INLINE void *
+hg_atomic_queue_pop_mc(struct hg_atomic_queue *hg_atomic_queue);
+
+/**
+ * Pop an entry from the queue (single-consumer); faster than the
+ * multi-consumer variant, but safe only when a single thread pops from the
+ * queue.
+ *
+ * \param hg_atomic_queue [IN/OUT] pointer to queue
+ *
+ * \return Pointer to popped object or NULL if queue is empty
+ */
+static HG_UTIL_INLINE void *
+hg_atomic_queue_pop_sc(struct hg_atomic_queue *hg_atomic_queue);
+
+/**
+ * Determine whether the queue is empty. In concurrent use the result is a
+ * snapshot that may be stale by the time the caller acts on it.
+ *
+ * \param hg_atomic_queue [IN/OUT] pointer to queue
+ *
+ * \return HG_UTIL_TRUE if empty, HG_UTIL_FALSE if not
+ */
+static HG_UTIL_INLINE hg_util_bool_t
+hg_atomic_queue_is_empty(struct hg_atomic_queue *hg_atomic_queue);
+
+/**
+ * Determine the number of entries in a queue.
+ *
+ * \param hg_atomic_queue [IN/OUT] pointer to queue
+ *
+ * \return Number of entries queued or 0 if none
+ */
+static HG_UTIL_INLINE unsigned int
+hg_atomic_queue_count(struct hg_atomic_queue *hg_atomic_queue);
+
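+/* Example usage (illustrative sketch, not part of the API; the queue size
+ * and the enqueued object are caller-chosen):
+ *
+ *     struct hg_atomic_queue *queue = hg_atomic_queue_alloc(1024);
+ *     void *entry = ...; // caller-owned object to enqueue
+ *
+ *     if (hg_atomic_queue_push(queue, entry) != HG_UTIL_SUCCESS)
+ *         ...; // queue was full, entry was not enqueued
+ *
+ *     void *popped = hg_atomic_queue_pop_mc(queue);
+ *     if (popped != NULL)
+ *         ...; // process the popped entry
+ *
+ *     hg_atomic_queue_free(queue);
+ */
+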
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE int
+hg_atomic_queue_push(struct hg_atomic_queue *hg_atomic_queue, void *entry)
+{
+ hg_util_int32_t prod_head, prod_next, cons_tail;
+
+ do {
+ prod_head = hg_atomic_get32(&hg_atomic_queue->prod_head);
+ prod_next = (prod_head + 1) & (int) hg_atomic_queue->prod_mask;
+ cons_tail = hg_atomic_get32(&hg_atomic_queue->cons_tail);
+
+        if (prod_next == cons_tail) {
+            /* Ring looks full: fence and re-read both indices to tell a
+             * transient race with concurrent producers/consumers from a
+             * genuinely full queue */
+            hg_atomic_fence();
+            if (prod_head == hg_atomic_get32(&hg_atomic_queue->prod_head) &&
+                cons_tail == hg_atomic_get32(&hg_atomic_queue->cons_tail)) {
+                /* Full: record the drop (best-effort counter) and fail */
+                hg_atomic_queue->drops++;
+                return HG_UTIL_FAIL;
+            }
+            continue;
+        }
+ } while (
+ !hg_atomic_cas32(&hg_atomic_queue->prod_head, prod_head, prod_next));
+
+ hg_atomic_set64(&hg_atomic_queue->ring[prod_head], (hg_util_int64_t) entry);
+
+ /*
+ * If there are other enqueues in progress
+ * that preceded us, we need to wait for them
+ * to complete
+ */
+ while (hg_atomic_get32(&hg_atomic_queue->prod_tail) != prod_head)
+ cpu_spinwait();
+
+ hg_atomic_set32(&hg_atomic_queue->prod_tail, prod_next);
+
+ return HG_UTIL_SUCCESS;
+}
+
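+/* A producer that must not drop entries can retry on failure, e.g.
+ * (illustrative sketch; whether busy-waiting is appropriate is up to the
+ * caller):
+ *
+ *     while (hg_atomic_queue_push(queue, entry) != HG_UTIL_SUCCESS)
+ *         cpu_spinwait();
+ */
+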
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE void *
+hg_atomic_queue_pop_mc(struct hg_atomic_queue *hg_atomic_queue)
+{
+ hg_util_int32_t cons_head, cons_next;
+ void *entry = NULL;
+
+ do {
+ cons_head = hg_atomic_get32(&hg_atomic_queue->cons_head);
+ cons_next = (cons_head + 1) & (int) hg_atomic_queue->cons_mask;
+
+ if (cons_head == hg_atomic_get32(&hg_atomic_queue->prod_tail))
+ return NULL;
+ } while (
+ !hg_atomic_cas32(&hg_atomic_queue->cons_head, cons_head, cons_next));
+
+ entry = (void *) hg_atomic_get64(&hg_atomic_queue->ring[cons_head]);
+
+ /*
+ * If there are other dequeues in progress
+ * that preceded us, we need to wait for them
+ * to complete
+ */
+ while (hg_atomic_get32(&hg_atomic_queue->cons_tail) != cons_head)
+ cpu_spinwait();
+
+ hg_atomic_set32(&hg_atomic_queue->cons_tail, cons_next);
+
+ return entry;
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE void *
+hg_atomic_queue_pop_sc(struct hg_atomic_queue *hg_atomic_queue)
+{
+ hg_util_int32_t cons_head, cons_next;
+ hg_util_int32_t prod_tail;
+ void *entry = NULL;
+
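+    /* A single consumer owns cons_head and cons_tail, so plain updates
+     * suffice here; no CAS loop or spin-wait as in the MC variant */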
+ cons_head = hg_atomic_get32(&hg_atomic_queue->cons_head);
+ prod_tail = hg_atomic_get32(&hg_atomic_queue->prod_tail);
+ cons_next = (cons_head + 1) & (int) hg_atomic_queue->cons_mask;
+
+ if (cons_head == prod_tail)
+ /* Empty */
+ return NULL;
+
+ hg_atomic_set32(&hg_atomic_queue->cons_head, cons_next);
+
+ entry = (void *) hg_atomic_get64(&hg_atomic_queue->ring[cons_head]);
+
+ hg_atomic_set32(&hg_atomic_queue->cons_tail, cons_next);
+
+ return entry;
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE hg_util_bool_t
+hg_atomic_queue_is_empty(struct hg_atomic_queue *hg_atomic_queue)
+{
+ return (hg_atomic_get32(&hg_atomic_queue->cons_head) ==
+ hg_atomic_get32(&hg_atomic_queue->prod_tail));
+}
+
+/*---------------------------------------------------------------------------*/
+static HG_UTIL_INLINE unsigned int
+hg_atomic_queue_count(struct hg_atomic_queue *hg_atomic_queue)
+{
+    /* Distance from consumer tail to producer tail modulo ring size;
+     * prod_size is added so the difference stays non-negative before
+     * masking */
+    return ((hg_atomic_queue->prod_size +
+             (unsigned int) hg_atomic_get32(&hg_atomic_queue->prod_tail) -
+             (unsigned int) hg_atomic_get32(&hg_atomic_queue->cons_tail)) &
+            hg_atomic_queue->prod_mask);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* MERCURY_ATOMIC_QUEUE_H */