summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--comphelper/source/misc/threadpool.cxx266
-rw-r--r--desktop/source/app/app.cxx4
-rw-r--r--include/comphelper/threadpool.hxx47
-rw-r--r--package/source/zipapi/ZipOutputStream.cxx1
4 files changed, 152 insertions, 166 deletions
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx
index 0fda2646b7f1..63291436e931 100644
--- a/comphelper/source/misc/threadpool.cxx
+++ b/comphelper/source/misc/threadpool.cxx
@@ -10,11 +10,14 @@
#include <comphelper/threadpool.hxx>
#include <com/sun/star/uno/Exception.hpp>
+#include <sal/config.h>
#include <rtl/instance.hxx>
#include <rtl/string.hxx>
+#include <salhelper/thread.hxx>
#include <algorithm>
#include <memory>
#include <thread>
+#include <chrono>
namespace comphelper {
@@ -26,30 +29,27 @@ static thread_local bool gbIsWorkerThread;
// used to group thread-tasks for waiting in waitTillDone()
class COMPHELPER_DLLPUBLIC ThreadTaskTag
{
- osl::Mutex mMutex;
- std::size_t mnTasksWorking;
- osl::Condition maTasksComplete;
+ std::mutex maMutex;
+ sal_Int32 mnTasksWorking;
+ std::condition_variable maTasksComplete;
public:
ThreadTaskTag();
- bool isDone();
- void waitUntilDone();
- void onTaskWorkerDone();
- void onTaskPushed();
+ bool isDone();
+ void waitUntilDone();
+ void onTaskWorkerDone();
+ void onTaskPushed();
};
class ThreadPool::ThreadWorker : public salhelper::Thread
{
- ThreadPool *mpPool;
- osl::Condition maNewWork;
- bool mbWorking;
+ ThreadPool *mpPool;
public:
explicit ThreadWorker( ThreadPool *pPool ) :
salhelper::Thread("thread-pool"),
- mpPool( pPool ),
- mbWorking( false )
+ mpPool( pPool )
{
}
@@ -58,74 +58,20 @@ public:
#if defined DBG_UTIL && defined LINUX
gbIsWorkerThread = true;
#endif
- while ( ThreadTask * pTask = waitForWork() )
- {
- std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
- try {
- pTask->doWork();
- }
- catch (const std::exception &e)
- {
- SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
- }
- catch (const css::uno::Exception &e)
- {
- SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message);
- }
- delete pTask;
- pTag->onTaskWorkerDone();
- }
- }
-
- ThreadTask *waitForWork()
- {
- ThreadTask *pRet = nullptr;
+ std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
- osl::ResettableMutexGuard aGuard( mpPool->maGuard );
-
- pRet = mpPool->popWork();
-
- while( !pRet )
+ while( !mpPool->mbTerminate )
{
- if (mbWorking)
- mpPool->stopWork();
- mbWorking = false;
- maNewWork.reset();
-
- if( mpPool->mbTerminate )
- break;
-
- aGuard.clear(); // unlock
-
- maNewWork.wait();
-
- aGuard.reset(); // lock
+ ThreadTask *pTask = mpPool->popWorkLocked( aGuard, true );
+ if( pTask )
+ {
+ aGuard.unlock();
- pRet = mpPool->popWork();
- }
+ pTask->execAndDelete();
- if (pRet)
- {
- if (!mbWorking)
- mpPool->startWork();
- mbWorking = true;
+ aGuard.lock();
+ }
}
-
- return pRet;
- }
-
- // Why a condition per worker thread - you may ask.
- //
- // Unfortunately the Windows synchronisation API that we wrap
- // is horribly inadequate cf.
- // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
- // The existing osl::Condition API should only ever be used
- // between one producer and one consumer thread to avoid the
- // lost wakeup problem.
-
- void signalNewWork()
- {
- maNewWork.set();
}
};
@@ -133,19 +79,18 @@ ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
mnThreadsWorking( 0 ),
mbTerminate( false )
{
+ std::unique_lock< std::mutex > aGuard( maMutex );
+
for( sal_Int32 i = 0; i < nWorkers; i++ )
maWorkers.push_back( new ThreadWorker( this ) );
- maTasksComplete.set();
-
- osl::MutexGuard aGuard( maGuard );
for(rtl::Reference<ThreadWorker> & rpWorker : maWorkers)
rpWorker->launch();
}
ThreadPool::~ThreadPool()
{
- waitAndCleanupWorkers();
+ shutdown();
}
struct ThreadPoolStatic : public rtl::StaticWithInit< std::shared_ptr< ThreadPool >,
@@ -183,100 +128,108 @@ sal_Int32 ThreadPool::getPreferredConcurrency()
return ThreadCount;
}
-void ThreadPool::waitAndCleanupWorkers()
+// FIXME: there should be no need for this as/when our baseline
+// is >VS2015 and drop WinXP; the sorry details are here:
+// https://connect.microsoft.com/VisualStudio/feedback/details/1282596
+void ThreadPool::shutdown()
{
- osl::ResettableMutexGuard aGuard( maGuard );
+ if (mbTerminate)
+ return;
+
+ std::unique_lock< std::mutex > aGuard( maMutex );
if( maWorkers.empty() )
{ // no threads at all -> execute the work in-line
- while ( ThreadTask * pTask = popWork() )
- {
- std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
- pTask->doWork();
- delete pTask;
- pTag->onTaskWorkerDone();
- }
+ ThreadTask *pTask;
+ while ( ( pTask = popWorkLocked(aGuard, false) ) )
+ pTask->execAndDelete();
}
else
{
- aGuard.clear();
- maTasksComplete.wait();
- aGuard.reset();
+ while( !maTasks.empty() )
+ maTasksChanged.wait( aGuard );
}
assert( maTasks.empty() );
mbTerminate = true;
+ maTasksChanged.notify_all();
+
while( !maWorkers.empty() )
{
rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
maWorkers.pop_back();
assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker)
== maWorkers.end());
- xWorker->signalNewWork();
- aGuard.clear();
- { // unlocked
+ aGuard.unlock();
+ {
xWorker->join();
xWorker.clear();
}
- aGuard.reset();
+ aGuard.lock();
}
}
void ThreadPool::pushTask( ThreadTask *pTask )
{
- osl::MutexGuard aGuard( maGuard );
+ std::unique_lock< std::mutex > aGuard( maMutex );
+
pTask->mpTag->onTaskPushed();
maTasks.insert( maTasks.begin(), pTask );
- // horrible beyond belief:
- for(rtl::Reference<ThreadWorker> & rpWorker : maWorkers)
- rpWorker->signalNewWork();
- maTasksComplete.reset();
+ maTasksChanged.notify_one();
}
-ThreadTask *ThreadPool::popWork()
+ThreadTask *ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
{
- if( !maTasks.empty() )
+ do
{
- ThreadTask *pTask = maTasks.back();
- maTasks.pop_back();
- return pTask;
- }
- else
- return nullptr;
+ if( !maTasks.empty() )
+ {
+ ThreadTask *pTask = maTasks.back();
+ maTasks.pop_back();
+ return pTask;
+ }
+ else if (!bWait || mbTerminate)
+ return nullptr;
+
+ maTasksChanged.wait( rGuard );
+
+ } while (!mbTerminate);
+
+ return nullptr;
}
-void ThreadPool::startWork()
+void ThreadPool::startWorkLocked()
{
mnThreadsWorking++;
}
-void ThreadPool::stopWork()
+void ThreadPool::stopWorkLocked()
{
assert( mnThreadsWorking > 0 );
if ( --mnThreadsWorking == 0 )
- maTasksComplete.set();
+ maTasksChanged.notify_all();
}
+
void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag)
{
#if defined DBG_UTIL && defined LINUX
assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
#endif
- osl::ResettableMutexGuard aGuard( maGuard );
-
- if( maWorkers.empty() )
- { // no threads at all -> execute the work in-line
- while ( ThreadTask * pTask = popWork() )
- {
- std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
- pTask->doWork();
- delete pTask;
- pTag->onTaskWorkerDone();
+ {
+ std::unique_lock< std::mutex > aGuard( maMutex );
+
+ if( maWorkers.empty() )
+ { // no threads at all -> execute the work in-line
+ ThreadTask *pTask;
+ while (!rTag->isDone() &&
+ ( pTask = popWorkLocked(aGuard, false) ) )
+ pTask->execAndDelete();
}
}
- aGuard.clear();
+
rTag->waitUntilDone();
}
@@ -290,54 +243,73 @@ bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
return pTag->isDone();
}
-
ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag)
: mpTag(pTag)
{
}
+void ThreadTask::execAndDelete()
+{
+ std::shared_ptr<ThreadTaskTag> pTag(mpTag);
+ try {
+ doWork();
+ }
+ catch (const std::exception &e)
+ {
+ SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
+ }
+ catch (const css::uno::Exception &e)
+ {
+ SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message);
+ }
+
+ delete this;
+ pTag->onTaskWorkerDone();
+}
+
ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
{
- maTasksComplete.set();
}
void ThreadTaskTag::onTaskPushed()
{
- osl::MutexGuard g(mMutex);
- assert( mnTasksWorking < 65535 ); // sanity checking
- ++mnTasksWorking;
- maTasksComplete.reset();
+ std::unique_lock< std::mutex > aGuard( maMutex );
+ mnTasksWorking++;
+ assert( mnTasksWorking < 65536 ); // sanity checking
}
void ThreadTaskTag::onTaskWorkerDone()
{
- osl::MutexGuard g(mMutex);
- assert(mnTasksWorking > 0);
- --mnTasksWorking;
+ std::unique_lock< std::mutex > aGuard( maMutex );
+ mnTasksWorking--;
+ assert(mnTasksWorking >= 0);
if (mnTasksWorking == 0)
- maTasksComplete.set();
+ maTasksComplete.notify_all();
}
-void ThreadTaskTag::waitUntilDone()
+bool ThreadTaskTag::isDone()
{
-#if defined DBG_UTIL && defined LINUX
- assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
-#endif
+ std::unique_lock< std::mutex > aGuard( maMutex );
+ return mnTasksWorking == 0;
+}
+void ThreadTaskTag::waitUntilDone()
+{
+ std::unique_lock< std::mutex > aGuard( maMutex );
+ while( mnTasksWorking > 0 )
+ {
#ifdef DBG_UTIL
- // 3 minute timeout in debug mode so our tests fail sooner rather than later
- osl::Condition::Result rv = maTasksComplete.wait(TimeValue { 3*60, 0 });
- assert(rv != osl::Condition::result_timeout);
+ // 3 minute timeout in debug mode so our tests fail sooner rather than later
+ std::cv_status result = maTasksComplete.wait_for(
+ aGuard, std::chrono::seconds( 3 * 60 ));
+ assert(result != std::cv_status::timeout);
#else
- // 10 minute timeout in production so the app eventually throws some kind of error
- if (maTasksComplete.wait(TimeValue { 10*60, 0 }) == osl::Condition::Result::result_timeout)
- throw std::runtime_error("timeout waiting for threadpool tasks");
+ // 10 minute timeout in production so the app eventually throws some kind of error
+ if (maTasksComplete.wait_for(
+ aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
+ throw std::runtime_error("timeout waiting for threadpool tasks");
#endif
-}
-
-bool ThreadTaskTag::isDone()
-{
- return mnTasksWorking == 0;
+ }
}
} // namespace comphelper
diff --git a/desktop/source/app/app.cxx b/desktop/source/app/app.cxx
index 501ebe1497ce..9a681584c59d 100644
--- a/desktop/source/app/app.cxx
+++ b/desktop/source/app/app.cxx
@@ -81,6 +81,7 @@
#include <toolkit/helper/vclunohelper.hxx>
#include <comphelper/configuration.hxx>
#include <comphelper/fileurl.hxx>
+#include <comphelper/threadpool.hxx>
#include <comphelper/processfactory.hxx>
#include <comphelper/backupfilehelper.hxx>
#include <unotools/bootstrap.hxx>
@@ -1791,11 +1792,14 @@ int Desktop::doShutdown()
StarBASIC::DetachAllDocBasicItems();
#endif
}
+
// be sure that path/language options gets destroyed before
// UCB is deinitialized
pExecGlobals->pLanguageOptions.reset( nullptr );
pExecGlobals->pPathOptions.reset( nullptr );
+ comphelper::ThreadPool::getSharedOptimalPool().shutdown();
+
bool bRR = pExecGlobals->bRestartRequested;
delete pExecGlobals;
pExecGlobals = nullptr;
diff --git a/include/comphelper/threadpool.hxx b/include/comphelper/threadpool.hxx
index 7910a83ceeb7..9f7692252fbf 100644
--- a/include/comphelper/threadpool.hxx
+++ b/include/comphelper/threadpool.hxx
@@ -11,11 +11,11 @@
#define INCLUDED_COMPHELPER_THREADPOOL_HXX
#include <sal/config.h>
-#include <salhelper/thread.hxx>
-#include <osl/mutex.hxx>
-#include <osl/conditn.hxx>
#include <rtl/ref.hxx>
#include <comphelper/comphelperdllapi.h>
+#include <mutex>
+#include <thread>
+#include <condition_variable>
#include <vector>
#include <memory>
@@ -28,14 +28,19 @@ class COMPHELPER_DLLPUBLIC ThreadTask
{
friend class ThreadPool;
std::shared_ptr<ThreadTaskTag> mpTag;
+
+ /// execute and delete this task
+ void execAndDelete();
+protected:
+ /// override to get your task performed by the pool
+ virtual void doWork() = 0;
+ /// once pushed ThreadTasks are destroyed by the pool
+ virtual ~ThreadTask() {}
public:
ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag);
- virtual ~ThreadTask() {}
- virtual void doWork() = 0;
- const std::shared_ptr<ThreadTaskTag>& getTag() { return mpTag; }
};
-/// A very basic thread pool implementation
+/// A very basic thread-safe thread pool implementation
class COMPHELPER_DLLPUBLIC ThreadPool final
{
public:
@@ -50,7 +55,7 @@ public:
/// returns a configurable max-concurrency
/// limit to avoid spawning an unnecessarily
/// large number of threads on high-core boxes.
- /// MAX_CONCURRENCY envar controls the cap.
+ /// MAX_CONCURRENCY env. var. controls the cap.
static sal_Int32 getPreferredConcurrency();
ThreadPool( sal_Int32 nWorkers );
@@ -65,6 +70,9 @@ public:
/// return the number of live worker threads
sal_Int32 getWorkerCount() const { return maWorkers.size(); }
+ /// wait until all work is completed, then join all threads
+ void shutdown();
+
private:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
@@ -72,20 +80,21 @@ private:
class ThreadWorker;
friend class ThreadWorker;
- /// wait until all work is completed, then join all threads
- void waitAndCleanupWorkers();
-
- ThreadTask *popWork();
- void startWork();
- void stopWork();
+ /** Pop a work task
+ @param bWait - if set wait until task present or termination
+ @return a new task to perform, or NULL if list empty or terminated
+ */
+ ThreadTask *popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait );
+ void startWorkLocked();
+ void stopWorkLocked();
- osl::Mutex maGuard;
- sal_Int32 mnThreadsWorking;
/// signalled when all in-progress tasks are complete
- osl::Condition maTasksComplete;
- bool mbTerminate;
- std::vector< rtl::Reference< ThreadWorker > > maWorkers;
+ std::mutex maMutex;
+ std::condition_variable maTasksChanged;
+ sal_Int32 mnThreadsWorking;
+ bool mbTerminate;
std::vector< ThreadTask * > maTasks;
+ std::vector< rtl::Reference< ThreadWorker > > maWorkers;
};
} // namespace comphelper
diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx
index 603a61423eb4..d0fce8951ed9 100644
--- a/package/source/zipapi/ZipOutputStream.cxx
+++ b/package/source/zipapi/ZipOutputStream.cxx
@@ -27,6 +27,7 @@
#include <osl/diagnose.h>
#include <osl/time.h>
+#include <osl/thread.hxx>
#include <PackageConstants.hxx>
#include <ZipEntry.hxx>