diff options
-rw-r--r-- | comphelper/CppunitTest_comphelper_threadpool_test.mk | 1 | ||||
-rw-r--r-- | comphelper/qa/unit/threadpooltest.cxx | 50 | ||||
-rw-r--r-- | comphelper/source/misc/threadpool.cxx | 24 | ||||
-rw-r--r-- | include/comphelper/threadpool.hxx | 7 |
4 files changed, 78 insertions, 4 deletions
diff --git a/comphelper/CppunitTest_comphelper_threadpool_test.mk b/comphelper/CppunitTest_comphelper_threadpool_test.mk index 16bbd6fff69b..24467c898f80 100644 --- a/comphelper/CppunitTest_comphelper_threadpool_test.mk +++ b/comphelper/CppunitTest_comphelper_threadpool_test.mk @@ -24,6 +24,7 @@ $(eval $(call gb_CppunitTest_use_libraries,comphelper_threadpool_test, \ cppuhelper \ cppu \ sal \ + tl \ )) # vim: set noet sw=4 ts=4: diff --git a/comphelper/qa/unit/threadpooltest.cxx b/comphelper/qa/unit/threadpooltest.cxx index 10fb90c3014f..03bd4a33d69c 100644 --- a/comphelper/qa/unit/threadpooltest.cxx +++ b/comphelper/qa/unit/threadpooltest.cxx @@ -12,17 +12,22 @@ #include <cppunit/TestFixture.h> #include <cppunit/extensions/HelperMacros.h> #include <cppunit/plugin/TestPlugIn.h> +#include <tools/time.hxx> +#include <osl/thread.hxx> #include <stdlib.h> #include <thread> +#include <mutex> class ThreadPoolTest : public CppUnit::TestFixture { public: void testPreferredConcurrency(); + void testWorkerUsage(); CPPUNIT_TEST_SUITE(ThreadPoolTest); CPPUNIT_TEST(testPreferredConcurrency); + CPPUNIT_TEST(testWorkerUsage); CPPUNIT_TEST_SUITE_END(); }; @@ -48,6 +53,51 @@ void ThreadPoolTest::testPreferredConcurrency() #endif } +namespace +{ +class UsageTask : public comphelper::ThreadTask +{ +public: + UsageTask(const std::shared_ptr<comphelper::ThreadTaskTag>& pTag) + : ThreadTask(pTag) + { + } + virtual void doWork() + { + ++count; + mutex.lock(); + mutex.unlock(); + } + static inline int count = 0; + static inline std::mutex mutex; +}; +} // namespace + +void ThreadPoolTest::testWorkerUsage() +{ + // Create tasks for each available worker. Lock a shared mutex before that to make all + // tasks block on it. And check that all workers have started, i.e. that the full + // thread pool capacity is used. + comphelper::ThreadPool& rSharedPool = comphelper::ThreadPool::getSharedOptimalPool(); + std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag(); + UsageTask::mutex.lock(); + for (int i = 0; i < rSharedPool.getWorkerCount(); ++i) + { + rSharedPool.pushTask(std::make_unique<UsageTask>(pTag)); + osl::Thread::wait(std::chrono::milliseconds(10)); // give it a time to start + } + sal_uInt64 startTicks = tools::Time::GetSystemTicks(); + while (UsageTask::count != rSharedPool.getWorkerCount()) + { + // Wait at most 5 seconds, that should do even on slow systems. + CPPUNIT_ASSERT_MESSAGE("Thread pool does not use all worker threads.", + startTicks + 5000 > tools::Time::GetSystemTicks()); + osl::Thread::wait(std::chrono::milliseconds(10)); + } + UsageTask::mutex.unlock(); + rSharedPool.waitUntilDone(pTag); +} + CPPUNIT_TEST_SUITE_REGISTRATION(ThreadPoolTest); CPPUNIT_PLUGIN_IMPLEMENT(); diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx index f93400d96f9f..906189202cdd 100644 --- a/comphelper/source/misc/threadpool.cxx +++ b/comphelper/source/misc/threadpool.cxx @@ -79,12 +79,14 @@ public: std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true ); if( pTask ) { + mpPool->incBusyWorker(); aGuard.unlock(); pTask->exec(); pTask.reset(); aGuard.lock(); + mpPool->decBusyWorker(); } } } @@ -92,7 +94,8 @@ public: ThreadPool::ThreadPool(sal_Int32 nWorkers) : mbTerminate(true) - , mnWorkers(nWorkers) + , mnMaxWorkers(nWorkers) + , mnBusyWorkers(0) { } @@ -104,6 +107,7 @@ ThreadPool::~ThreadPool() // still 0, but hopefully they will be more helpful on non-WNT platforms assert(mbTerminate); assert(maTasks.empty()); + assert(mnBusyWorkers == 0); } namespace { @@ -198,7 +202,8 @@ void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask ) mbTerminate = false; - if (maWorkers.size() < mnWorkers && maWorkers.size() <= maTasks.size()) + // Worked on tasks are already removed from maTasks, so include the count of busy workers. + if (maWorkers.size() < mnMaxWorkers && maWorkers.size() <= maTasks.size() + mnBusyWorkers) { maWorkers.push_back( new ThreadWorker( this ) ); maWorkers.back()->launch(); @@ -230,6 +235,17 @@ std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mu return nullptr; } +void ThreadPool::incBusyWorker() +{ + ++mnBusyWorkers; +} + +void ThreadPool::decBusyWorker() +{ + --mnBusyWorkers; + assert(mnBusyWorkers >= 0); +} + void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoinAll) { #if defined DBG_UTIL && (defined LINUX || defined _WIN32) @@ -294,6 +310,10 @@ void ThreadTask::exec() { SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e); } + catch (...) + { + SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()"); + } pTag->onTaskWorkerDone(); } diff --git a/include/comphelper/threadpool.hxx b/include/comphelper/threadpool.hxx index 1cb9441cfdd1..f51daf4f70a4 100644 --- a/include/comphelper/threadpool.hxx +++ b/include/comphelper/threadpool.hxx @@ -72,7 +72,7 @@ public: void joinAll(); /// return the number of live worker threads - sal_Int32 getWorkerCount() const { return mnWorkers; } + sal_Int32 getWorkerCount() const { return mnMaxWorkers; } /// wait until all work is completed, then join all threads void shutdown(); @@ -90,12 +90,15 @@ private: */ std::unique_ptr<ThreadTask> popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait ); void shutdownLocked(std::unique_lock<std::mutex>&); + void incBusyWorker(); + void decBusyWorker(); /// signalled when all in-progress tasks are complete std::mutex maMutex; std::condition_variable maTasksChanged; bool mbTerminate; - std::size_t const mnWorkers; + std::size_t const mnMaxWorkers; + std::size_t mnBusyWorkers; std::vector< std::unique_ptr<ThreadTask> > maTasks; std::vector< rtl::Reference< ThreadWorker > > maWorkers; }; |