summaryrefslogtreecommitdiffstats
path: root/Utilities/cmlibuv/src/threadpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'Utilities/cmlibuv/src/threadpool.c')
-rw-r--r--Utilities/cmlibuv/src/threadpool.c96
1 files changed, 81 insertions, 15 deletions
diff --git a/Utilities/cmlibuv/src/threadpool.c b/Utilities/cmlibuv/src/threadpool.c
index 413d1c2..4258933 100644
--- a/Utilities/cmlibuv/src/threadpool.c
+++ b/Utilities/cmlibuv/src/threadpool.c
@@ -33,12 +33,18 @@ static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int idle_threads;
+static unsigned int slow_io_work_running;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static QUEUE exit_message;
static QUEUE wq;
+static QUEUE run_slow_work_message;
+static QUEUE slow_io_pending_wq;
+static unsigned int slow_work_thread_threshold(void) {
+ return (nthreads + 1) / 2;
+}
static void uv__cancelled(struct uv__work* w) {
abort();
@@ -51,34 +57,67 @@ static void uv__cancelled(struct uv__work* w) {
static void worker(void* arg) {
struct uv__work* w;
QUEUE* q;
+ int is_slow_work;
uv_sem_post((uv_sem_t*) arg);
arg = NULL;
+ uv_mutex_lock(&mutex);
for (;;) {
- uv_mutex_lock(&mutex);
-
- while (QUEUE_EMPTY(&wq)) {
+ /* `mutex` should always be locked at this point. */
+
+ /* Keep waiting while either no work is present or only slow I/O
+ and we're at the threshold for that. */
+ while (QUEUE_EMPTY(&wq) ||
+ (QUEUE_HEAD(&wq) == &run_slow_work_message &&
+ QUEUE_NEXT(&run_slow_work_message) == &wq &&
+ slow_io_work_running >= slow_work_thread_threshold())) {
idle_threads += 1;
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
q = QUEUE_HEAD(&wq);
-
- if (q == &exit_message)
+ if (q == &exit_message) {
uv_cond_signal(&cond);
- else {
+ uv_mutex_unlock(&mutex);
+ break;
+ }
+
+ QUEUE_REMOVE(q);
+ QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */
+
+ is_slow_work = 0;
+ if (q == &run_slow_work_message) {
+ /* If we're at the slow I/O threshold, re-schedule until after all
+ other work in the queue is done. */
+ if (slow_io_work_running >= slow_work_thread_threshold()) {
+ QUEUE_INSERT_TAIL(&wq, q);
+ continue;
+ }
+
+ /* If we encountered a request to run slow I/O work but there is none
+ to run, that means it's cancelled => Start over. */
+ if (QUEUE_EMPTY(&slow_io_pending_wq))
+ continue;
+
+ is_slow_work = 1;
+ slow_io_work_running++;
+
+ q = QUEUE_HEAD(&slow_io_pending_wq);
QUEUE_REMOVE(q);
- QUEUE_INIT(q); /* Signal uv_cancel() that the work req is
- executing. */
+ QUEUE_INIT(q);
+
+ /* If there is more slow I/O work, schedule it to be run as well. */
+ if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
+ QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
+ if (idle_threads > 0)
+ uv_cond_signal(&cond);
+ }
}
uv_mutex_unlock(&mutex);
- if (q == &exit_message)
- break;
-
w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
@@ -88,12 +127,32 @@ static void worker(void* arg) {
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
+
+ /* Lock `mutex` since that is expected at the start of the next
+ * iteration. */
+ uv_mutex_lock(&mutex);
+ if (is_slow_work) {
+ /* `slow_io_work_running` is protected by `mutex`. */
+ slow_io_work_running--;
+ }
}
}
-static void post(QUEUE* q) {
+static void post(QUEUE* q, enum uv__work_kind kind) {
uv_mutex_lock(&mutex);
+ if (kind == UV__WORK_SLOW_IO) {
+ /* Insert into a separate queue. */
+ QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
+ if (!QUEUE_EMPTY(&run_slow_work_message)) {
+ /* Running slow I/O tasks is already scheduled => Nothing to do here.
+ The worker that runs said other task will schedule this one as well. */
+ uv_mutex_unlock(&mutex);
+ return;
+ }
+ q = &run_slow_work_message;
+ }
+
QUEUE_INSERT_TAIL(&wq, q);
if (idle_threads > 0)
uv_cond_signal(&cond);
@@ -108,7 +167,7 @@ UV_DESTRUCTOR(static void cleanup(void)) {
if (nthreads == 0)
return;
- post(&exit_message);
+ post(&exit_message, UV__WORK_CPU);
for (i = 0; i < nthreads; i++)
if (uv_thread_join(threads + i))
@@ -156,6 +215,8 @@ static void init_threads(void) {
abort();
QUEUE_INIT(&wq);
+ QUEUE_INIT(&slow_io_pending_wq);
+ QUEUE_INIT(&run_slow_work_message);
if (uv_sem_init(&sem, 0))
abort();
@@ -194,13 +255,14 @@ static void init_once(void) {
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
+ enum uv__work_kind kind,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
- post(&w->wq);
+ post(&w->wq, kind);
}
@@ -284,7 +346,11 @@ int uv_queue_work(uv_loop_t* loop,
req->loop = loop;
req->work_cb = work_cb;
req->after_work_cb = after_work_cb;
- uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
+ uv__work_submit(loop,
+ &req->work_req,
+ UV__WORK_CPU,
+ uv__queue_work,
+ uv__queue_done);
return 0;
}