diff options
author | Michael Meeks <michael.meeks@collabora.com> | 2013-11-27 18:11:34 +0000 |
---|---|---|
committer | Michael Meeks <michael.meeks@collabora.com> | 2013-11-27 19:19:50 +0000 |
commit | 0710299634a2276749c36ed86a5a60a20d63073f (patch) | |
tree | 6bcd379e905c902d50cb9a10b3fcc6d53d55f4dc /sc | |
parent | 8bdf8dc075928bd0eb3e49ac0f51727c0677dc37 (diff) |
sc: threaded parsing of the core data inside large XLSX files
Enabled in experimental mode only or via SC_IMPORT_THREADS=<N> this
allows significant parallelisation of sheet reading. I also implement
a simple thread pool to manage that.
Change-Id: I66c72211f2699490230e993a374c26b1892eac12
Diffstat (limited to 'sc')
-rw-r--r-- | sc/Library_scfilt.mk | 1 | ||||
-rw-r--r-- | sc/source/filter/inc/sheetdatacontext.hxx | 11 | ||||
-rw-r--r-- | sc/source/filter/oox/sheetdatacontext.cxx | 6 | ||||
-rw-r--r-- | sc/source/filter/oox/threadpool.cxx | 162 | ||||
-rw-r--r-- | sc/source/filter/oox/threadpool.hxx | 53 | ||||
-rw-r--r-- | sc/source/filter/oox/workbookfragment.cxx | 212 |
6 files changed, 285 insertions, 160 deletions
diff --git a/sc/Library_scfilt.mk b/sc/Library_scfilt.mk index 499f87326295..eb0d5d247036 100644 --- a/sc/Library_scfilt.mk +++ b/sc/Library_scfilt.mk @@ -211,6 +211,7 @@ $(eval $(call gb_Library_add_exception_objects,scfilt,\ sc/source/filter/oox/tablebuffer \ sc/source/filter/oox/tablefragment \ sc/source/filter/oox/themebuffer \ + sc/source/filter/oox/threadpool \ sc/source/filter/oox/unitconverter \ sc/source/filter/oox/viewsettings \ sc/source/filter/oox/workbookfragment \ diff --git a/sc/source/filter/inc/sheetdatacontext.hxx b/sc/source/filter/inc/sheetdatacontext.hxx index b492d2ab4ea7..3f3e377f5c54 100644 --- a/sc/source/filter/inc/sheetdatacontext.hxx +++ b/sc/source/filter/inc/sheetdatacontext.hxx @@ -23,6 +23,9 @@ #include "excelhandlers.hxx" #include "richstring.hxx" #include "sheetdatabuffer.hxx" +#include <vcl/svapp.hxx> + +#define MULTI_THREAD_SHEET_PARSING 1 namespace oox { namespace xls { @@ -54,8 +57,16 @@ struct SheetDataContextBase */ class SheetDataContext : public WorksheetContextBase, private SheetDataContextBase { + // If we are doing threaded parsing, this SheetDataContext + // forms the inner loop for bulk data parsing, and for the + // duration of this we can drop the solar mutex. +#if MULTI_THREAD_SHEET_PARSING + SolarMutexReleaser aReleaser; +#endif + public: explicit SheetDataContext( WorksheetFragmentBase& rFragment ); + virtual ~SheetDataContext(); protected: virtual ::oox::core::ContextHandlerRef onCreateContext( sal_Int32 nElement, const AttributeList& rAttribs ); diff --git a/sc/source/filter/oox/sheetdatacontext.cxx b/sc/source/filter/oox/sheetdatacontext.cxx index 5170234158ea..9a0f7dfd83f6 100644 --- a/sc/source/filter/oox/sheetdatacontext.cxx +++ b/sc/source/filter/oox/sheetdatacontext.cxx @@ -90,6 +90,12 @@ SheetDataContext::SheetDataContext( WorksheetFragmentBase& rFragment ) : mnRow( -1 ), mnCol( -1 ) { + SAL_INFO( "sc.filter", "start safe sheet data context - unlock\n" ); +} + +SheetDataContext::~SheetDataContext() +{ + SAL_INFO( "sc.filter", "end safe sheet data context - relock\n" ); } ContextHandlerRef SheetDataContext::onCreateContext( sal_Int32 nElement, const AttributeList& rAttribs ) diff --git a/sc/source/filter/oox/threadpool.cxx b/sc/source/filter/oox/threadpool.cxx new file mode 100644 index 000000000000..9de1a1454ceb --- /dev/null +++ b/sc/source/filter/oox/threadpool.cxx @@ -0,0 +1,162 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "threadpool.hxx" + +class ThreadPool::ThreadWorker : public salhelper::Thread +{ + ThreadPool *mpPool; + osl::Condition maNewWork; +public: + ThreadWorker( ThreadPool *pPool ) : + salhelper::Thread("sheet-import-thread-pool"), + mpPool( pPool ) {} + + virtual void execute() + { + ThreadTask *pTask; + while ( ( pTask = waitForWork() ) ) + { + pTask->doWork(); + delete pTask; + } + } + + ThreadTask *waitForWork() + { + ThreadTask *pRet = NULL; + + osl::ResettableMutexGuard aGuard( mpPool->maGuard ); + + pRet = mpPool->popWork(); + + while( !pRet ) + { + maNewWork.reset(); + + if( mpPool->mbTerminate ) + break; + + aGuard.clear(); // unlock + + maNewWork.wait(); + + aGuard.reset(); // lock + + pRet = mpPool->popWork(); + } + + return pRet; + } + + // + // Why a condition per worker thread - you may ask. + // + // Unfortunately the Windows synchronisation API that we wrap + // is horribly inadequate cf. + // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html + // The existing osl::Condition API should only ever be used + // between one producer and one consumer thread to avoid the + // lost wakeup problem. + // + void signalNewWork() + { + maNewWork.set(); + } +}; + +ThreadPool::ThreadPool( sal_Int32 nWorkers ) : + mbTerminate( false ) +{ + for( sal_Int32 i = 0; i < nWorkers; i++ ) + maWorkers.push_back( new ThreadWorker( this ) ); + + maTasksEmpty.reset(); + + osl::MutexGuard aGuard( maGuard ); + for( size_t i = 0; i < maWorkers.size(); i++ ) + maWorkers[ i ]->launch(); +} + +ThreadPool::~ThreadPool() +{ + waitUntilWorkersDone(); +} + +/// wait until all the workers have completed and +/// terminate all threads +void ThreadPool::waitUntilWorkersDone() +{ + waitUntilEmpty(); + + osl::ResettableMutexGuard aGuard( maGuard ); + mbTerminate = true; + + while( !maWorkers.empty() ) + { + rtl::Reference< ThreadWorker > xWorker = maWorkers.back(); + maWorkers.pop_back(); + assert( maWorkers.find( xWorker ) == maWorkers.end() ); + xWorker->signalNewWork(); + aGuard.clear(); + { // unlocked + xWorker->join(); + xWorker.clear(); + } + aGuard.reset(); + } +} + +void ThreadPool::pushTask( ThreadTask *pTask ) +{ + osl::MutexGuard aGuard( maGuard ); + maTasks.insert( maTasks.begin(), pTask ); + // horrible beyond belief: + for( size_t i = 0; i < maWorkers.size(); i++ ) + maWorkers[ i ]->signalNewWork(); + maTasksEmpty.reset(); +} + +ThreadTask *ThreadPool::popWork() +{ + if( !maTasks.empty() ) + { + ThreadTask *pTask = maTasks.back(); + maTasks.pop_back(); + return pTask; + } + else + maTasksEmpty.set(); + return NULL; +} + +void ThreadPool::waitUntilEmpty() +{ + osl::ResettableMutexGuard aGuard( maGuard ); + + if( maWorkers.empty() ) + { // no threads at all -> execute the work in-line + ThreadTask *pTask; + while ( ( pTask = popWork() ) ) + { + pTask->doWork(); + delete pTask; + } + mbTerminate = true; + } + else + { + aGuard.clear(); + maTasksEmpty.wait(); + aGuard.reset(); + } + assert( maTasks.empty() ); +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/sc/source/filter/oox/threadpool.hxx b/sc/source/filter/oox/threadpool.hxx new file mode 100644 index 000000000000..036534f6ff9d --- /dev/null +++ b/sc/source/filter/oox/threadpool.hxx @@ -0,0 +1,53 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#ifndef SC_THREADPOOL_HXX +#define SC_THREADPOOL_HXX + +#include <sal/config.h> +#include <salhelper/thread.hxx> +#include <osl/mutex.hxx> +#include <osl/conditn.hxx> +#include <rtl/ref.hxx> +#include <vector> + +class ThreadTask +{ +public: + virtual ~ThreadTask() {} + virtual void doWork() = 0; +}; + +/// A very basic thread pool implementation +class ThreadPool +{ +public: + ThreadPool( sal_Int32 nWorkers ); + virtual ~ThreadPool(); + void pushTask( ThreadTask *pTask /* takes ownership */ ); + void waitUntilEmpty(); + void waitUntilWorkersDone(); + +private: + class ThreadWorker; + friend class ThreadWorker; + + ThreadTask *waitForWork( osl::Condition &rNewWork ); + ThreadTask *popWork(); + + osl::Mutex maGuard; + osl::Condition maTasksEmpty; + bool mbTerminate; + std::vector< rtl::Reference< ThreadWorker > > maWorkers; + std::vector< ThreadTask * > maTasks; +}; + +#endif // SC_THREADPOOL_HXX + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/sc/source/filter/oox/workbookfragment.cxx b/sc/source/filter/oox/workbookfragment.cxx index 7acb16ae4699..d3c06ac90c73 100644 --- a/sc/source/filter/oox/workbookfragment.cxx +++ b/sc/source/filter/oox/workbookfragment.cxx @@ -42,11 +42,16 @@ #include "workbooksettings.hxx" #include "worksheetbuffer.hxx" #include "worksheetfragment.hxx" +#include "sheetdatacontext.hxx" +#include "threadpool.hxx" +#include "officecfg/Office/Common.hxx" #include "document.hxx" #include "docsh.hxx" #include "calcconfig.hxx" +#include <vcl/svapp.hxx> + #include <oox/core/fastparser.hxx> #include <salhelper/thread.hxx> #include <osl/conditn.hxx> @@ -54,8 +59,6 @@ #include <queue> #include <boost/scoped_ptr.hpp> -#define MULTI_THREAD_SHEET_PARSING 0 - #include "oox/ole/vbaproject.hxx" namespace oox { @@ -204,188 +207,77 @@ namespace { typedef std::pair<WorksheetGlobalsRef, FragmentHandlerRef> SheetFragmentHandler; typedef std::vector<SheetFragmentHandler> SheetFragmentVector; -#if MULTI_THREAD_SHEET_PARSING - -class WorkerThread; -typedef rtl::Reference<WorkerThread> WorkerThreadRef; - -struct WorkerThreadData -{ - osl::Mutex maMtx; - std::vector<WorkerThreadRef> maThreads; -}; - -struct IdleWorkerThreadData -{ - osl::Mutex maMtx; - osl::Condition maCondAdded; - std::queue<WorkerThread*> maThreads; -}; - -struct -{ - boost::scoped_ptr<WorkerThreadData> mpWorkerThreads; - boost::scoped_ptr<IdleWorkerThreadData> mpIdleThreads; - -} aThreadGlobals; - -enum WorkerAction -{ - None = 0, - TerminateThread, - Work -}; - -class WorkerThread : public salhelper::Thread +class WorkerThread : public ThreadTask { WorkbookFragment& mrWorkbookHandler; - size_t mnID; - FragmentHandlerRef mxHandler; - boost::scoped_ptr<oox::core::FastParser> mxParser; - osl::Mutex maMtxAction; - osl::Condition maCondActionChanged; - WorkerAction meAction; -public: - WorkerThread( WorkbookFragment& rWorkbookHandler, size_t nID ) : - salhelper::Thread("sheet-import-worker-thread"), - mrWorkbookHandler(rWorkbookHandler), - mnID(nID), - mxParser(rWorkbookHandler.getOoxFilter().createParser()), - meAction(None) {} - - virtual void execute() - { - announceIdle(); - - // Keep looping until the terminate request is set. - for (maCondActionChanged.wait(); true; maCondActionChanged.wait()) - { - osl::MutexGuard aGuard(maMtxAction); - if (!maCondActionChanged.check()) - // Wait again. - continue; - - maCondActionChanged.reset(); - - if (meAction == TerminateThread) - // End the thread. - return; - - if (meAction != Work) - continue; - -#if 0 - // TODO : This still deadlocks in the fast parser code. - mrWorkbookHandler.importOoxFragment(mxHandler, *mxParser); -#else - double val = rand() / static_cast<double>(RAND_MAX); - val *= 1000000; // normalize to 1 second. - val *= 1.5; // inflate it a bit. - usleep(val); // pretend to be working while asleep. -#endif - announceIdle(); - } - } - - void announceIdle() - { - // Set itself idle to receive a new task from the main thread. - osl::MutexGuard aGuard(aThreadGlobals.mpIdleThreads->maMtx); - aThreadGlobals.mpIdleThreads->maThreads.push(this); - aThreadGlobals.mpIdleThreads->maCondAdded.set(); - } + rtl::Reference<FragmentHandler> mxHandler; - void terminate() +public: + WorkerThread( WorkbookFragment& rWorkbookHandler, + const rtl::Reference<FragmentHandler>& xHandler ) : + mrWorkbookHandler( rWorkbookHandler ), + mxHandler( xHandler ) { - osl::MutexGuard aGuard(maMtxAction); - meAction = TerminateThread; - maCondActionChanged.set(); } - void assign( const FragmentHandlerRef& rHandler ) + virtual void doWork() { - osl::MutexGuard aGuard(maMtxAction); - mxHandler = rHandler; - meAction = Work; - maCondActionChanged.set(); + // We hold the solar mutex in all threads except for + // the small safe section of the inner loop in + // sheetdatacontext.cxx + SAL_INFO( "sc.filter", "start wait on solar\n" ); + SolarMutexGuard maGuard; + SAL_INFO( "sc.filter", "got solar\n" ); + + boost::scoped_ptr<oox::core::FastParser> xParser( + mrWorkbookHandler.getOoxFilter().createParser() ); + + SAL_INFO( "sc.filter", "start import\n" ); + mrWorkbookHandler.importOoxFragment( mxHandler, *xParser ); + SAL_INFO( "sc.filter", "end import, release solar\n" ); } }; -#endif - void importSheetFragments( WorkbookFragment& rWorkbookHandler, SheetFragmentVector& rSheets ) { -#if MULTI_THREAD_SHEET_PARSING // threaded version - size_t nThreadCount = 3; - if (nThreadCount > rSheets.size()) - nThreadCount = rSheets.size(); + sal_Int32 nThreads = std::min( rSheets.size(), (size_t) 4 /* FIXME: ncpus/2 */ ); - // Create new thread globals. - aThreadGlobals.mpWorkerThreads.reset(new WorkerThreadData); - aThreadGlobals.mpIdleThreads.reset(new IdleWorkerThreadData); + Reference< XComponentContext > xContext = comphelper::getProcessComponentContext(); - SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end(); + // Force threading off unless experimental mode or env. var is set. + if( !officecfg::Office::Common::Misc::ExperimentalMode::get( xContext ) ) + nThreads = 0; - { - // Initialize worker threads. - osl::MutexGuard aGuard(aThreadGlobals.mpWorkerThreads->maMtx); - for (size_t i = 0; i < nThreadCount; ++i) - { - WorkerThreadRef pThread(new WorkerThread(rWorkbookHandler, i)); - aThreadGlobals.mpWorkerThreads->maThreads.push_back(pThread); - pThread->launch(); - } - } + const char *pEnv; + if( ( pEnv = getenv( "SC_IMPORT_THREADS" ) ) ) + nThreads = rtl_str_toInt32( pEnv, 10 ); - for (aThreadGlobals.mpIdleThreads->maCondAdded.wait(); true; aThreadGlobals.mpIdleThreads->maCondAdded.wait()) + if( nThreads != 0 ) { - osl::MutexGuard aGuard(aThreadGlobals.mpIdleThreads->maMtx); - if (!aThreadGlobals.mpIdleThreads->maCondAdded.check()) - // Wait again. - continue; - - aThreadGlobals.mpIdleThreads->maCondAdded.reset(); - - // Assign work to all idle threads. - while (!aThreadGlobals.mpIdleThreads->maThreads.empty()) - { - if (it == itEnd) - break; - - WorkerThread* p = aThreadGlobals.mpIdleThreads->maThreads.front(); - aThreadGlobals.mpIdleThreads->maThreads.pop(); - p->assign(it->second); - ++it; - } + // test sequential read in this mode + if( nThreads < 0) + nThreads = 0; + ThreadPool aPool( nThreads ); - if (it == itEnd) - // Finished! Exit the loop. - break; - } + SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end(); + for( ; it != itEnd; ++it ) + aPool.pushTask( new WorkerThread( rWorkbookHandler, it->second ) ) + ; - { - // Terminate all worker threads. - osl::MutexGuard aGuard(aThreadGlobals.mpWorkerThreads->maMtx); - for (size_t i = 0, n = aThreadGlobals.mpWorkerThreads->maThreads.size(); i < n; ++i) { - WorkerThreadRef pWorker = aThreadGlobals.mpWorkerThreads->maThreads[i]; - pWorker->terminate(); - if (pWorker.is()) - pWorker->join(); + // Ideally no-one else but our worker threads can re-acquire that. + // potentially if that causes a problem we might want to extend + // the SolarMutex functionality to allow passing it around. + SolarMutexReleaser aReleaser; + aPool.waitUntilWorkersDone(); } } - - // Delete all thread globals. - aThreadGlobals.mpWorkerThreads.reset(); - aThreadGlobals.mpIdleThreads.reset(); - -#else // non-threaded version - for( SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end(); it != itEnd; ++it) + else { - // import the sheet fragment - rWorkbookHandler.importOoxFragment(it->second); + SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end(); + for( ; it != itEnd; ++it ) + rWorkbookHandler.importOoxFragment( it->second ); } -#endif } } |