summaryrefslogtreecommitdiffstats
path: root/src/mercury/src/util/mercury_thread_pool.h
blob: db973d13937897aadcf2c3cdeb2d8ab8d7f94b01 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
 * Copyright (C) 2013-2020 Argonne National Laboratory, Department of Energy,
 *                    UChicago Argonne, LLC and The HDF Group.
 * All rights reserved.
 *
 * The full copyright notice, including terms governing use, modification,
 * and redistribution, is contained in the COPYING file that can be
 * found at the root of the source code distribution tree.
 */

#ifndef MERCURY_THREAD_POOL_H
#define MERCURY_THREAD_POOL_H

#include "mercury_queue.h"
#include "mercury_thread.h"
#include "mercury_thread_condition.h"

/*************************************/
/* Public Type and Struct Definition */
/*************************************/

typedef struct hg_thread_pool hg_thread_pool_t;

/**
 * Thread pool state. The layout is visible in this header because the inline
 * hg_thread_pool_post() below accesses the fields directly.
 */
struct hg_thread_pool {
    /* hg_thread_pool_post() only signals cond when this is non-zero.
     * Presumably the number of workers currently waiting on cond -- confirm
     * against the worker implementation. */
    unsigned int sleeping_worker_count;
    /* Pending work items; hg_thread_pool_post() appends at the tail */
    HG_QUEUE_HEAD(hg_thread_work) queue;
    /* When non-zero, hg_thread_pool_post() refuses new work. Presumably set
     * while the pool is being destroyed -- confirm against the
     * implementation. */
    int               shutdown;
    /* Held by hg_thread_pool_post() while it checks shutdown, updates the
     * queue and signals cond */
    hg_thread_mutex_t mutex;
    /* Signaled by hg_thread_pool_post() after queuing work when
     * sleeping_worker_count is non-zero */
    hg_thread_cond_t  cond;
};

/**
 * Unit of work handed to hg_thread_pool_post().
 */
struct hg_thread_work {
    /* Function to run; hg_thread_pool_post() rejects work whose func is
     * NULL */
    hg_thread_func_t func;
    /* Argument for func -- presumably passed to it by the worker thread;
     * confirm against the implementation */
    void *           args;
    HG_QUEUE_ENTRY(hg_thread_work) entry; /* Internal: queue linkage used when the work is posted */
};

/*****************/
/* Public Macros */
/*****************/

/*********************/
/* Public Prototypes */
/*********************/

#ifdef __cplusplus
extern "C" {
#endif

/**
 * Initialize the thread pool.
 *
 * \param thread_count [IN]     number of threads that will be created at
 *                              initialization
 * \param pool [OUT]            address of a pool pointer that receives the
 *                              pool object (presumably allocated by this call
 *                              and released with hg_thread_pool_destroy() --
 *                              confirm against the implementation)
 *
 * \return Non-negative on success or negative on failure
 */
HG_UTIL_PUBLIC int hg_thread_pool_init(unsigned int thread_count, hg_thread_pool_t **pool);

/**
 * Destroy the thread pool.
 *
 * NOTE(review): hg_thread_pool_post() fails once the pool's shutdown flag is
 * non-zero; presumably this call is what sets it -- confirm against the
 * implementation.
 *
 * \param pool [IN/OUT]         pointer to pool object
 *
 * \return Non-negative on success or negative on failure
 */
HG_UTIL_PUBLIC int hg_thread_pool_destroy(hg_thread_pool_t *pool);

/**
 * Post work to the pool. Note that the operation may be queued depending on
 * the number of threads and number of tasks already running.
 *
 * The work struct is linked into the pool's queue through its internal
 * entry field. NOTE(review): this suggests the struct is queued by reference
 * rather than copied, so it presumably must stay valid until the work has
 * been executed -- confirm against the queue macros and worker code.
 *
 * \param pool [IN/OUT]         pointer to pool object
 * \param work [IN]             pointer to work struct; its func member must
 *                              be set
 *
 * \return Non-negative on success or negative on failure. HG_UTIL_FAIL is
 * returned when pool or work is NULL, when work->func is NULL, when the pool
 * is shutting down, or when signaling a sleeping worker fails (in that last
 * case the work has already been added to the queue).
 */
static HG_UTIL_INLINE int hg_thread_pool_post(hg_thread_pool_t *pool, struct hg_thread_work *work);

/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_pool_post(hg_thread_pool_t *pool, struct hg_thread_work *work)
{
    /* Assume failure; flipped to success only once the work is queued and
     * any required wake-up has gone through */
    int rc = HG_UTIL_FAIL;

    /* A pool, a work item and a function to run are all mandatory */
    if (!pool || !work || !work->func) {
        return HG_UTIL_FAIL;
    }

    hg_thread_mutex_lock(&pool->mutex);

    /* Work is only accepted while the pool is not shutting down */
    if (!pool->shutdown) {
        /* Append the task to the pending queue */
        HG_QUEUE_PUSH_TAIL(&pool->queue, work, entry);

        /* Wake a sleeping worker when there is one. A failed signal is
         * reported to the caller, but the task stays in the queue. */
        if (pool->sleeping_worker_count == 0 ||
            hg_thread_cond_signal(&pool->cond) == HG_UTIL_SUCCESS) {
            rc = HG_UTIL_SUCCESS;
        }
    }

    hg_thread_mutex_unlock(&pool->mutex);

    return rc;
}

#ifdef __cplusplus
}
#endif

#endif /* MERCURY_THREAD_POOL_H */