summaryrefslogtreecommitdiff
path: root/comphelper
diff options
context:
space:
mode:
Diffstat (limited to 'comphelper')
-rw-r--r--comphelper/source/misc/threadpool.cxx149
1 files changed, 139 insertions, 10 deletions
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx
index 8680e00e6e34..32170a1a0cd1 100644
--- a/comphelper/source/misc/threadpool.cxx
+++ b/comphelper/source/misc/threadpool.cxx
@@ -9,6 +9,7 @@
#include <comphelper/threadpool.hxx>
+#include <com/sun/star/uno/Exception.hpp>
#include <rtl/instance.hxx>
#include <rtl/string.hxx>
#include <algorithm>
@@ -17,6 +18,26 @@
namespace comphelper {
+/** prevent waiting for a task from inside a task */
+#if defined DBG_UTIL && defined LINUX
+static thread_local bool gbIsWorkerThread;
+#endif
+
+// used to group thread-tasks for waiting in waitTillDone()
+class COMPHELPER_DLLPUBLIC ThreadTaskTag
+{
+ oslInterlockedCount mnTasksWorking;
+ osl::Condition maTasksComplete;
+
+public:
+ ThreadTaskTag();
+ bool isDone();
+ void waitUntilDone();
+ void onTaskWorkerDone();
+ void onTaskPushed();
+};
+
+
class ThreadPool::ThreadWorker : public salhelper::Thread
{
ThreadPool *mpPool;
@@ -33,11 +54,36 @@ public:
virtual void execute() override
{
+#if defined DBG_UTIL && defined LINUX
+ gbIsWorkerThread = true;
+#endif
ThreadTask *pTask;
while ( ( pTask = waitForWork() ) )
{
- pTask->doWork();
- delete pTask;
+ std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
+ try {
+ pTask->doWork();
+ }
+ catch (const std::exception &e)
+ {
+ SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
+ }
+ catch (const css::uno::Exception &e)
+ {
+ SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message);
+ }
+ try {
+ delete pTask;
+ }
+ catch (const std::exception &e)
+ {
+ SAL_WARN("comphelper", "exception in thread worker while deleting task: " << e.what());
+ }
+ catch (const css::uno::Exception &e)
+ {
+ SAL_WARN("comphelper", "exception in thread worker while deleting task: " << e.Message);
+ }
+ pTag->onTaskWorkerDone();
}
}
@@ -149,9 +195,27 @@ sal_Int32 ThreadPool::getPreferredConcurrency()
void ThreadPool::waitAndCleanupWorkers()
{
- waitUntilEmpty();
-
osl::ResettableMutexGuard aGuard( maGuard );
+
+ if( maWorkers.empty() )
+ { // no threads at all -> execute the work in-line
+ ThreadTask *pTask;
+ while ( ( pTask = popWork() ) )
+ {
+ std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
+ pTask->doWork();
+ delete pTask;
+ pTag->onTaskWorkerDone();
+ }
+ }
+ else
+ {
+ aGuard.clear();
+ maTasksComplete.wait();
+ aGuard.reset();
+ }
+ assert( maTasks.empty() );
+
mbTerminate = true;
while( !maWorkers.empty() )
@@ -173,6 +237,7 @@ void ThreadPool::waitAndCleanupWorkers()
void ThreadPool::pushTask( ThreadTask *pTask )
{
osl::MutexGuard aGuard( maGuard );
+ pTask->mpTag->onTaskPushed();
maTasks.insert( maTasks.begin(), pTask );
// horrible beyond belief:
@@ -205,8 +270,11 @@ void ThreadPool::stopWork()
maTasksComplete.set();
}
-void ThreadPool::waitUntilEmpty()
+void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag)
{
+#if defined DBG_UTIL && defined LINUX
+ assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
+#endif
osl::ResettableMutexGuard aGuard( maGuard );
if( maWorkers.empty() )
@@ -214,17 +282,78 @@ void ThreadPool::waitUntilEmpty()
ThreadTask *pTask;
while ( ( pTask = popWork() ) )
{
+ std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
pTask->doWork();
delete pTask;
+ pTag->onTaskWorkerDone();
}
}
- else
+ aGuard.clear();
+ rTag->waitUntilDone();
+}
+
+std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag()
+{
+ return std::make_shared<ThreadTaskTag>();
+}
+
+bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
+{
+ return pTag->isDone();
+}
+
+
+ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag)
+ : mpTag(pTag)
+{
+}
+
+ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
+{
+ maTasksComplete.set();
+}
+
+void ThreadTaskTag::onTaskPushed()
+{
+ oslInterlockedCount n = osl_atomic_increment(&mnTasksWorking);
+ assert( n < 65536 ); // sanity checking
+ (void)n; // avoid -Wunused-variable in release build
+ maTasksComplete.reset();
+}
+
+void ThreadTaskTag::onTaskWorkerDone()
+{
+ sal_Int32 nCount = osl_atomic_decrement(&mnTasksWorking);
+ assert(nCount >= 0);
+ if (nCount == 0)
+ maTasksComplete.set();
+}
+
+void ThreadTaskTag::waitUntilDone()
+{
+#if defined DBG_UTIL && defined LINUX
+ assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
+#endif
+
+#ifdef DBG_UTIL
+ // 2 minute timeout in debug mode so our tests fail sooner rather than later
+ osl::Condition::Result rv = maTasksComplete.wait(TimeValue { 2*60, 0 });
+ assert(rv != osl_cond_result_timeout);
+#else
+ // 10 minute timeout in production so the app eventually throws some kind of error
+ if (maTasksComplete.wait(TimeValue { 10*60, 0 }) == osl_cond_result_timeout)
{
- aGuard.clear();
- maTasksComplete.wait();
- aGuard.reset();
+ SAL_DEBUG_TRACE("comphelper::ThreadTaskTag::waitUntilDone() "
+ << "tasksWorking " << mnTasksWorking
+ << "noThreads " << ThreadPool::getPreferredConcurrency());
+ throw std::runtime_error("timeout waiting for threadpool tasks");
}
- assert( maTasks.empty() );
+#endif
+}
+
+bool ThreadTaskTag::isDone()
+{
+ return mnTasksWorking == 0;
}
} // namespace comphelper