diff options
author | Michael Meeks <michael.meeks@collabora.com> | 2014-10-31 12:32:12 +0000 |
---|---|---|
committer | Michael Meeks <michael.meeks@collabora.com> | 2014-10-31 18:13:05 +0000 |
commit | 68f912e896b14546f17ef8bd090f08229842519f (patch) | |
tree | b900d366e72a32c4dad31014d5aa222137cee710 | |
parent | fc9e78f4498a3b44be72e13c0c64d235f62b8a99 (diff) |
thread-pool: fix waiting until all tasks are complete.
Change-Id: Iaf5a3bf28879f229a223a8760fd878f96958a53c
-rw-r--r-- | comphelper/source/misc/threadpool.cxx | 40 | ||||
-rw-r--r-- | include/comphelper/threadpool.hxx | 10 |
2 files changed, 41 insertions, 9 deletions
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx index 236a314019f8..3717ffbd006c 100644 --- a/comphelper/source/misc/threadpool.cxx +++ b/comphelper/source/misc/threadpool.cxx @@ -20,10 +20,15 @@ class ThreadPool::ThreadWorker : public salhelper::Thread { ThreadPool *mpPool; osl::Condition maNewWork; + bool mbWorking; public: + ThreadWorker( ThreadPool *pPool ) : salhelper::Thread("thread-pool"), - mpPool( pPool ) {} + mpPool( pPool ), + mbWorking( false ) + { + } virtual void execute() SAL_OVERRIDE { @@ -45,6 +50,9 @@ public: while( !pRet ) { + if (mbWorking) + mpPool->stopWork(); + mbWorking = false; maNewWork.reset(); if( mpPool->mbTerminate ) @@ -59,6 +67,13 @@ public: pRet = mpPool->popWork(); } + if (pRet) + { + if (!mbWorking) + mpPool->startWork(); + mbWorking = true; + } + return pRet; } @@ -78,12 +93,13 @@ public: }; ThreadPool::ThreadPool( sal_Int32 nWorkers ) : + mnThreadsWorking( 0 ), mbTerminate( false ) { for( sal_Int32 i = 0; i < nWorkers; i++ ) maWorkers.push_back( new ThreadWorker( this ) ); - maTasksEmpty.reset(); + maTasksComplete.reset(); osl::MutexGuard aGuard( maGuard ); for( size_t i = 0; i < maWorkers.size(); i++ ) @@ -136,10 +152,11 @@ void ThreadPool::pushTask( ThreadTask *pTask ) { osl::MutexGuard aGuard( maGuard ); maTasks.insert( maTasks.begin(), pTask ); + // horrible beyond belief: for( size_t i = 0; i < maWorkers.size(); i++ ) maWorkers[ i ]->signalNewWork(); - maTasksEmpty.reset(); + maTasksComplete.reset(); } ThreadTask *ThreadPool::popWork() @@ -151,8 +168,19 @@ ThreadTask *ThreadPool::popWork() return pTask; } else - maTasksEmpty.set(); - return NULL; + return NULL; +} + +void ThreadPool::startWork() +{ + mnThreadsWorking++; +} + +void ThreadPool::stopWork() +{ + assert( mnThreadsWorking > 0 ); + if ( --mnThreadsWorking == 0 ) + maTasksComplete.set(); } void ThreadPool::waitUntilEmpty() @@ -171,7 +199,7 @@ void ThreadPool::waitUntilEmpty() else { aGuard.clear(); - maTasksEmpty.wait(); + maTasksComplete.wait(); aGuard.reset(); } assert( maTasks.empty() ); diff --git a/include/comphelper/threadpool.hxx b/include/comphelper/threadpool.hxx index 2e5171902f55..231a735d9107 100644 --- a/include/comphelper/threadpool.hxx +++ b/include/comphelper/threadpool.hxx @@ -54,10 +54,14 @@ private: ThreadTask *waitForWork( osl::Condition &rNewWork ); ThreadTask *popWork(); + void startWork(); + void stopWork(); - osl::Mutex maGuard; - osl::Condition maTasksEmpty; - bool mbTerminate; + osl::Mutex maGuard; + sal_Int32 mnThreadsWorking; + /// signalled when all in-progress tasks are complete + osl::Condition maTasksComplete; + bool mbTerminate; std::vector< rtl::Reference< ThreadWorker > > maWorkers; std::vector< ThreadTask * > maTasks; }; |