diff options
author | Noel Grandin <noel@peralex.com> | 2016-07-08 14:29:53 +0200 |
---|---|---|
committer | Noel Grandin <noelgrandin@gmail.com> | 2016-07-18 06:49:09 +0000 |
commit | 76ad32bec8e2c00c21247041b16d9e09e73d2504 (patch) | |
tree | 7b2b1277151bc7904ff63684ebd7e3d6d8a7d661 /comphelper | |
parent | 9bf9f88e4c7e0b182ec6d8b4aefb7d735bb0653b (diff) |
add tagging to ThreadTasks so we don't need more one pool
If more than one place in the code submits tasks to the shared
pool, then waitTillDone() becomes unreliable.
Add a tagging mechanism, so different callsites can wait
on different sets of tasks.
Also try to protect our worker threads against exceptions from
the thread tasks code.
Change-Id: Idde664ab50008d31a2dd73910bb22f50e62ae22f
Reviewed-on: https://gerrit.libreoffice.org/27042
Tested-by: Jenkins <ci@libreoffice.org>
Reviewed-by: Noel Grandin <noelgrandin@gmail.com>
Diffstat (limited to 'comphelper')
-rw-r--r-- | comphelper/source/misc/threadpool.cxx | 149 |
1 files changed, 139 insertions, 10 deletions
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx index 8680e00e6e34..32170a1a0cd1 100644 --- a/comphelper/source/misc/threadpool.cxx +++ b/comphelper/source/misc/threadpool.cxx @@ -9,6 +9,7 @@ #include <comphelper/threadpool.hxx> +#include <com/sun/star/uno/Exception.hpp> #include <rtl/instance.hxx> #include <rtl/string.hxx> #include <algorithm> @@ -17,6 +18,26 @@ namespace comphelper { +/** prevent waiting for a task from inside a task */ +#if defined DBG_UTIL && defined LINUX +static thread_local bool gbIsWorkerThread; +#endif + +// used to group thread-tasks for waiting in waitTillDone() +class COMPHELPER_DLLPUBLIC ThreadTaskTag +{ + oslInterlockedCount mnTasksWorking; + osl::Condition maTasksComplete; + +public: + ThreadTaskTag(); + bool isDone(); + void waitUntilDone(); + void onTaskWorkerDone(); + void onTaskPushed(); +}; + + class ThreadPool::ThreadWorker : public salhelper::Thread { ThreadPool *mpPool; @@ -33,11 +54,36 @@ public: virtual void execute() override { +#if defined DBG_UTIL && defined LINUX + gbIsWorkerThread = true; +#endif ThreadTask *pTask; while ( ( pTask = waitForWork() ) ) { - pTask->doWork(); - delete pTask; + std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag()); + try { + pTask->doWork(); + } + catch (const std::exception &e) + { + SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what()); + } + catch (const css::uno::Exception &e) + { + SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message); + } + try { + delete pTask; + } + catch (const std::exception &e) + { + SAL_WARN("comphelper", "exception in thread worker while deleting task: " << e.what()); + } + catch (const css::uno::Exception &e) + { + SAL_WARN("comphelper", "exception in thread worker while deleting task: " << e.Message); + } + pTag->onTaskWorkerDone(); } } @@ -149,9 +195,27 @@ sal_Int32 ThreadPool::getPreferredConcurrency() void ThreadPool::waitAndCleanupWorkers() { - waitUntilEmpty(); - osl::ResettableMutexGuard aGuard( maGuard ); + + if( maWorkers.empty() ) + { // no threads at all -> execute the work in-line + ThreadTask *pTask; + while ( ( pTask = popWork() ) ) + { + std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag()); + pTask->doWork(); + delete pTask; + pTag->onTaskWorkerDone(); + } + } + else + { + aGuard.clear(); + maTasksComplete.wait(); + aGuard.reset(); + } + assert( maTasks.empty() ); + mbTerminate = true; while( !maWorkers.empty() ) @@ -173,6 +237,7 @@ void ThreadPool::waitAndCleanupWorkers() void ThreadPool::pushTask( ThreadTask *pTask ) { osl::MutexGuard aGuard( maGuard ); + pTask->mpTag->onTaskPushed(); maTasks.insert( maTasks.begin(), pTask ); // horrible beyond belief: @@ -205,8 +270,11 @@ void ThreadPool::stopWork() maTasksComplete.set(); } -void ThreadPool::waitUntilEmpty() +void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag) { +#if defined DBG_UTIL && defined LINUX + assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task"); +#endif osl::ResettableMutexGuard aGuard( maGuard ); if( maWorkers.empty() ) @@ -214,17 +282,78 @@ void ThreadPool::waitUntilEmpty() ThreadTask *pTask; while ( ( pTask = popWork() ) ) { + std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag()); pTask->doWork(); delete pTask; + pTag->onTaskWorkerDone(); } } - else + aGuard.clear(); + rTag->waitUntilDone(); +} + +std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag() +{ + return std::make_shared<ThreadTaskTag>(); +} + +bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag) +{ + return pTag->isDone(); +} + + +ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag) + : mpTag(pTag) +{ +} + +ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0) +{ + maTasksComplete.set(); +} + +void ThreadTaskTag::onTaskPushed() +{ + oslInterlockedCount n = osl_atomic_increment(&mnTasksWorking); + assert( n < 65536 ); // sanity checking + (void)n; // avoid -Wunused-variable in release build + maTasksComplete.reset(); +} + +void ThreadTaskTag::onTaskWorkerDone() +{ + sal_Int32 nCount = osl_atomic_decrement(&mnTasksWorking); + assert(nCount >= 0); + if (nCount == 0) + maTasksComplete.set(); +} + +void ThreadTaskTag::waitUntilDone() +{ +#if defined DBG_UTIL && defined LINUX + assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task"); +#endif + +#ifdef DBG_UTIL + // 2 minute timeout in debug mode so our tests fail sooner rather than later + osl::Condition::Result rv = maTasksComplete.wait(TimeValue { 2*60, 0 }); + assert(rv != osl_cond_result_timeout); +#else + // 10 minute timeout in production so the app eventually throws some kind of error + if (maTasksComplete.wait(TimeValue { 10*60, 0 }) == osl_cond_result_timeout) { - aGuard.clear(); - maTasksComplete.wait(); - aGuard.reset(); + SAL_DEBUG_TRACE("comphelper::ThreadTaskTag::waitUntilDone() " + << "tasksWorking " << mnTasksWorking + << "noThreads " << ThreadPool::getPreferredConcurrency()); + throw std::runtime_error("timeout waiting for threadpool tasks"); } - assert( maTasks.empty() ); +#endif +} + +bool ThreadTaskTag::isDone() +{ + return mnTasksWorking == 0; } } // namespace comphelper |