summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuboš Luňák <l.lunak@collabora.com>2020-11-26 15:05:16 +0100
committerLuboš Luňák <l.lunak@collabora.com>2020-11-30 13:16:53 +0100
commit583b0612696f42571ac97b66c159570ea452fe17 (patch)
treee370a37e6a6ed6b019f8be78d62a676d89e0c7c5
parentaf361b464b46dcc39a1bb4ab098a6ddf6ff40a53 (diff)
add ThreadPool::isIdle() to avoid incorrect detection of "no tasks"
Tasks that are being worked on but are not yet finished are removed from maTasks, so maTasks.empty() does not mean "idle". I fixed one case already in 2ad4e77a0f266ae6e6fccaebb1d080d2880bdac3, this one fixes joinAll() which has a similar problem and triggers https://gerrit.libreoffice.org/c/core/+/69473/3/sc/source/core/data/documen2.cxx#312 Also rename joinAll() to joinThreadsIfIdle(), as that's what it really is. Change-Id: I8129cfadb81be968575ea8718de9ae997b877a4e Reviewed-on: https://gerrit.libreoffice.org/c/core/+/106701 Tested-by: Jenkins Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
-rw-r--r--comphelper/qa/unit/parallelsorttest.cxx2
-rw-r--r--comphelper/qa/unit/threadpooltest.cxx62
-rw-r--r--comphelper/source/misc/threadpool.cxx21
-rw-r--r--include/comphelper/threadpool.hxx9
-rw-r--r--sc/source/core/data/documen2.cxx2
5 files changed, 83 insertions, 13 deletions
diff --git a/comphelper/qa/unit/parallelsorttest.cxx b/comphelper/qa/unit/parallelsorttest.cxx
index 90dcb3c235ba..a3618244ab8d 100644
--- a/comphelper/qa/unit/parallelsorttest.cxx
+++ b/comphelper/qa/unit/parallelsorttest.cxx
@@ -53,7 +53,7 @@ void ParallelSortTest::setUp()
void ParallelSortTest::tearDown()
{
if (pThreadPool)
- pThreadPool->joinAll();
+ pThreadPool->joinThreadsIfIdle();
}
void ParallelSortTest::fillRandomUptoN(std::vector<size_t>& rVector, size_t N)
diff --git a/comphelper/qa/unit/threadpooltest.cxx b/comphelper/qa/unit/threadpooltest.cxx
index 03bd4a33d69c..695aca5b421a 100644
--- a/comphelper/qa/unit/threadpooltest.cxx
+++ b/comphelper/qa/unit/threadpooltest.cxx
@@ -24,10 +24,16 @@ class ThreadPoolTest : public CppUnit::TestFixture
public:
void testPreferredConcurrency();
void testWorkerUsage();
+ void testTasksInThreads();
+ void testNoThreads();
+ void testDedicatedPool();
CPPUNIT_TEST_SUITE(ThreadPoolTest);
CPPUNIT_TEST(testPreferredConcurrency);
CPPUNIT_TEST(testWorkerUsage);
+ CPPUNIT_TEST(testTasksInThreads);
+ CPPUNIT_TEST(testNoThreads);
+ CPPUNIT_TEST(testDedicatedPool);
CPPUNIT_TEST_SUITE_END();
};
@@ -98,6 +104,62 @@ void ThreadPoolTest::testWorkerUsage()
rSharedPool.waitUntilDone(pTag);
}
+namespace
+{
+class CheckThreadTask : public comphelper::ThreadTask
+{
+ oslThreadIdentifier mThreadId;
+ bool mCheckEqual;
+
+public:
+ CheckThreadTask(oslThreadIdentifier threadId, bool checkEqual,
+ const std::shared_ptr<comphelper::ThreadTaskTag>& pTag)
+ : ThreadTask(pTag)
+ , mThreadId(threadId)
+ , mCheckEqual(checkEqual)
+ {
+ }
+ virtual void doWork()
+ {
+ assert(mCheckEqual ? osl::Thread::getCurrentIdentifier() == mThreadId
+ : osl::Thread::getCurrentIdentifier() != mThreadId);
+ }
+};
+} // namespace
+
+void ThreadPoolTest::testTasksInThreads()
+{
+ // Check that all tasks are run in worker threads, not this thread.
+ comphelper::ThreadPool& pool = comphelper::ThreadPool::getSharedOptimalPool();
+ std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag();
+ for (int i = 0; i < 8; ++i)
+ pool.pushTask(
+ std::make_unique<CheckThreadTask>(osl::Thread::getCurrentIdentifier(), false, pTag));
+ pool.waitUntilDone(pTag);
+}
+
+void ThreadPoolTest::testNoThreads()
+{
+ // No worker threads, tasks will be run in this thread.
+ comphelper::ThreadPool pool(0);
+ std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag();
+ for (int i = 0; i < 8; ++i)
+ pool.pushTask(
+ std::make_unique<CheckThreadTask>(osl::Thread::getCurrentIdentifier(), true, pTag));
+ pool.waitUntilDone(pTag);
+}
+
+void ThreadPoolTest::testDedicatedPool()
+{
+ // Test that a separate thread pool works. The tasks themselves do not matter.
+ comphelper::ThreadPool pool(4);
+ std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag();
+ for (int i = 0; i < 8; ++i)
+ pool.pushTask(
+ std::make_unique<CheckThreadTask>(osl::Thread::getCurrentIdentifier(), false, pTag));
+ pool.waitUntilDone(pTag);
+}
+
CPPUNIT_TEST_SUITE_REGISTRATION(ThreadPoolTest);
CPPUNIT_PLUGIN_IMPLEMENT();
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx
index 4ff7bac3aede..1ef0eeaaba5c 100644
--- a/comphelper/source/misc/threadpool.cxx
+++ b/comphelper/source/misc/threadpool.cxx
@@ -76,6 +76,7 @@ public:
std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
if( pTask )
{
+ std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
mpPool->incBusyWorker();
aGuard.unlock();
@@ -84,6 +85,7 @@ public:
aGuard.lock();
mpPool->decBusyWorker();
+ pTag->onTaskWorkerDone();
}
}
}
@@ -162,7 +164,11 @@ void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard)
{ // no threads at all -> execute the work in-line
std::unique_ptr<ThreadTask> pTask;
while ( ( pTask = popWorkLocked(aGuard, false) ) )
+ {
+ std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
pTask->exec();
+ pTag->onTaskWorkerDone();
+ }
}
else
{
@@ -250,7 +256,7 @@ void ThreadPool::decBusyWorker()
--mnBusyWorkers;
}
-void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoinAll)
+void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoin)
{
#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
@@ -265,21 +271,23 @@ void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool
std::unique_ptr<ThreadTask> pTask = popWorkLocked(aGuard, false);
if (!pTask)
break;
+ std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
pTask->exec();
+ pTag->onTaskWorkerDone();
}
}
}
rTag->waitUntilDone();
- if (bJoinAll)
- joinAll();
+ if (bJoin)
+ joinThreadsIfIdle();
}
-void ThreadPool::joinAll()
+void ThreadPool::joinThreadsIfIdle()
{
std::unique_lock< std::mutex > aGuard( maMutex );
- if (maTasks.empty()) // check if there are still tasks from another tag
+ if (isIdle()) // check if there are still tasks from another tag
{
shutdownLocked(aGuard);
}
@@ -302,7 +310,6 @@ ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag)
void ThreadTask::exec()
{
- std::shared_ptr<ThreadTaskTag> pTag(mpTag);
try {
doWork();
}
@@ -318,8 +325,6 @@ void ThreadTask::exec()
{
SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()");
}
-
- pTag->onTaskWorkerDone();
}
ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
diff --git a/include/comphelper/threadpool.hxx b/include/comphelper/threadpool.hxx
index f51daf4f70a4..ec128b1422bd 100644
--- a/include/comphelper/threadpool.hxx
+++ b/include/comphelper/threadpool.hxx
@@ -64,12 +64,15 @@ public:
void pushTask( std::unique_ptr<ThreadTask> pTask);
/** Wait until all queued tasks associated with the tag are completed
- @param bJoinAll - if set it joins all threads at the end if no other tasks from other tags.
+ @param bJoin - if set call joinThreadsIfIdle() at the end
*/
- void waitUntilDone(const std::shared_ptr<ThreadTaskTag>&, bool bJoinAll = true);
+ void waitUntilDone(const std::shared_ptr<ThreadTaskTag>&, bool bJoin = true);
/// join all threads if there are no tasks presently.
- void joinAll();
+ void joinThreadsIfIdle();
+
+ /// return true if there are no queued or worked-on tasks
+ bool isIdle() const { return maTasks.empty() && mnBusyWorkers == 0; };
/// return the number of live worker threads
sal_Int32 getWorkerCount() const { return mnMaxWorkers; }
diff --git a/sc/source/core/data/documen2.cxx b/sc/source/core/data/documen2.cxx
index 93d958f1b8ea..60f62ae6587b 100644
--- a/sc/source/core/data/documen2.cxx
+++ b/sc/source/core/data/documen2.cxx
@@ -325,7 +325,7 @@ ScDocument::~ScDocument()
OSL_PRECOND( !bInLinkUpdate, "bInLinkUpdate in dtor" );
// Join any pending(recalc) threads in global threadpool
- comphelper::ThreadPool::getSharedOptimalPool().joinAll();
+ comphelper::ThreadPool::getSharedOptimalPool().joinThreadsIfIdle();
bInDtorClear = true;