summaryrefslogtreecommitdiffstats
path: root/src/H5FDsubfile_threads.c
blob: fb99930613908666c84228508c6be84cf631077c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161

#include "H5FDsubfile_private.h"

#include "mercury/mercury_util_config.h"
#include "mercury/mercury_log.h"
#include "mercury/mercury_log.c"
#include "mercury/mercury_util_error.c"
#include "mercury/mercury_thread.c"
#include "mercury/mercury_thread_mutex.c"
#include "mercury/mercury_thread_condition.h"
#include "mercury/mercury_thread_condition.c"
#include "mercury/mercury_thread_pool.c"
#include "mercury/mercury_thread_spin.c"

static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER;
static hg_thread_mutex_t ioc_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
static hg_thread_pool_t *ioc_thread_pool = NULL;
static hg_thread_t ioc_thread;

#ifndef HG_TEST_NUM_THREADS_DEFAULT
#define HG_TEST_NUM_THREADS_DEFAULT 4
#endif
#define POOL_CONCURRENT_MAX 64

static struct hg_thread_work pool_request[POOL_CONCURRENT_MAX];

static HG_THREAD_RETURN_TYPE
ioc_thread_main(void *arg)
{
    hg_thread_ret_t thread_ret = (hg_thread_ret_t) 0;

	/* Pass along the subfiling_context_t */
	ioc_main(arg);

    // hg_thread_exit(thread_ret);
    return thread_ret;
}

int
initialize_ioc_threads(subfiling_context_t *sf_context)
{
	int status;
	status = hg_thread_mutex_init(&ioc_mutex);
	if (status) {
		puts("hg_thread_mutex_init failed");
		goto err_exit;
	}
	status = hg_thread_mutex_init(&ioc_thread_mutex);
	if (status) {
		puts("hg_thread_mutex_init failed");
		goto err_exit;
	}

	status = hg_thread_pool_init(HG_TEST_NUM_THREADS_DEFAULT, &ioc_thread_pool);
	if (status) {
		puts("hg_thread_pool_init failed");
		goto err_exit;
	}
	status = hg_thread_create(&ioc_thread, ioc_thread_main, sf_context);
	if (status) {
		puts("hg_thread_create failed");
		goto err_exit;
	}
	return 0;

err_exit:
	return -1;
}


void __attribute__((destructor)) finalize_ioc_threads(void)
{
	if (ioc_thread_pool != NULL) {
		hg_thread_pool_destroy(ioc_thread_pool);
		ioc_thread_pool = NULL;
	}
}


static HG_THREAD_RETURN_TYPE
handle_work_request(void *arg)
{
    hg_thread_ret_t ret = 0;
    sf_work_request_t *msg = (sf_work_request_t *)arg;
    int status = 0;

	atomic_fetch_add(&sf_work_pending, 1); // atomic
	switch(msg->tag) {
	case WRITE_COLL:
            status = queue_write_coll( msg, msg->subfile_rank, msg->source, sf_data_comm);
            break;
	case READ_COLL:
            status = queue_read_coll( msg, msg->subfile_rank, msg->source, sf_data_comm);
            break;
	case WRITE_INDEP:
            status = queue_write_indep( msg, msg->subfile_rank, msg->source, sf_data_comm);
            break;
	case READ_INDEP:
            status = queue_read_indep( msg, msg->subfile_rank, msg->source, sf_data_comm);
            break;
	case CLOSE_OP:
            status = decrement_file_ref_counts( msg->subfile_rank, msg->source, sf_data_comm,
												subfiling_close_file);
            break;
	case OPEN_OP:
            status = queue_file_open( msg, msg->subfile_rank, msg->source, sf_data_comm);
            break;

	default:
            printf("[ioc(%d)] received message tag(%x)from rank %d\n", msg->subfile_rank, msg->tag, msg->source);
            status = -1;
            break;
    }
	
	atomic_fetch_sub(&sf_work_pending, 1); // atomic
    if (status < 0) {
        printf("[ioc(%d) %s]: Error encounted processing request(%x) from rank(%d\n",
               msg->subfile_rank, __func__, msg->tag, msg->source);
		fflush(stdout);
    }
	return ret;
}

int tpool_add_work(sf_work_request_t *work)
{
	static int work_index = 0;
	hg_thread_mutex_lock(&ioc_mutex);
	if (work_index == POOL_CONCURRENT_MAX)
		work_index = 0;
	pool_request[work_index].func = handle_work_request;
	pool_request[work_index].args = work;
	hg_thread_pool_post(ioc_thread_pool, &pool_request[work_index++]);
	hg_thread_mutex_unlock(&ioc_mutex);
	return 0;
}

bool tpool_is_empty(void)
{
	return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue);
}

int begin_thread_exclusive(void)
{
    return hg_thread_mutex_lock(&ioc_thread_mutex);
}

int end_thread_exclusive(void)
{
    return hg_thread_mutex_unlock(&ioc_thread_mutex);
}

int wait_for_thread_main(void)
{
	if (hg_thread_join(ioc_thread) == 0)
		puts("thread_join succeeded");
	else {
		puts("thread_join failed");
		return -1;
	}
	return 0;
}