From 055c9c41b3f659cbbf758d15e8350c5dd2b5faa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Morten=20S=C3=B8rvig?= Date: Wed, 24 Jun 2009 15:20:02 +0200 Subject: Improve QtConcurrent scalability. Reduce lock contention QtConcurrent by swiching the ThreadEngineSemaphore class over to a QAtomic-based implementation. --- src/corelib/concurrent/qtconcurrentthreadengine.h | 86 ++++++++++++++++------- 1 file changed, 62 insertions(+), 24 deletions(-) diff --git a/src/corelib/concurrent/qtconcurrentthreadengine.h b/src/corelib/concurrent/qtconcurrentthreadengine.h index 6f1c7e7..21a3a28 100644 --- a/src/corelib/concurrent/qtconcurrentthreadengine.h +++ b/src/corelib/concurrent/qtconcurrentthreadengine.h @@ -51,6 +51,8 @@ #include #include #include +#include +#include QT_BEGIN_HEADER QT_BEGIN_NAMESPACE @@ -61,55 +63,91 @@ QT_MODULE(Core) namespace QtConcurrent { -// A Semaphore that can wait until all resources are returned. +// The ThreadEngineSemaphore counts worker threads, and allows one +// thread to wait for all others to finish. Tested for its use in +// QtConcurrent, requires more testing for use as a general class. class ThreadEngineSemaphore { +private: + // The thread count is maintained as an integer in the count atomic + // variable. The count can be either positive or negative - a negative + // count signals that a thread is waiting on the semaphore. + QAtomicInt count; + QSemaphore semaphore; public: ThreadEngineSemaphore() :count(0) { } void acquire() { - QMutexLocker lock(&mutex); - ++count; + forever { + int localCount = int(count); + if (localCount < 0) { + if (count.testAndSetOrdered(localCount, localCount -1)) + return; + } else { + if (count.testAndSetOrdered(localCount, localCount + 1)) + return; + } + } } int release() { - QMutexLocker lock(&mutex); - if (--count == 0) - waitCondition.wakeAll(); - return count; + forever { + int localCount = int(count); + if (localCount == -1) { + if (count.testAndSetOrdered(-1, 0)) { + semaphore.release(); + return 0; + } + } else if (localCount < 0) { + if (count.testAndSetOrdered(localCount, localCount + 1)) + return qAbs(localCount + 1); + } else { + if (count.testAndSetOrdered(localCount, localCount - 1)) + return localCount - 1; + } + } } - // Wait until all resources are released. + // Wait until all threads have been released void wait() { - QMutexLocker lock(&mutex); - if (count != 0) - waitCondition.wait(&mutex); + forever { + int localCount = int(count); + if (localCount == 0) + return; + + if (count.testAndSetOrdered(localCount, -localCount)) { + semaphore.acquire(); + return; + } + } } int currentCount() { - return count; + return int(count); } - // releases a resource, unless this is the last resource. - // returns true if a resource was released. + // releases a thread, unless this is the last thread. + // returns true if the thread was released. bool releaseUnlessLast() { - QMutexLocker lock(&mutex); - if (count == 1) - return false; - --count; - return true; + forever { + int localCount = int(count); + if (qAbs(localCount) == 1) { + return false; + } else if (localCount < 0) { + if (count.testAndSetOrdered(localCount, localCount + 1)) + return true; + } else { + if (count.testAndSetOrdered(localCount, localCount - 1)) + return true; + } + } } - -private: - QMutex mutex; - int count; - QWaitCondition waitCondition; }; enum ThreadFunctionResult { ThrottleThread, ThreadFinished }; -- cgit v0.12