summaryrefslogtreecommitdiffstats
path: root/src/H5FDioc_threads.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/H5FDioc_threads.c')
-rw-r--r--src/H5FDioc_threads.c382
1 files changed, 203 insertions, 179 deletions
diff --git a/src/H5FDioc_threads.c b/src/H5FDioc_threads.c
index 6a4536a..1db8c62 100644
--- a/src/H5FDioc_threads.c
+++ b/src/H5FDioc_threads.c
@@ -26,10 +26,10 @@
* use mercury for that purpose...
*/
-static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER;
+static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER;
static hg_thread_mutex_t ioc_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
-static hg_thread_pool_t *ioc_thread_pool = NULL;
-static hg_thread_t ioc_thread;
+static hg_thread_pool_t *ioc_thread_pool = NULL;
+static hg_thread_t ioc_thread;
#ifndef HG_TEST_NUM_THREADS_DEFAULT
#define HG_TEST_NUM_THREADS_DEFAULT 4
@@ -37,8 +37,8 @@ static hg_thread_t ioc_thread;
extern int ioc_main(int64_t context_id);
-static int pool_concurrent_max = 0;
-static struct hg_thread_work *pool_request = NULL;
+static int pool_concurrent_max = 0;
+static struct hg_thread_work *pool_request = NULL;
/*-------------------------------------------------------------------------
* Function: local ioc_thread_main
@@ -58,16 +58,18 @@ static struct hg_thread_work *pool_request = NULL;
*
*-------------------------------------------------------------------------
*/
-static HG_THREAD_RETURN_TYPE ioc_thread_main(void *arg) {
- int64_t *context_id = (int64_t *)arg;
- hg_thread_ret_t thread_ret = (hg_thread_ret_t)0;
+static HG_THREAD_RETURN_TYPE
+ioc_thread_main(void *arg)
+{
+ int64_t * context_id = (int64_t *)arg;
+ hg_thread_ret_t thread_ret = (hg_thread_ret_t)0;
- /* Pass along the subfiling_context_t */
- ioc_main(context_id[0]);
+ /* Pass along the subfiling_context_t */
+ ioc_main(context_id[0]);
- /* Upon exit, we can free the input arg */
- free(arg);
- return thread_ret;
+ /* Upon exit, we can free the input arg */
+ free(arg);
+ return thread_ret;
}
/*-------------------------------------------------------------------------
@@ -90,100 +92,103 @@ static HG_THREAD_RETURN_TYPE ioc_thread_main(void *arg) {
*
*-------------------------------------------------------------------------
*/
-int initialize_ioc_threads(void *_sf_context) {
- int status;
- int file_open_count;
- subfiling_context_t *sf_context = _sf_context;
- unsigned int thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT;
- int64_t *context_id = (int64_t *)malloc(sizeof(int64_t));
- int world_size = sf_context->topology->app_layout->world_size;
- size_t alloc_size = ((size_t)world_size * sizeof(struct hg_thread_work));
- char *envValue;
- double t_start = 0.0, t_end = 0.0;
-
- assert(context_id != NULL);
-
- file_open_count = atomic_load(&sf_file_open_count);
- atomic_fetch_add(&sf_file_open_count, 1);
-
- if (file_open_count > 0)
- return 0;
+int
+initialize_ioc_threads(void *_sf_context)
+{
+ int status;
+ int file_open_count;
+ subfiling_context_t *sf_context = _sf_context;
+ unsigned int thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT;
+ int64_t * context_id = (int64_t *)malloc(sizeof(int64_t));
+ int world_size = sf_context->topology->app_layout->world_size;
+ size_t alloc_size = ((size_t)world_size * sizeof(struct hg_thread_work));
+ char * envValue;
+ double t_start = 0.0, t_end = 0.0;
+
+ assert(context_id != NULL);
+
+ file_open_count = atomic_load(&sf_file_open_count);
+ atomic_fetch_add(&sf_file_open_count, 1);
+
+ if (file_open_count > 0)
+ return 0;
+
+ t_start = MPI_Wtime();
+
+ /* Initialize the main IOC thread input argument.
+ * Each IOC request will utilize this context_id which is
+ * consistent across all MPI ranks, to ensure that requests
+ * involving reference counting are correctly using the
+ * correct file contexts.
+ */
+ context_id[0] = sf_context->sf_context_id;
+
+ if (pool_request == NULL) {
+ if ((pool_request = (struct hg_thread_work *)malloc(alloc_size)) == NULL) {
+ perror("malloc error");
+ return -1;
+ }
+ else
+ pool_concurrent_max = world_size;
+ }
+
+ memset(pool_request, 0, alloc_size);
+
+ /* Initialize a couple of mutex variables that are used
+ * during IO concentrator operations to serialize
+ * access to key objects, e.g. reference counting.
+ */
+ status = hg_thread_mutex_init(&ioc_mutex);
+ if (status) {
+ puts("hg_thread_mutex_init failed");
+ goto err_exit;
+ }
+ status = hg_thread_mutex_init(&ioc_thread_mutex);
+ if (status) {
+ puts("hg_thread_mutex_init failed");
+ goto err_exit;
+ }
- t_start = MPI_Wtime();
-
- /* Initialize the main IOC thread input argument.
- * Each IOC request will utilize this context_id which is
- * consistent across all MPI ranks, to ensure that requests
- * involving reference counting are correctly using the
- * correct file contexts.
- */
- context_id[0] = sf_context->sf_context_id;
-
- if (pool_request == NULL) {
- if ((pool_request = (struct hg_thread_work *)malloc(alloc_size)) == NULL) {
- perror("malloc error");
- return -1;
- } else
- pool_concurrent_max = world_size;
- }
-
- memset(pool_request, 0, alloc_size);
-
- /* Initialize a couple of mutex variables that are used
- * during IO concentrator operations to serialize
- * access to key objects, e.g. reference counting.
- */
- status = hg_thread_mutex_init(&ioc_mutex);
- if (status) {
- puts("hg_thread_mutex_init failed");
- goto err_exit;
- }
- status = hg_thread_mutex_init(&ioc_thread_mutex);
- if (status) {
- puts("hg_thread_mutex_init failed");
- goto err_exit;
- }
-
- /* Allow experimentation with the number of helper threads */
- if ((envValue = getenv("IOC_THREAD_POOL_COUNT")) != NULL) {
- int value_check = atoi(envValue);
- if (value_check > 0) {
- thread_pool_count = (unsigned int)value_check;
+ /* Allow experimentation with the number of helper threads */
+ if ((envValue = getenv("IOC_THREAD_POOL_COUNT")) != NULL) {
+ int value_check = atoi(envValue);
+ if (value_check > 0) {
+ thread_pool_count = (unsigned int)value_check;
+ }
+ }
+
+ /* Initialize a thread pool for the IO Concentrator to use */
+ status = hg_thread_pool_init(thread_pool_count, &ioc_thread_pool);
+ if (status) {
+ puts("hg_thread_pool_init failed");
+ goto err_exit;
+ }
+
+ /* Arguments to hg_thread_create are:
+ * 1. A pointer to reference the created thread.
+ * 2. User function pointer for the new thread to execute.
+ * 3. Pointer to the input argument that gets passed along to the user
+ * function.
+ */
+ status = hg_thread_create(&ioc_thread, ioc_thread_main, context_id);
+ if (status) {
+ puts("hg_thread_create failed");
+ goto err_exit;
}
- }
-
- /* Initialize a thread pool for the IO Concentrator to use */
- status = hg_thread_pool_init(thread_pool_count, &ioc_thread_pool);
- if (status) {
- puts("hg_thread_pool_init failed");
- goto err_exit;
- }
-
- /* Arguments to hg_thread_create are:
- * 1. A pointer to reference the created thread.
- * 2. User function pointer for the new thread to execute.
- * 3. Pointer to the input argument that gets passed along to the user
- * function.
- */
- status = hg_thread_create(&ioc_thread, ioc_thread_main, context_id);
- if (status) {
- puts("hg_thread_create failed");
- goto err_exit;
- }
#ifndef NDEBUG
- t_end = MPI_Wtime();
- if (sf_verbose_flag) {
- if (sf_context->topology->subfile_rank == 0) {
- HDprintf("%s: time = %lf seconds\n", __func__, (t_end - t_start));
- HDfflush(stdout);
+ t_end = MPI_Wtime();
+ if (sf_verbose_flag) {
+ if (sf_context->topology->subfile_rank == 0) {
+ HDprintf("%s: time = %lf seconds\n", __func__, (t_end - t_start));
+ HDfflush(stdout);
+ }
}
- }
#endif
- return 0;
+ return 0;
err_exit:
- return -1;
+ return -1;
}
/*-------------------------------------------------------------------------
@@ -202,30 +207,38 @@ err_exit:
*
*-------------------------------------------------------------------------
*/
-void __attribute__((destructor)) finalize_ioc_threads(void) {
- if (ioc_thread_pool != NULL) {
- hg_thread_pool_destroy(ioc_thread_pool);
- ioc_thread_pool = NULL;
- }
+void __attribute__((destructor)) finalize_ioc_threads(void)
+{
+ if (ioc_thread_pool != NULL) {
+ hg_thread_pool_destroy(ioc_thread_pool);
+ ioc_thread_pool = NULL;
+ }
}
-static const char *translate_opcode(io_op_t op)
+static const char *
+translate_opcode(io_op_t op)
{
- switch(op) {
- case READ_OP: return "READ_OP";
- break;
- case WRITE_OP: return "WRITE_OP";
- break;
- case OPEN_OP: return "OPEN_OP";
- break;
- case CLOSE_OP: return "CLOSE_OP";
- break;
- case FINI_OP: return "FINI_OP";
- break;
- case LOGGING_OP: return "LOGGING_OP";
- break;
- }
- return "unknown";
+ switch (op) {
+ case READ_OP:
+ return "READ_OP";
+ break;
+ case WRITE_OP:
+ return "WRITE_OP";
+ break;
+ case OPEN_OP:
+ return "OPEN_OP";
+ break;
+ case CLOSE_OP:
+ return "CLOSE_OP";
+ break;
+ case FINI_OP:
+ return "FINI_OP";
+ break;
+ case LOGGING_OP:
+ return "LOGGING_OP";
+ break;
+ }
+ return "unknown";
}
/*-------------------------------------------------------------------------
* Function: local: handle_work_request
@@ -249,45 +262,44 @@ static const char *translate_opcode(io_op_t op)
*
*-------------------------------------------------------------------------
*/
-static HG_THREAD_RETURN_TYPE handle_work_request(void *arg)
+static HG_THREAD_RETURN_TYPE
+handle_work_request(void *arg)
{
- int status = 0;
- hg_thread_ret_t ret = 0;
- sf_work_request_t *msg = (sf_work_request_t *)arg;
- int64_t file_context_id = msg->header[2];
- subfiling_context_t *sf_context = NULL;
-
- sf_context = get__subfiling_object(file_context_id);
- assert(sf_context != NULL);
-
- atomic_fetch_add(&sf_work_pending, 1); // atomic
- switch (msg->tag) {
- case WRITE_INDEP:
- status = queue_write_indep(msg, msg->subfile_rank, msg->source,
- sf_context->sf_data_comm);
- break;
- case READ_INDEP:
- status = queue_read_indep(msg, msg->subfile_rank, msg->source,
- sf_context->sf_data_comm);
- break;
- default:
- HDprintf("[ioc(%d)] received message tag(%x)from rank %d\n",
- msg->subfile_rank, msg->tag, msg->source);
- status = -1;
- break;
- }
- fflush(stdout);
-
- atomic_fetch_sub(&sf_work_pending, 1); // atomic
- if (status < 0) {
- HDprintf("[ioc(%d) %s]: request(%s) filename=%s from "
- "rank(%d), size=%ld, offset=%ld FAILED\n",
- msg->subfile_rank, __func__, translate_opcode((io_op_t)msg->tag),
- sf_context->filename, msg->source, msg->header[0], msg->header[1]);
-
+ int status = 0;
+ hg_thread_ret_t ret = 0;
+ sf_work_request_t * msg = (sf_work_request_t *)arg;
+ int64_t file_context_id = msg->header[2];
+ subfiling_context_t *sf_context = NULL;
+
+ sf_context = get__subfiling_object(file_context_id);
+ assert(sf_context != NULL);
+
+ atomic_fetch_add(&sf_work_pending, 1); // atomic
+ switch (msg->tag) {
+ case WRITE_INDEP:
+ status = queue_write_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm);
+ break;
+ case READ_INDEP:
+ status = queue_read_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm);
+ break;
+ default:
+ HDprintf("[ioc(%d)] received message tag(%x)from rank %d\n", msg->subfile_rank, msg->tag,
+ msg->source);
+ status = -1;
+ break;
+ }
fflush(stdout);
- }
- return ret;
+
+ atomic_fetch_sub(&sf_work_pending, 1); // atomic
+ if (status < 0) {
+ HDprintf("[ioc(%d) %s]: request(%s) filename=%s from "
+ "rank(%d), size=%ld, offset=%ld FAILED\n",
+ msg->subfile_rank, __func__, translate_opcode((io_op_t)msg->tag), sf_context->sf_filename,
+ msg->source, msg->header[0], msg->header[1]);
+
+ fflush(stdout);
+ }
+ return ret;
}
/*-------------------------------------------------------------------------
@@ -306,18 +318,20 @@ static HG_THREAD_RETURN_TYPE handle_work_request(void *arg)
*
*-------------------------------------------------------------------------
*/
-int tpool_add_work(void *_work) {
- static int work_index = 0;
- sf_work_request_t *work = (sf_work_request_t *)_work;
-
- hg_thread_mutex_lock(&ioc_mutex);
- if (work_index == pool_concurrent_max)
- work_index = 0;
- pool_request[work_index].func = handle_work_request;
- pool_request[work_index].args = work;
- hg_thread_pool_post(ioc_thread_pool, &pool_request[work_index++]);
- hg_thread_mutex_unlock(&ioc_mutex);
- return 0;
+int
+tpool_add_work(void *_work)
+{
+ static int work_index = 0;
+ sf_work_request_t *work = (sf_work_request_t *)_work;
+
+ hg_thread_mutex_lock(&ioc_mutex);
+ if (work_index == pool_concurrent_max)
+ work_index = 0;
+ pool_request[work_index].func = handle_work_request;
+ pool_request[work_index].args = work;
+ hg_thread_pool_post(ioc_thread_pool, &pool_request[work_index++]);
+ hg_thread_mutex_unlock(&ioc_mutex);
+ return 0;
}
/*-------------------------------------------------------------------------
@@ -335,7 +349,11 @@ int tpool_add_work(void *_work) {
*
*-------------------------------------------------------------------------
*/
-bool tpool_is_empty(void) { return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue); }
+bool
+tpool_is_empty(void)
+{
+ return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue);
+}
/*-------------------------------------------------------------------------
* Function: begin_thread_exclusive
@@ -351,8 +369,10 @@ bool tpool_is_empty(void) { return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue); }
*
*-------------------------------------------------------------------------
*/
-void begin_thread_exclusive(void) {
- hg_thread_mutex_lock(&ioc_thread_mutex);
+void
+begin_thread_exclusive(void)
+{
+ hg_thread_mutex_lock(&ioc_thread_mutex);
}
/*-------------------------------------------------------------------------
@@ -370,8 +390,10 @@ void begin_thread_exclusive(void) {
*
*-------------------------------------------------------------------------
*/
-void end_thread_exclusive(void) {
- hg_thread_mutex_unlock(&ioc_thread_mutex);
+void
+end_thread_exclusive(void)
+{
+ hg_thread_mutex_unlock(&ioc_thread_mutex);
}
/*-------------------------------------------------------------------------
@@ -389,9 +411,11 @@ void end_thread_exclusive(void) {
*
*-------------------------------------------------------------------------
*/
-int wait_for_thread_main(void) {
- if (hg_thread_join(ioc_thread) != 0) {
- return -1;
- }
- return 0;
+int
+wait_for_thread_main(void)
+{
+ if (hg_thread_join(ioc_thread) != 0) {
+ return -1;
+ }
+ return 0;
}