summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Meeks <michael.meeks@collabora.com>2014-10-31 12:32:12 +0000
committerMichael Meeks <michael.meeks@collabora.com>2014-10-31 18:13:05 +0000
commit68f912e896b14546f17ef8bd090f08229842519f (patch)
treeb900d366e72a32c4dad31014d5aa222137cee710
parentfc9e78f4498a3b44be72e13c0c64d235f62b8a99 (diff)
thread-pool: fix waiting until all tasks are complete.
Change-Id: Iaf5a3bf28879f229a223a8760fd878f96958a53c
-rw-r--r--comphelper/source/misc/threadpool.cxx40
-rw-r--r--include/comphelper/threadpool.hxx10
2 files changed, 41 insertions, 9 deletions
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx
index 236a314019f8..3717ffbd006c 100644
--- a/comphelper/source/misc/threadpool.cxx
+++ b/comphelper/source/misc/threadpool.cxx
@@ -20,10 +20,15 @@ class ThreadPool::ThreadWorker : public salhelper::Thread
{
ThreadPool *mpPool;
osl::Condition maNewWork;
+ bool mbWorking;
public:
+
ThreadWorker( ThreadPool *pPool ) :
salhelper::Thread("thread-pool"),
- mpPool( pPool ) {}
+ mpPool( pPool ),
+ mbWorking( false )
+ {
+ }
virtual void execute() SAL_OVERRIDE
{
@@ -45,6 +50,9 @@ public:
while( !pRet )
{
+ if (mbWorking)
+ mpPool->stopWork();
+ mbWorking = false;
maNewWork.reset();
if( mpPool->mbTerminate )
@@ -59,6 +67,13 @@ public:
pRet = mpPool->popWork();
}
+ if (pRet)
+ {
+ if (!mbWorking)
+ mpPool->startWork();
+ mbWorking = true;
+ }
+
return pRet;
}
@@ -78,12 +93,13 @@ public:
};
ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
+ mnThreadsWorking( 0 ),
mbTerminate( false )
{
for( sal_Int32 i = 0; i < nWorkers; i++ )
maWorkers.push_back( new ThreadWorker( this ) );
- maTasksEmpty.reset();
+ maTasksComplete.reset();
osl::MutexGuard aGuard( maGuard );
for( size_t i = 0; i < maWorkers.size(); i++ )
@@ -136,10 +152,11 @@ void ThreadPool::pushTask( ThreadTask *pTask )
{
osl::MutexGuard aGuard( maGuard );
maTasks.insert( maTasks.begin(), pTask );
+
// horrible beyond belief:
for( size_t i = 0; i < maWorkers.size(); i++ )
maWorkers[ i ]->signalNewWork();
- maTasksEmpty.reset();
+ maTasksComplete.reset();
}
ThreadTask *ThreadPool::popWork()
@@ -151,8 +168,19 @@ ThreadTask *ThreadPool::popWork()
return pTask;
}
else
- maTasksEmpty.set();
- return NULL;
+ return NULL;
+}
+
+void ThreadPool::startWork()
+{
+ mnThreadsWorking++;
+}
+
+void ThreadPool::stopWork()
+{
+ assert( mnThreadsWorking > 0 );
+ if ( --mnThreadsWorking == 0 )
+ maTasksComplete.set();
}
void ThreadPool::waitUntilEmpty()
@@ -171,7 +199,7 @@ void ThreadPool::waitUntilEmpty()
else
{
aGuard.clear();
- maTasksEmpty.wait();
+ maTasksComplete.wait();
aGuard.reset();
}
assert( maTasks.empty() );
diff --git a/include/comphelper/threadpool.hxx b/include/comphelper/threadpool.hxx
index 2e5171902f55..231a735d9107 100644
--- a/include/comphelper/threadpool.hxx
+++ b/include/comphelper/threadpool.hxx
@@ -54,10 +54,14 @@ private:
ThreadTask *waitForWork( osl::Condition &rNewWork );
ThreadTask *popWork();
+ void startWork();
+ void stopWork();
- osl::Mutex maGuard;
- osl::Condition maTasksEmpty;
- bool mbTerminate;
+ osl::Mutex maGuard;
+ sal_Int32 mnThreadsWorking;
+ /// signalled when all in-progress tasks are complete
+ osl::Condition maTasksComplete;
+ bool mbTerminate;
std::vector< rtl::Reference< ThreadWorker > > maWorkers;
std::vector< ThreadTask * > maTasks;
};