summaryrefslogtreecommitdiffstats
path: root/src/H5FDsubfile_threads.c
diff options
context:
space:
mode:
authorRichard Warren <Richard.Warren@hdfgroup.org>2020-07-01 16:15:16 (GMT)
committerRichard Warren <Richard.Warren@hdfgroup.org>2020-07-01 16:15:16 (GMT)
commit8b7167488091054e77f2b5ece385963f3cd1f923 (patch)
tree1769b9a2a3141ffa6a35d7d212e76b709700330e /src/H5FDsubfile_threads.c
parent5a6d9b3a4e12cc399ce27361755f0c8d069e256b (diff)
downloadhdf5-8b7167488091054e77f2b5ece385963f3cd1f923.zip
hdf5-8b7167488091054e77f2b5ece385963f3cd1f923.tar.gz
hdf5-8b7167488091054e77f2b5ece385963f3cd1f923.tar.bz2
Various bug fixes for the multi-threaded IO concentrator(s)
Diffstat (limited to 'src/H5FDsubfile_threads.c')
-rw-r--r--src/H5FDsubfile_threads.c41
1 files changed, 35 insertions, 6 deletions
diff --git a/src/H5FDsubfile_threads.c b/src/H5FDsubfile_threads.c
index 9c54a53..fb99930 100644
--- a/src/H5FDsubfile_threads.c
+++ b/src/H5FDsubfile_threads.c
@@ -13,10 +13,13 @@
#include "mercury/mercury_thread_spin.c"
static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER;
+static hg_thread_mutex_t ioc_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
static hg_thread_pool_t *ioc_thread_pool = NULL;
static hg_thread_t ioc_thread;
+#ifndef HG_TEST_NUM_THREADS_DEFAULT
#define HG_TEST_NUM_THREADS_DEFAULT 4
+#endif
#define POOL_CONCURRENT_MAX 64
static struct hg_thread_work pool_request[POOL_CONCURRENT_MAX];
@@ -42,6 +45,12 @@ initialize_ioc_threads(subfiling_context_t *sf_context)
puts("hg_thread_mutex_init failed");
goto err_exit;
}
+ status = hg_thread_mutex_init(&ioc_thread_mutex);
+ if (status) {
+ puts("hg_thread_mutex_init failed");
+ goto err_exit;
+ }
+
status = hg_thread_pool_init(HG_TEST_NUM_THREADS_DEFAULT, &ioc_thread_pool);
if (status) {
puts("hg_thread_pool_init failed");
@@ -64,10 +73,6 @@ void __attribute__((destructor)) finalize_ioc_threads(void)
if (ioc_thread_pool != NULL) {
hg_thread_pool_destroy(ioc_thread_pool);
ioc_thread_pool = NULL;
-
- if (hg_thread_join(ioc_thread) == 0)
- puts("thread_join succeeded");
- else puts("thread_join failed");
}
}
@@ -94,10 +99,8 @@ handle_work_request(void *arg)
status = queue_read_indep( msg, msg->subfile_rank, msg->source, sf_data_comm);
break;
case CLOSE_OP:
- hg_thread_mutex_lock(&ioc_mutex);
status = decrement_file_ref_counts( msg->subfile_rank, msg->source, sf_data_comm,
subfiling_close_file);
- hg_thread_mutex_unlock(&ioc_mutex);
break;
case OPEN_OP:
status = queue_file_open( msg, msg->subfile_rank, msg->source, sf_data_comm);
@@ -130,3 +133,29 @@ int tpool_add_work(sf_work_request_t *work)
hg_thread_mutex_unlock(&ioc_mutex);
return 0;
}
+
+bool tpool_is_empty(void)
+{
+ return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue);
+}
+
+int begin_thread_exclusive(void)
+{
+ return hg_thread_mutex_lock(&ioc_thread_mutex);
+}
+
+int end_thread_exclusive(void)
+{
+ return hg_thread_mutex_unlock(&ioc_thread_mutex);
+}
+
+int wait_for_thread_main(void)
+{
+ if (hg_thread_join(ioc_thread) == 0)
+ puts("thread_join succeeded");
+ else {
+ puts("thread_join failed");
+ return -1;
+ }
+ return 0;
+}