summaryrefslogtreecommitdiffstats
path: root/src/H5FDsubfile_threads.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/H5FDsubfile_threads.c')
-rw-r--r--src/H5FDsubfile_threads.c132
1 files changed, 132 insertions, 0 deletions
diff --git a/src/H5FDsubfile_threads.c b/src/H5FDsubfile_threads.c
new file mode 100644
index 0000000..bc9e352
--- /dev/null
+++ b/src/H5FDsubfile_threads.c
@@ -0,0 +1,132 @@
+
+#include "H5FDsubfile_private.h"
+
+#include "mercury/mercury_util_config.h"
+#include "mercury/mercury_log.h"
+#include "mercury/mercury_log.c"
+#include "mercury/mercury_util_error.c"
+#include "mercury/mercury_thread.c"
+#include "mercury/mercury_thread_mutex.c"
+#include "mercury/mercury_thread_condition.h"
+#include "mercury/mercury_thread_condition.c"
+#include "mercury/mercury_thread_pool.c"
+#include "mercury/mercury_thread_spin.c"
+
+static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER;
+static hg_thread_pool_t *ioc_thread_pool = NULL;
+static hg_thread_t ioc_thread;
+
+#define HG_TEST_NUM_THREADS_DEFAULT 4
+#define POOL_CONCURRENT_MAX 64
+
+static struct hg_thread_work pool_request[POOL_CONCURRENT_MAX];
+
+static HG_THREAD_RETURN_TYPE
+ioc_thread_main(void *arg)
+{
+ hg_thread_ret_t thread_ret = (hg_thread_ret_t) 0;
+
+ /* Pass along the subfiling_context_t */
+ ioc_main(arg);
+
+ // hg_thread_exit(thread_ret);
+ return thread_ret;
+}
+
+int
+initialize_ioc_threads(subfiling_context_t *sf_context)
+{
+ int status;
+ status = hg_thread_mutex_init(&ioc_mutex);
+ if (status) {
+ puts("hg_thread_mutex_init failed");
+ goto err_exit;
+ }
+ status = hg_thread_pool_init(HG_TEST_NUM_THREADS_DEFAULT, &ioc_thread_pool);
+ if (status) {
+ puts("hg_thread_pool_init failed");
+ goto err_exit;
+ }
+ status = hg_thread_create(&ioc_thread, ioc_thread_main, sf_context);
+ if (status) {
+ puts("hg_thread_create failed");
+ goto err_exit;
+ }
+ return 0;
+
+err_exit:
+ return -1;
+}
+
+
+void __attribute__((destructor)) finalize_ioc_threads()
+{
+ if (ioc_thread_pool != NULL) {
+ hg_thread_pool_destroy(ioc_thread_pool);
+ ioc_thread_pool = NULL;
+
+ if (hg_thread_join(ioc_thread) == 0)
+ puts("thread_join succeeded");
+ else puts("thread_join failed");
+ }
+}
+
+
+static HG_THREAD_RETURN_TYPE
+handle_work_request(void *arg)
+{
+ hg_thread_ret_t ret = 0;
+ sf_work_request_t *msg = (sf_work_request_t *)arg;
+ int status = 0;
+
+ atomic_fetch_add(&sf_work_pending, 1); // atomic
+ switch(msg->tag) {
+ case WRITE_COLL:
+ status = queue_write_coll( msg, msg->subfile_rank, msg->source, sf_data_comm);
+ break;
+ case READ_COLL:
+ status = queue_read_coll( msg, msg->subfile_rank, msg->source, sf_data_comm);
+ break;
+ case WRITE_INDEP:
+ status = queue_write_indep( msg, msg->subfile_rank, msg->source, sf_data_comm);
+ break;
+ case READ_INDEP:
+ status = queue_read_indep( msg, msg->subfile_rank, msg->source, sf_data_comm);
+ break;
+ case CLOSE_OP:
+ hg_thread_mutex_lock(&ioc_mutex);
+ status = decrement_file_ref_counts( msg->subfile_rank, msg->source, sf_data_comm,
+ subfiling_close_file);
+ hg_thread_mutex_unlock(&ioc_mutex);
+ break;
+ case OPEN_OP:
+ status = queue_file_open( msg, msg->subfile_rank, msg->source, sf_data_comm);
+ break;
+
+ default:
+ printf("[ioc(%d)] received message tag(%x)from rank %d\n", msg->subfile_rank, msg->tag, msg->source);
+ status = -1;
+ break;
+ }
+
+ atomic_fetch_sub(&sf_work_pending, 1); // atomic
+ if (status < 0) {
+ printf("[ioc(%d) %s]: Error encounted processing request(%x) from rank(%d\n",
+ msg->subfile_rank, __func__, msg->tag, msg->source);
+ fflush(stdout);
+ }
+ return ret;
+}
+
+int tpool_add_work(sf_work_request_t *work)
+{
+ static int work_index = 0;
+ hg_thread_mutex_lock(&ioc_mutex);
+ if (work_index == POOL_CONCURRENT_MAX)
+ work_index = 0;
+ pool_request[work_index].func = handle_work_request;
+ pool_request[work_index].args = work;
+ hg_thread_pool_post(ioc_thread_pool, &pool_request[work_index++]);
+ hg_thread_mutex_unlock(&ioc_mutex);
+ return 0;
+}