From 62090f65b804a08a66ca26675ae610ed07c7c341 Mon Sep 17 00:00:00 2001 From: Michael Meeks Date: Thu, 30 Oct 2014 18:37:42 +0000 Subject: Move thread-pool down into comphelper for re-use elsewhere. Change-Id: Ib27b8b1ccc07ff194035d6c2ef3d45c429e3cea1 --- comphelper/Library_comphelper.mk | 1 + comphelper/source/misc/threadpool.cxx | 185 ++++++++++++++++++++++++++++++ include/comphelper/threadpool.hxx | 64 +++++++++++ sc/Library_scfilt.mk | 1 - sc/source/filter/oox/threadpool.cxx | 164 -------------------------- sc/source/filter/oox/threadpool.hxx | 53 --------- sc/source/filter/oox/workbookfragment.cxx | 6 +- 7 files changed, 253 insertions(+), 221 deletions(-) create mode 100644 comphelper/source/misc/threadpool.cxx create mode 100644 include/comphelper/threadpool.hxx delete mode 100644 sc/source/filter/oox/threadpool.cxx delete mode 100644 sc/source/filter/oox/threadpool.hxx diff --git a/comphelper/Library_comphelper.mk b/comphelper/Library_comphelper.mk index cfe48f631992..84bf6988ffb4 100644 --- a/comphelper/Library_comphelper.mk +++ b/comphelper/Library_comphelper.mk @@ -114,6 +114,7 @@ $(eval $(call gb_Library_add_exception_objects,comphelper,\ comphelper/source/misc/string \ comphelper/source/misc/synchronousdispatch \ comphelper/source/misc/syntaxhighlight \ + comphelper/source/misc/threadpool \ comphelper/source/misc/types \ comphelper/source/misc/weak \ comphelper/source/misc/weakeventlistener \ diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx new file mode 100644 index 000000000000..d2101ada54d2 --- /dev/null +++ b/comphelper/source/misc/threadpool.cxx @@ -0,0 +1,185 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include + +#include +#include +#include +#include + +namespace comphelper { + +class ThreadPool::ThreadWorker : public salhelper::Thread +{ + ThreadPool *mpPool; + osl::Condition maNewWork; +public: + ThreadWorker( ThreadPool *pPool ) : + salhelper::Thread("thread-pool"), + mpPool( pPool ) {} + + virtual void execute() SAL_OVERRIDE + { + ThreadTask *pTask; + while ( ( pTask = waitForWork() ) ) + { + pTask->doWork(); + delete pTask; + } + } + + ThreadTask *waitForWork() + { + ThreadTask *pRet = NULL; + + osl::ResettableMutexGuard aGuard( mpPool->maGuard ); + + pRet = mpPool->popWork(); + + while( !pRet ) + { + maNewWork.reset(); + + if( mpPool->mbTerminate ) + break; + + aGuard.clear(); // unlock + + maNewWork.wait(); + + aGuard.reset(); // lock + + pRet = mpPool->popWork(); + } + + return pRet; + } + + // Why a condition per worker thread - you may ask. + // + // Unfortunately the Windows synchronisation API that we wrap + // is horribly inadequate cf. + // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html + // The existing osl::Condition API should only ever be used + // between one producer and one consumer thread to avoid the + // lost wakeup problem. + + void signalNewWork() + { + maNewWork.set(); + } +}; + +ThreadPool::ThreadPool( sal_Int32 nWorkers ) : + mbTerminate( false ) +{ + for( sal_Int32 i = 0; i < nWorkers; i++ ) + maWorkers.push_back( new ThreadWorker( this ) ); + + maTasksEmpty.reset(); + + osl::MutexGuard aGuard( maGuard ); + for( size_t i = 0; i < maWorkers.size(); i++ ) + maWorkers[ i ]->launch(); +} + +ThreadPool::~ThreadPool() +{ + waitUntilWorkersDone(); +} + +struct ThreadPoolStatic : public rtl::StaticWithInit< boost::shared_ptr< ThreadPool >, + ThreadPoolStatic > +{ + boost::shared_ptr< ThreadPool > operator () () { + sal_Int32 nThreads = std::max( std::thread::hardware_concurrency(), 1U ); + return boost::shared_ptr< ThreadPool >( new ThreadPool( nThreads ) ); + }; +}; + +ThreadPool& ThreadPool::getSharedOptimalPool() +{ + return *ThreadPoolStatic::get().get(); +} + +/// wait until all the workers have completed and +/// terminate all threads +void ThreadPool::waitUntilWorkersDone() +{ + waitUntilEmpty(); + + osl::ResettableMutexGuard aGuard( maGuard ); + mbTerminate = true; + + while( !maWorkers.empty() ) + { + rtl::Reference< ThreadWorker > xWorker = maWorkers.back(); + maWorkers.pop_back(); + assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker) + == maWorkers.end()); + xWorker->signalNewWork(); + aGuard.clear(); + { // unlocked + xWorker->join(); + xWorker.clear(); + } + aGuard.reset(); + } +} + +void ThreadPool::pushTask( ThreadTask *pTask ) +{ + osl::MutexGuard aGuard( maGuard ); + maTasks.insert( maTasks.begin(), pTask ); + // horrible beyond belief: + for( size_t i = 0; i < maWorkers.size(); i++ ) + maWorkers[ i ]->signalNewWork(); + maTasksEmpty.reset(); +} + +ThreadTask *ThreadPool::popWork() +{ + if( !maTasks.empty() ) + { + ThreadTask *pTask = maTasks.back(); + maTasks.pop_back(); + return pTask; + } + else + maTasksEmpty.set(); + return NULL; +} + +void ThreadPool::waitUntilEmpty() +{ + osl::ResettableMutexGuard aGuard( maGuard ); + + if( maWorkers.empty() ) + { // no threads at all -> execute the work in-line + ThreadTask *pTask; + while ( ( pTask = popWork() ) ) + { + pTask->doWork(); + delete pTask; + } + mbTerminate = true; + } + else + { + aGuard.clear(); + maTasksEmpty.wait(); + aGuard.reset(); + } + assert( maTasks.empty() ); +} + +} // namespace comphelper + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/include/comphelper/threadpool.hxx b/include/comphelper/threadpool.hxx new file mode 100644 index 000000000000..ae103f1164f1 --- /dev/null +++ b/include/comphelper/threadpool.hxx @@ -0,0 +1,64 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#ifndef INCLUDED_COMPHELPER_THREADPOOL_HXX +#define INCLUDED_COMPHELPER_THREADPOOL_HXX + +#include +#include +#include +#include +#include +#include +#include + +namespace comphelper +{ + +class COMPHELPER_DLLPUBLIC ThreadTask +{ +public: + virtual ~ThreadTask() {} + virtual void doWork() = 0; +}; + +/// A very basic thread pool implementation +class COMPHELPER_DLLPUBLIC ThreadPool +{ +public: + /// returns a pointer to a shared pool with optimal thread + /// count for the CPU + static ThreadPool& getSharedOptimalPool(); + + ThreadPool( sal_Int32 nWorkers ); + virtual ~ThreadPool(); + + void pushTask( ThreadTask *pTask /* takes ownership */ ); + void waitUntilEmpty(); + void waitUntilWorkersDone(); + +private: + class ThreadWorker; + friend class ThreadWorker; + + ThreadTask *waitForWork( osl::Condition &rNewWork ); + ThreadTask *popWork(); + + osl::Mutex maGuard; + osl::Condition maTasksEmpty; + bool mbTerminate; + std::vector< rtl::Reference< ThreadWorker > > maWorkers; + std::vector< ThreadTask * > maTasks; +}; + +} // namespace comphelper + +#endif // INCLUDED_COMPHELPER_THREADPOOL_HXX + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/sc/Library_scfilt.mk b/sc/Library_scfilt.mk index 00d32daa1923..77c848621446 100644 --- a/sc/Library_scfilt.mk +++ b/sc/Library_scfilt.mk @@ -214,7 +214,6 @@ $(eval $(call gb_Library_add_exception_objects,scfilt,\ sc/source/filter/oox/tablebuffer \ sc/source/filter/oox/tablefragment \ sc/source/filter/oox/themebuffer \ - sc/source/filter/oox/threadpool \ sc/source/filter/oox/unitconverter \ sc/source/filter/oox/viewsettings \ sc/source/filter/oox/workbookfragment \ diff --git a/sc/source/filter/oox/threadpool.cxx b/sc/source/filter/oox/threadpool.cxx deleted file mode 100644 index 3fcfa755129c..000000000000 --- a/sc/source/filter/oox/threadpool.cxx +++ /dev/null @@ -1,164 +0,0 @@ -/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ -/* - * This file is part of the LibreOffice project. - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -#include "threadpool.hxx" - -#include - -class ThreadPool::ThreadWorker : public salhelper::Thread -{ - ThreadPool *mpPool; - osl::Condition maNewWork; -public: - ThreadWorker( ThreadPool *pPool ) : - salhelper::Thread("sheet-import-thread-pool"), - mpPool( pPool ) {} - - virtual void execute() SAL_OVERRIDE - { - ThreadTask *pTask; - while ( ( pTask = waitForWork() ) ) - { - pTask->doWork(); - delete pTask; - } - } - - ThreadTask *waitForWork() - { - ThreadTask *pRet = NULL; - - osl::ResettableMutexGuard aGuard( mpPool->maGuard ); - - pRet = mpPool->popWork(); - - while( !pRet ) - { - maNewWork.reset(); - - if( mpPool->mbTerminate ) - break; - - aGuard.clear(); // unlock - - maNewWork.wait(); - - aGuard.reset(); // lock - - pRet = mpPool->popWork(); - } - - return pRet; - } - - // Why a condition per worker thread - you may ask. - // - // Unfortunately the Windows synchronisation API that we wrap - // is horribly inadequate cf. - // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html - // The existing osl::Condition API should only ever be used - // between one producer and one consumer thread to avoid the - // lost wakeup problem. - - void signalNewWork() - { - maNewWork.set(); - } -}; - -ThreadPool::ThreadPool( sal_Int32 nWorkers ) : - mbTerminate( false ) -{ - for( sal_Int32 i = 0; i < nWorkers; i++ ) - maWorkers.push_back( new ThreadWorker( this ) ); - - maTasksEmpty.reset(); - - osl::MutexGuard aGuard( maGuard ); - for( size_t i = 0; i < maWorkers.size(); i++ ) - maWorkers[ i ]->launch(); -} - -ThreadPool::~ThreadPool() -{ - waitUntilWorkersDone(); -} - -/// wait until all the workers have completed and -/// terminate all threads -void ThreadPool::waitUntilWorkersDone() -{ - waitUntilEmpty(); - - osl::ResettableMutexGuard aGuard( maGuard ); - mbTerminate = true; - - while( !maWorkers.empty() ) - { - rtl::Reference< ThreadWorker > xWorker = maWorkers.back(); - maWorkers.pop_back(); - assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker) - == maWorkers.end()); - xWorker->signalNewWork(); - aGuard.clear(); - { // unlocked - xWorker->join(); - xWorker.clear(); - } - aGuard.reset(); - } -} - -void ThreadPool::pushTask( ThreadTask *pTask ) -{ - osl::MutexGuard aGuard( maGuard ); - maTasks.insert( maTasks.begin(), pTask ); - // horrible beyond belief: - for( size_t i = 0; i < maWorkers.size(); i++ ) - maWorkers[ i ]->signalNewWork(); - maTasksEmpty.reset(); -} - -ThreadTask *ThreadPool::popWork() -{ - if( !maTasks.empty() ) - { - ThreadTask *pTask = maTasks.back(); - maTasks.pop_back(); - return pTask; - } - else - maTasksEmpty.set(); - return NULL; -} - -void ThreadPool::waitUntilEmpty() -{ - osl::ResettableMutexGuard aGuard( maGuard ); - - if( maWorkers.empty() ) - { // no threads at all -> execute the work in-line - ThreadTask *pTask; - while ( ( pTask = popWork() ) ) - { - pTask->doWork(); - delete pTask; - } - mbTerminate = true; - } - else - { - aGuard.clear(); - maTasksEmpty.wait(); - aGuard.reset(); - } - assert( maTasks.empty() ); -} - -/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/sc/source/filter/oox/threadpool.hxx b/sc/source/filter/oox/threadpool.hxx deleted file mode 100644 index 19b524c57d4e..000000000000 --- a/sc/source/filter/oox/threadpool.hxx +++ /dev/null @@ -1,53 +0,0 @@ -/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ -/* - * This file is part of the LibreOffice project. - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -#ifndef INCLUDED_SC_SOURCE_FILTER_OOX_THREADPOOL_HXX -#define INCLUDED_SC_SOURCE_FILTER_OOX_THREADPOOL_HXX - -#include -#include -#include -#include -#include -#include - -class ThreadTask -{ -public: - virtual ~ThreadTask() {} - virtual void doWork() = 0; -}; - -/// A very basic thread pool implementation -class ThreadPool -{ -public: - ThreadPool( sal_Int32 nWorkers ); - virtual ~ThreadPool(); - void pushTask( ThreadTask *pTask /* takes ownership */ ); - void waitUntilEmpty(); - void waitUntilWorkersDone(); - -private: - class ThreadWorker; - friend class ThreadWorker; - - ThreadTask *waitForWork( osl::Condition &rNewWork ); - ThreadTask *popWork(); - - osl::Mutex maGuard; - osl::Condition maTasksEmpty; - bool mbTerminate; - std::vector< rtl::Reference< ThreadWorker > > maWorkers; - std::vector< ThreadTask * > maTasks; -}; - -#endif // INCLUDED_SC_SOURCE_FILTER_OOX_THREADPOOL_HXX - -/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/sc/source/filter/oox/workbookfragment.cxx b/sc/source/filter/oox/workbookfragment.cxx index 26bf3b48e06b..62ea5bb415e1 100644 --- a/sc/source/filter/oox/workbookfragment.cxx +++ b/sc/source/filter/oox/workbookfragment.cxx @@ -46,7 +46,6 @@ #include "worksheethelper.hxx" #include "worksheetfragment.hxx" #include "sheetdatacontext.hxx" -#include "threadpool.hxx" #include "officecfg/Office/Common.hxx" #include "document.hxx" @@ -58,6 +57,7 @@ #include #include +#include #include #include @@ -207,7 +207,7 @@ namespace { typedef std::pair SheetFragmentHandler; typedef std::vector SheetFragmentVector; -class WorkerThread : public ThreadTask +class WorkerThread : public comphelper::ThreadTask { sal_Int32 &mrSheetsLeft; WorkbookFragment& mrWorkbookHandler; @@ -311,7 +311,7 @@ void importSheetFragments( WorkbookFragment& rWorkbookHandler, SheetFragmentVect // test sequential read in this mode if( nThreads < 0) nThreads = 0; - ThreadPool aPool( nThreads ); + comphelper::ThreadPool aPool( nThreads ); sal_Int32 nSheetsLeft = 0; ProgressBarTimer aProgressUpdater; -- cgit