diff options
author | Luboš Luňák <l.lunak@collabora.com> | 2020-11-26 15:05:16 +0100 |
---|---|---|
committer | Luboš Luňák <l.lunak@collabora.com> | 2020-11-30 13:16:53 +0100 |
commit | 583b0612696f42571ac97b66c159570ea452fe17 (patch) | |
tree | e370a37e6a6ed6b019f8be78d62a676d89e0c7c5 | |
parent | af361b464b46dcc39a1bb4ab098a6ddf6ff40a53 (diff) |
add ThreadPool::isIdle() to avoid incorrect detection of "no tasks"
Tasks that are being worked on but are not yet finished are removed
from maTasks, so maTasks.empty() does not mean "idle". I fixed one
case already in 2ad4e77a0f266ae6e6fccaebb1d080d2880bdac3, this one
fixes joinAll() which has a similar problem and triggers
https://gerrit.libreoffice.org/c/core/+/69473/3/sc/source/core/data/documen2.cxx#312
Also rename joinAll() to joinThreadsIfIdle(), as that's what it really is.
Change-Id: I8129cfadb81be968575ea8718de9ae997b877a4e
Reviewed-on: https://gerrit.libreoffice.org/c/core/+/106701
Tested-by: Jenkins
Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
-rw-r--r-- | comphelper/qa/unit/parallelsorttest.cxx | 2 | ||||
-rw-r--r-- | comphelper/qa/unit/threadpooltest.cxx | 62 | ||||
-rw-r--r-- | comphelper/source/misc/threadpool.cxx | 21 | ||||
-rw-r--r-- | include/comphelper/threadpool.hxx | 9 | ||||
-rw-r--r-- | sc/source/core/data/documen2.cxx | 2 |
5 files changed, 83 insertions, 13 deletions
diff --git a/comphelper/qa/unit/parallelsorttest.cxx b/comphelper/qa/unit/parallelsorttest.cxx index 90dcb3c235ba..a3618244ab8d 100644 --- a/comphelper/qa/unit/parallelsorttest.cxx +++ b/comphelper/qa/unit/parallelsorttest.cxx @@ -53,7 +53,7 @@ void ParallelSortTest::setUp() void ParallelSortTest::tearDown() { if (pThreadPool) - pThreadPool->joinAll(); + pThreadPool->joinThreadsIfIdle(); } void ParallelSortTest::fillRandomUptoN(std::vector<size_t>& rVector, size_t N) diff --git a/comphelper/qa/unit/threadpooltest.cxx b/comphelper/qa/unit/threadpooltest.cxx index 03bd4a33d69c..695aca5b421a 100644 --- a/comphelper/qa/unit/threadpooltest.cxx +++ b/comphelper/qa/unit/threadpooltest.cxx @@ -24,10 +24,16 @@ class ThreadPoolTest : public CppUnit::TestFixture public: void testPreferredConcurrency(); void testWorkerUsage(); + void testTasksInThreads(); + void testNoThreads(); + void testDedicatedPool(); CPPUNIT_TEST_SUITE(ThreadPoolTest); CPPUNIT_TEST(testPreferredConcurrency); CPPUNIT_TEST(testWorkerUsage); + CPPUNIT_TEST(testTasksInThreads); + CPPUNIT_TEST(testNoThreads); + CPPUNIT_TEST(testDedicatedPool); CPPUNIT_TEST_SUITE_END(); }; @@ -98,6 +104,62 @@ void ThreadPoolTest::testWorkerUsage() rSharedPool.waitUntilDone(pTag); } +namespace +{ +class CheckThreadTask : public comphelper::ThreadTask +{ + oslThreadIdentifier mThreadId; + bool mCheckEqual; + +public: + CheckThreadTask(oslThreadIdentifier threadId, bool checkEqual, + const std::shared_ptr<comphelper::ThreadTaskTag>& pTag) + : ThreadTask(pTag) + , mThreadId(threadId) + , mCheckEqual(checkEqual) + { + } + virtual void doWork() + { + assert(mCheckEqual ? osl::Thread::getCurrentIdentifier() == mThreadId + : osl::Thread::getCurrentIdentifier() != mThreadId); + } +}; +} // namespace + +void ThreadPoolTest::testTasksInThreads() +{ + // Check that all tasks are run in worker threads, not this thread. + comphelper::ThreadPool& pool = comphelper::ThreadPool::getSharedOptimalPool(); + std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag(); + for (int i = 0; i < 8; ++i) + pool.pushTask( + std::make_unique<CheckThreadTask>(osl::Thread::getCurrentIdentifier(), false, pTag)); + pool.waitUntilDone(pTag); +} + +void ThreadPoolTest::testNoThreads() +{ + // No worker threads, tasks will be run in this thread. + comphelper::ThreadPool pool(0); + std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag(); + for (int i = 0; i < 8; ++i) + pool.pushTask( + std::make_unique<CheckThreadTask>(osl::Thread::getCurrentIdentifier(), true, pTag)); + pool.waitUntilDone(pTag); +} + +void ThreadPoolTest::testDedicatedPool() +{ + // Test that a separate thread pool works. The tasks themselves do not matter. + comphelper::ThreadPool pool(4); + std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag(); + for (int i = 0; i < 8; ++i) + pool.pushTask( + std::make_unique<CheckThreadTask>(osl::Thread::getCurrentIdentifier(), false, pTag)); + pool.waitUntilDone(pTag); +} + CPPUNIT_TEST_SUITE_REGISTRATION(ThreadPoolTest); CPPUNIT_PLUGIN_IMPLEMENT(); diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx index 4ff7bac3aede..1ef0eeaaba5c 100644 --- a/comphelper/source/misc/threadpool.cxx +++ b/comphelper/source/misc/threadpool.cxx @@ -76,6 +76,7 @@ public: std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true ); if( pTask ) { + std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag); mpPool->incBusyWorker(); aGuard.unlock(); @@ -84,6 +85,7 @@ public: aGuard.lock(); mpPool->decBusyWorker(); + pTag->onTaskWorkerDone(); } } } @@ -162,7 +164,11 @@ void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard) { // no threads at all -> execute the work in-line std::unique_ptr<ThreadTask> pTask; while ( ( pTask = popWorkLocked(aGuard, false) ) ) + { + std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag); pTask->exec(); + pTag->onTaskWorkerDone(); + } } else { @@ -250,7 +256,7 @@ void ThreadPool::decBusyWorker() --mnBusyWorkers; } -void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoinAll) +void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoin) { #if defined DBG_UTIL && (defined LINUX || defined _WIN32) assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task"); @@ -265,21 +271,23 @@ void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool std::unique_ptr<ThreadTask> pTask = popWorkLocked(aGuard, false); if (!pTask) break; + std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag); pTask->exec(); + pTag->onTaskWorkerDone(); } } } rTag->waitUntilDone(); - if (bJoinAll) - joinAll(); + if (bJoin) + joinThreadsIfIdle(); } -void ThreadPool::joinAll() +void ThreadPool::joinThreadsIfIdle() { std::unique_lock< std::mutex > aGuard( maMutex ); - if (maTasks.empty()) // check if there are still tasks from another tag + if (isIdle()) // check if there are still tasks from another tag { shutdownLocked(aGuard); } @@ -302,7 +310,6 @@ ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag) void ThreadTask::exec() { - std::shared_ptr<ThreadTaskTag> pTag(mpTag); try { doWork(); } @@ -318,8 +325,6 @@ void ThreadTask::exec() { SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()"); } - - pTag->onTaskWorkerDone(); } ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0) diff --git a/include/comphelper/threadpool.hxx b/include/comphelper/threadpool.hxx index f51daf4f70a4..ec128b1422bd 100644 --- a/include/comphelper/threadpool.hxx +++ b/include/comphelper/threadpool.hxx @@ -64,12 +64,15 @@ public: void pushTask( std::unique_ptr<ThreadTask> pTask); /** Wait until all queued tasks associated with the tag are completed - @param bJoinAll - if set it joins all threads at the end if no other tasks from other tags. + @param bJoin - if set call joinThreadsIfIdle() at the end */ - void waitUntilDone(const std::shared_ptr<ThreadTaskTag>&, bool bJoinAll = true); + void waitUntilDone(const std::shared_ptr<ThreadTaskTag>&, bool bJoin = true); /// join all threads if there are no tasks presently. - void joinAll(); + void joinThreadsIfIdle(); + + /// return true if there are no queued or worked-on tasks + bool isIdle() const { return maTasks.empty() && mnBusyWorkers == 0; }; /// return the number of live worker threads sal_Int32 getWorkerCount() const { return mnMaxWorkers; } diff --git a/sc/source/core/data/documen2.cxx b/sc/source/core/data/documen2.cxx index 93d958f1b8ea..60f62ae6587b 100644 --- a/sc/source/core/data/documen2.cxx +++ b/sc/source/core/data/documen2.cxx @@ -325,7 +325,7 @@ ScDocument::~ScDocument() OSL_PRECOND( !bInLinkUpdate, "bInLinkUpdate in dtor" ); // Join any pending(recalc) threads in global threadpool - comphelper::ThreadPool::getSharedOptimalPool().joinAll(); + comphelper::ThreadPool::getSharedOptimalPool().joinThreadsIfIdle(); bInDtorClear = true; |