summaryrefslogtreecommitdiff
path: root/comphelper
diff options
context:
space:
mode:
authorNoel Grandin <noel@peralex.com>2016-07-08 14:29:53 +0200
committerNoel Grandin <noelgrandin@gmail.com>2016-07-18 06:49:09 +0000
commit76ad32bec8e2c00c21247041b16d9e09e73d2504 (patch)
tree7b2b1277151bc7904ff63684ebd7e3d6d8a7d661 /comphelper
parent9bf9f88e4c7e0b182ec6d8b4aefb7d735bb0653b (diff)
add tagging to ThreadTasks so we don't need more one pool
If more than one place in the code submits tasks to the shared pool, then waitTillDone() becomes unreliable. Add a tagging mechanism, so different callsites can wait on different sets of tasks. Also try to protect our worker threads against exceptions from the thread tasks code. Change-Id: Idde664ab50008d31a2dd73910bb22f50e62ae22f Reviewed-on: https://gerrit.libreoffice.org/27042 Tested-by: Jenkins <ci@libreoffice.org> Reviewed-by: Noel Grandin <noelgrandin@gmail.com>
Diffstat (limited to 'comphelper')
-rw-r--r--comphelper/source/misc/threadpool.cxx149
1 files changed, 139 insertions, 10 deletions
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx
index 8680e00e6e34..32170a1a0cd1 100644
--- a/comphelper/source/misc/threadpool.cxx
+++ b/comphelper/source/misc/threadpool.cxx
@@ -9,6 +9,7 @@
#include <comphelper/threadpool.hxx>
+#include <com/sun/star/uno/Exception.hpp>
#include <rtl/instance.hxx>
#include <rtl/string.hxx>
#include <algorithm>
@@ -17,6 +18,26 @@
namespace comphelper {
+/** prevent waiting for a task from inside a task */
+#if defined DBG_UTIL && defined LINUX
+static thread_local bool gbIsWorkerThread;
+#endif
+
+// used to group thread-tasks for waiting in waitTillDone()
+class COMPHELPER_DLLPUBLIC ThreadTaskTag
+{
+ oslInterlockedCount mnTasksWorking;
+ osl::Condition maTasksComplete;
+
+public:
+ ThreadTaskTag();
+ bool isDone();
+ void waitUntilDone();
+ void onTaskWorkerDone();
+ void onTaskPushed();
+};
+
+
class ThreadPool::ThreadWorker : public salhelper::Thread
{
ThreadPool *mpPool;
@@ -33,11 +54,36 @@ public:
virtual void execute() override
{
+#if defined DBG_UTIL && defined LINUX
+ gbIsWorkerThread = true;
+#endif
ThreadTask *pTask;
while ( ( pTask = waitForWork() ) )
{
- pTask->doWork();
- delete pTask;
+ std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
+ try {
+ pTask->doWork();
+ }
+ catch (const std::exception &e)
+ {
+ SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
+ }
+ catch (const css::uno::Exception &e)
+ {
+ SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message);
+ }
+ try {
+ delete pTask;
+ }
+ catch (const std::exception &e)
+ {
+ SAL_WARN("comphelper", "exception in thread worker while deleting task: " << e.what());
+ }
+ catch (const css::uno::Exception &e)
+ {
+ SAL_WARN("comphelper", "exception in thread worker while deleting task: " << e.Message);
+ }
+ pTag->onTaskWorkerDone();
}
}
@@ -149,9 +195,27 @@ sal_Int32 ThreadPool::getPreferredConcurrency()
void ThreadPool::waitAndCleanupWorkers()
{
- waitUntilEmpty();
-
osl::ResettableMutexGuard aGuard( maGuard );
+
+ if( maWorkers.empty() )
+ { // no threads at all -> execute the work in-line
+ ThreadTask *pTask;
+ while ( ( pTask = popWork() ) )
+ {
+ std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
+ pTask->doWork();
+ delete pTask;
+ pTag->onTaskWorkerDone();
+ }
+ }
+ else
+ {
+ aGuard.clear();
+ maTasksComplete.wait();
+ aGuard.reset();
+ }
+ assert( maTasks.empty() );
+
mbTerminate = true;
while( !maWorkers.empty() )
@@ -173,6 +237,7 @@ void ThreadPool::waitAndCleanupWorkers()
void ThreadPool::pushTask( ThreadTask *pTask )
{
osl::MutexGuard aGuard( maGuard );
+ pTask->mpTag->onTaskPushed();
maTasks.insert( maTasks.begin(), pTask );
// horrible beyond belief:
@@ -205,8 +270,11 @@ void ThreadPool::stopWork()
maTasksComplete.set();
}
-void ThreadPool::waitUntilEmpty()
+void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag)
{
+#if defined DBG_UTIL && defined LINUX
+ assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
+#endif
osl::ResettableMutexGuard aGuard( maGuard );
if( maWorkers.empty() )
@@ -214,17 +282,78 @@ void ThreadPool::waitUntilEmpty()
ThreadTask *pTask;
while ( ( pTask = popWork() ) )
{
+ std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
pTask->doWork();
delete pTask;
+ pTag->onTaskWorkerDone();
}
}
- else
+ aGuard.clear();
+ rTag->waitUntilDone();
+}
+
+std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag()
+{
+ return std::make_shared<ThreadTaskTag>();
+}
+
+bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
+{
+ return pTag->isDone();
+}
+
+
+ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag)
+ : mpTag(pTag)
+{
+}
+
+ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
+{
+ maTasksComplete.set();
+}
+
+void ThreadTaskTag::onTaskPushed()
+{
+ oslInterlockedCount n = osl_atomic_increment(&mnTasksWorking);
+ assert( n < 65536 ); // sanity checking
+ (void)n; // avoid -Wunused-variable in release build
+ maTasksComplete.reset();
+}
+
+void ThreadTaskTag::onTaskWorkerDone()
+{
+ sal_Int32 nCount = osl_atomic_decrement(&mnTasksWorking);
+ assert(nCount >= 0);
+ if (nCount == 0)
+ maTasksComplete.set();
+}
+
+void ThreadTaskTag::waitUntilDone()
+{
+#if defined DBG_UTIL && defined LINUX
+ assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
+#endif
+
+#ifdef DBG_UTIL
+ // 2 minute timeout in debug mode so our tests fail sooner rather than later
+ osl::Condition::Result rv = maTasksComplete.wait(TimeValue { 2*60, 0 });
+ assert(rv != osl_cond_result_timeout);
+#else
+ // 10 minute timeout in production so the app eventually throws some kind of error
+ if (maTasksComplete.wait(TimeValue { 10*60, 0 }) == osl_cond_result_timeout)
{
- aGuard.clear();
- maTasksComplete.wait();
- aGuard.reset();
+ SAL_DEBUG_TRACE("comphelper::ThreadTaskTag::waitUntilDone() "
+ << "tasksWorking " << mnTasksWorking
+ << "noThreads " << ThreadPool::getPreferredConcurrency());
+ throw std::runtime_error("timeout waiting for threadpool tasks");
}
- assert( maTasks.empty() );
+#endif
+}
+
+bool ThreadTaskTag::isDone()
+{
+ return mnTasksWorking == 0;
}
} // namespace comphelper