summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--comphelper/CppunitTest_comphelper_threadpool_test.mk1
-rw-r--r--comphelper/qa/unit/threadpooltest.cxx50
-rw-r--r--comphelper/source/misc/threadpool.cxx24
-rw-r--r--include/comphelper/threadpool.hxx7
4 files changed, 78 insertions, 4 deletions
diff --git a/comphelper/CppunitTest_comphelper_threadpool_test.mk b/comphelper/CppunitTest_comphelper_threadpool_test.mk
index 16bbd6fff69b..24467c898f80 100644
--- a/comphelper/CppunitTest_comphelper_threadpool_test.mk
+++ b/comphelper/CppunitTest_comphelper_threadpool_test.mk
@@ -24,6 +24,7 @@ $(eval $(call gb_CppunitTest_use_libraries,comphelper_threadpool_test, \
cppuhelper \
cppu \
sal \
+ tl \
))
# vim: set noet sw=4 ts=4:
diff --git a/comphelper/qa/unit/threadpooltest.cxx b/comphelper/qa/unit/threadpooltest.cxx
index 10fb90c3014f..03bd4a33d69c 100644
--- a/comphelper/qa/unit/threadpooltest.cxx
+++ b/comphelper/qa/unit/threadpooltest.cxx
@@ -12,17 +12,22 @@
#include <cppunit/TestFixture.h>
#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/plugin/TestPlugIn.h>
+#include <tools/time.hxx>
+#include <osl/thread.hxx>
#include <stdlib.h>
#include <thread>
+#include <mutex>
class ThreadPoolTest : public CppUnit::TestFixture
{
public:
void testPreferredConcurrency();
+ void testWorkerUsage();
CPPUNIT_TEST_SUITE(ThreadPoolTest);
CPPUNIT_TEST(testPreferredConcurrency);
+ CPPUNIT_TEST(testWorkerUsage);
CPPUNIT_TEST_SUITE_END();
};
@@ -48,6 +53,51 @@ void ThreadPoolTest::testPreferredConcurrency()
#endif
}
+namespace
+{
+class UsageTask : public comphelper::ThreadTask
+{
+public:
+ UsageTask(const std::shared_ptr<comphelper::ThreadTaskTag>& pTag)
+ : ThreadTask(pTag)
+ {
+ }
+ virtual void doWork()
+ {
+ ++count;
+ mutex.lock();
+ mutex.unlock();
+ }
+ static inline int count = 0;
+ static inline std::mutex mutex;
+};
+} // namespace
+
+void ThreadPoolTest::testWorkerUsage()
+{
+ // Create tasks for each available worker. Lock a shared mutex before that to make all
+ // tasks block on it. And check that all workers have started, i.e. that the full
+ // thread pool capacity is used.
+ comphelper::ThreadPool& rSharedPool = comphelper::ThreadPool::getSharedOptimalPool();
+ std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag();
+ UsageTask::mutex.lock();
+ for (int i = 0; i < rSharedPool.getWorkerCount(); ++i)
+ {
+ rSharedPool.pushTask(std::make_unique<UsageTask>(pTag));
+ osl::Thread::wait(std::chrono::milliseconds(10)); // give it a time to start
+ }
+ sal_uInt64 startTicks = tools::Time::GetSystemTicks();
+ while (UsageTask::count != rSharedPool.getWorkerCount())
+ {
+ // Wait at most 5 seconds, that should do even on slow systems.
+ CPPUNIT_ASSERT_MESSAGE("Thread pool does not use all worker threads.",
+ startTicks + 5000 > tools::Time::GetSystemTicks());
+ osl::Thread::wait(std::chrono::milliseconds(10));
+ }
+ UsageTask::mutex.unlock();
+ rSharedPool.waitUntilDone(pTag);
+}
+
CPPUNIT_TEST_SUITE_REGISTRATION(ThreadPoolTest);
CPPUNIT_PLUGIN_IMPLEMENT();
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx
index f93400d96f9f..906189202cdd 100644
--- a/comphelper/source/misc/threadpool.cxx
+++ b/comphelper/source/misc/threadpool.cxx
@@ -79,12 +79,14 @@ public:
std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
if( pTask )
{
+ mpPool->incBusyWorker();
aGuard.unlock();
pTask->exec();
pTask.reset();
aGuard.lock();
+ mpPool->decBusyWorker();
}
}
}
@@ -92,7 +94,8 @@ public:
ThreadPool::ThreadPool(sal_Int32 nWorkers)
: mbTerminate(true)
- , mnWorkers(nWorkers)
+ , mnMaxWorkers(nWorkers)
+ , mnBusyWorkers(0)
{
}
@@ -104,6 +107,7 @@ ThreadPool::~ThreadPool()
// still 0, but hopefully they will be more helpful on non-WNT platforms
assert(mbTerminate);
assert(maTasks.empty());
+ assert(mnBusyWorkers == 0);
}
namespace {
@@ -198,7 +202,8 @@ void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
mbTerminate = false;
- if (maWorkers.size() < mnWorkers && maWorkers.size() <= maTasks.size())
+ // Worked on tasks are already removed from maTasks, so include the count of busy workers.
+ if (maWorkers.size() < mnMaxWorkers && maWorkers.size() <= maTasks.size() + mnBusyWorkers)
{
maWorkers.push_back( new ThreadWorker( this ) );
maWorkers.back()->launch();
@@ -230,6 +235,17 @@ std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mu
return nullptr;
}
+void ThreadPool::incBusyWorker()
+{
+ ++mnBusyWorkers;
+}
+
+void ThreadPool::decBusyWorker()
+{
+ --mnBusyWorkers;
+ assert(mnBusyWorkers >= 0);
+}
+
void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoinAll)
{
#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
@@ -294,6 +310,10 @@ void ThreadTask::exec()
{
SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
}
+ catch (...)
+ {
+ SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()");
+ }
pTag->onTaskWorkerDone();
}
diff --git a/include/comphelper/threadpool.hxx b/include/comphelper/threadpool.hxx
index 1cb9441cfdd1..f51daf4f70a4 100644
--- a/include/comphelper/threadpool.hxx
+++ b/include/comphelper/threadpool.hxx
@@ -72,7 +72,7 @@ public:
void joinAll();
/// return the number of live worker threads
- sal_Int32 getWorkerCount() const { return mnWorkers; }
+ sal_Int32 getWorkerCount() const { return mnMaxWorkers; }
/// wait until all work is completed, then join all threads
void shutdown();
@@ -90,12 +90,15 @@ private:
*/
std::unique_ptr<ThreadTask> popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait );
void shutdownLocked(std::unique_lock<std::mutex>&);
+ void incBusyWorker();
+ void decBusyWorker();
/// signalled when all in-progress tasks are complete
std::mutex maMutex;
std::condition_variable maTasksChanged;
bool mbTerminate;
- std::size_t const mnWorkers;
+ std::size_t const mnMaxWorkers;
+ std::size_t mnBusyWorkers;
std::vector< std::unique_ptr<ThreadTask> > maTasks;
std::vector< rtl::Reference< ThreadWorker > > maWorkers;
};