diff options
author | Michael Meeks <michael.meeks@collabora.com> | 2016-12-01 11:14:24 +0000 |
---|---|---|
committer | Michael Meeks <michael.meeks@collabora.com> | 2016-12-01 18:44:08 +0000 |
commit | aa68c99d88fd7abe08c4aee5206c859a0cdba38e (patch) | |
tree | 6bc2054a2d16799e1effe6e817455f4b8821843c /comphelper/source | |
parent | 0afbe8d5ca17c4e24bb49272dc6711925f82f5d5 (diff) |
tdf#104126 - comphelper thread-pool, use reliable std::condition_variable.
The existing osl::Condition is an API and reliability disaster area.
Change-Id: I3be84e1c6a83e58c43c40c9c8720790d923a6694
Reviewed-on: https://gerrit.libreoffice.org/31163
Tested-by: Jenkins <ci@libreoffice.org>
Reviewed-by: Michael Meeks <michael.meeks@collabora.com>
Tested-by: Michael Meeks <michael.meeks@collabora.com>
Diffstat (limited to 'comphelper/source')
-rw-r--r-- | comphelper/source/misc/threadpool.cxx | 266 |
1 files changed, 119 insertions, 147 deletions
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx index 0fda2646b7f1..63291436e931 100644 --- a/comphelper/source/misc/threadpool.cxx +++ b/comphelper/source/misc/threadpool.cxx @@ -10,11 +10,14 @@ #include <comphelper/threadpool.hxx> #include <com/sun/star/uno/Exception.hpp> +#include <sal/config.h> #include <rtl/instance.hxx> #include <rtl/string.hxx> +#include <salhelper/thread.hxx> #include <algorithm> #include <memory> #include <thread> +#include <chrono> namespace comphelper { @@ -26,30 +29,27 @@ static thread_local bool gbIsWorkerThread; // used to group thread-tasks for waiting in waitTillDone() class COMPHELPER_DLLPUBLIC ThreadTaskTag { - osl::Mutex mMutex; - std::size_t mnTasksWorking; - osl::Condition maTasksComplete; + std::mutex maMutex; + sal_Int32 mnTasksWorking; + std::condition_variable maTasksComplete; public: ThreadTaskTag(); - bool isDone(); - void waitUntilDone(); - void onTaskWorkerDone(); - void onTaskPushed(); + bool isDone(); + void waitUntilDone(); + void onTaskWorkerDone(); + void onTaskPushed(); }; class ThreadPool::ThreadWorker : public salhelper::Thread { - ThreadPool *mpPool; - osl::Condition maNewWork; - bool mbWorking; + ThreadPool *mpPool; public: explicit ThreadWorker( ThreadPool *pPool ) : salhelper::Thread("thread-pool"), - mpPool( pPool ), - mbWorking( false ) + mpPool( pPool ) { } @@ -58,74 +58,20 @@ public: #if defined DBG_UTIL && defined LINUX gbIsWorkerThread = true; #endif - while ( ThreadTask * pTask = waitForWork() ) - { - std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag()); - try { - pTask->doWork(); - } - catch (const std::exception &e) - { - SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what()); - } - catch (const css::uno::Exception &e) - { - SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message); - } - delete pTask; - pTag->onTaskWorkerDone(); - } - } - - ThreadTask *waitForWork() - { - ThreadTask *pRet = nullptr; + std::unique_lock< std::mutex > aGuard( mpPool->maMutex ); - osl::ResettableMutexGuard aGuard( mpPool->maGuard ); - - pRet = mpPool->popWork(); - - while( !pRet ) + while( !mpPool->mbTerminate ) { - if (mbWorking) - mpPool->stopWork(); - mbWorking = false; - maNewWork.reset(); - - if( mpPool->mbTerminate ) - break; - - aGuard.clear(); // unlock - - maNewWork.wait(); - - aGuard.reset(); // lock + ThreadTask *pTask = mpPool->popWorkLocked( aGuard, true ); + if( pTask ) + { + aGuard.unlock(); - pRet = mpPool->popWork(); - } + pTask->execAndDelete(); - if (pRet) - { - if (!mbWorking) - mpPool->startWork(); - mbWorking = true; + aGuard.lock(); + } } - - return pRet; - } - - // Why a condition per worker thread - you may ask. - // - // Unfortunately the Windows synchronisation API that we wrap - // is horribly inadequate cf. - // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html - // The existing osl::Condition API should only ever be used - // between one producer and one consumer thread to avoid the - // lost wakeup problem. - - void signalNewWork() - { - maNewWork.set(); } }; @@ -133,19 +79,18 @@ ThreadPool::ThreadPool( sal_Int32 nWorkers ) : mnThreadsWorking( 0 ), mbTerminate( false ) { + std::unique_lock< std::mutex > aGuard( maMutex ); + for( sal_Int32 i = 0; i < nWorkers; i++ ) maWorkers.push_back( new ThreadWorker( this ) ); - maTasksComplete.set(); - - osl::MutexGuard aGuard( maGuard ); for(rtl::Reference<ThreadWorker> & rpWorker : maWorkers) rpWorker->launch(); } ThreadPool::~ThreadPool() { - waitAndCleanupWorkers(); + shutdown(); } struct ThreadPoolStatic : public rtl::StaticWithInit< std::shared_ptr< ThreadPool >, @@ -183,100 +128,108 @@ sal_Int32 ThreadPool::getPreferredConcurrency() return ThreadCount; } -void ThreadPool::waitAndCleanupWorkers() +// FIXME: there should be no need for this as/when our baseline +// is >VS2015 and drop WinXP; the sorry details are here: +// https://connect.microsoft.com/VisualStudio/feedback/details/1282596 +void ThreadPool::shutdown() { - osl::ResettableMutexGuard aGuard( maGuard ); + if (mbTerminate) + return; + + std::unique_lock< std::mutex > aGuard( maMutex ); if( maWorkers.empty() ) { // no threads at all -> execute the work in-line - while ( ThreadTask * pTask = popWork() ) - { - std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag()); - pTask->doWork(); - delete pTask; - pTag->onTaskWorkerDone(); - } + ThreadTask *pTask; + while ( ( pTask = popWorkLocked(aGuard, false) ) ) + pTask->execAndDelete(); } else { - aGuard.clear(); - maTasksComplete.wait(); - aGuard.reset(); + while( !maTasks.empty() ) + maTasksChanged.wait( aGuard ); } assert( maTasks.empty() ); mbTerminate = true; + maTasksChanged.notify_all(); + while( !maWorkers.empty() ) { rtl::Reference< ThreadWorker > xWorker = maWorkers.back(); maWorkers.pop_back(); assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker) == maWorkers.end()); - xWorker->signalNewWork(); - aGuard.clear(); - { // unlocked + aGuard.unlock(); + { xWorker->join(); xWorker.clear(); } - aGuard.reset(); + aGuard.lock(); } } void ThreadPool::pushTask( ThreadTask *pTask ) { - osl::MutexGuard aGuard( maGuard ); + std::unique_lock< std::mutex > aGuard( maMutex ); + pTask->mpTag->onTaskPushed(); maTasks.insert( maTasks.begin(), pTask ); - // horrible beyond belief: - for(rtl::Reference<ThreadWorker> & rpWorker : maWorkers) - rpWorker->signalNewWork(); - maTasksComplete.reset(); + maTasksChanged.notify_one(); } -ThreadTask *ThreadPool::popWork() +ThreadTask *ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait ) { - if( !maTasks.empty() ) + do { - ThreadTask *pTask = maTasks.back(); - maTasks.pop_back(); - return pTask; - } - else - return nullptr; + if( !maTasks.empty() ) + { + ThreadTask *pTask = maTasks.back(); + maTasks.pop_back(); + return pTask; + } + else if (!bWait || mbTerminate) + return nullptr; + + maTasksChanged.wait( rGuard ); + + } while (!mbTerminate); + + return nullptr; } -void ThreadPool::startWork() +void ThreadPool::startWorkLocked() { mnThreadsWorking++; } -void ThreadPool::stopWork() +void ThreadPool::stopWorkLocked() { assert( mnThreadsWorking > 0 ); if ( --mnThreadsWorking == 0 ) - maTasksComplete.set(); + maTasksChanged.notify_all(); } + void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag) { #if defined DBG_UTIL && defined LINUX assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task"); #endif - osl::ResettableMutexGuard aGuard( maGuard ); - - if( maWorkers.empty() ) - { // no threads at all -> execute the work in-line - while ( ThreadTask * pTask = popWork() ) - { - std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag()); - pTask->doWork(); - delete pTask; - pTag->onTaskWorkerDone(); + { + std::unique_lock< std::mutex > aGuard( maMutex ); + + if( maWorkers.empty() ) + { // no threads at all -> execute the work in-line + ThreadTask *pTask; + while (!rTag->isDone() && + ( pTask = popWorkLocked(aGuard, false) ) ) + pTask->execAndDelete(); } } - aGuard.clear(); + rTag->waitUntilDone(); } @@ -290,54 +243,73 @@ bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag) return pTag->isDone(); } - ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag) : mpTag(pTag) { } +void ThreadTask::execAndDelete() +{ + std::shared_ptr<ThreadTaskTag> pTag(mpTag); + try { + doWork(); + } + catch (const std::exception &e) + { + SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what()); + } + catch (const css::uno::Exception &e) + { + SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message); + } + + delete this; + pTag->onTaskWorkerDone(); +} + ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0) { - maTasksComplete.set(); } void ThreadTaskTag::onTaskPushed() { - osl::MutexGuard g(mMutex); - assert( mnTasksWorking < 65535 ); // sanity checking - ++mnTasksWorking; - maTasksComplete.reset(); + std::unique_lock< std::mutex > aGuard( maMutex ); + mnTasksWorking++; + assert( mnTasksWorking < 65536 ); // sanity checking } void ThreadTaskTag::onTaskWorkerDone() { - osl::MutexGuard g(mMutex); - assert(mnTasksWorking > 0); - --mnTasksWorking; + std::unique_lock< std::mutex > aGuard( maMutex ); + mnTasksWorking--; + assert(mnTasksWorking >= 0); if (mnTasksWorking == 0) - maTasksComplete.set(); + maTasksComplete.notify_all(); } -void ThreadTaskTag::waitUntilDone() +bool ThreadTaskTag::isDone() { -#if defined DBG_UTIL && defined LINUX - assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task"); -#endif + std::unique_lock< std::mutex > aGuard( maMutex ); + return mnTasksWorking == 0; +} +void ThreadTaskTag::waitUntilDone() +{ + std::unique_lock< std::mutex > aGuard( maMutex ); + while( mnTasksWorking > 0 ) + { #ifdef DBG_UTIL - // 3 minute timeout in debug mode so our tests fail sooner rather than later - osl::Condition::Result rv = maTasksComplete.wait(TimeValue { 3*60, 0 }); - assert(rv != osl::Condition::result_timeout); + // 3 minute timeout in debug mode so our tests fail sooner rather than later + std::cv_status result = maTasksComplete.wait_for( + aGuard, std::chrono::seconds( 3 * 60 )); + assert(result != std::cv_status::timeout); #else - // 10 minute timeout in production so the app eventually throws some kind of error - if (maTasksComplete.wait(TimeValue { 10*60, 0 }) == osl::Condition::Result::result_timeout) - throw std::runtime_error("timeout waiting for threadpool tasks"); + // 10 minute timeout in production so the app eventually throws some kind of error + if (maTasksComplete.wait_for( + aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout) + throw std::runtime_error("timeout waiting for threadpool tasks"); #endif -} - -bool ThreadTaskTag::isDone() -{ - return mnTasksWorking == 0; + } } } // namespace comphelper |