diff options
Diffstat (limited to 'cppu/source/threadpool')
-rw-r--r-- | cppu/source/threadpool/thread.cxx | 184 | ||||
-rw-r--r-- | cppu/source/threadpool/thread.hxx | 52 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.cxx | 124 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.hxx | 43 |
4 files changed, 202 insertions, 201 deletions
diff --git a/cppu/source/threadpool/thread.cxx b/cppu/source/threadpool/thread.cxx index cc22a453c79d..12ea09a131ee 100644 --- a/cppu/source/threadpool/thread.cxx +++ b/cppu/source/threadpool/thread.cxx @@ -33,7 +33,6 @@ #include <com/sun/star/lang/DisposedException.hpp> #include <com/sun/star/uno/Reference.hxx> #include <com/sun/star/uno/XInterface.hpp> -#include <rtl/instance.hxx> #include <rtl/ustring.h> #include <rtl/ustring.hxx> @@ -48,17 +47,6 @@ namespace css = com::sun::star; } using namespace osl; -extern "C" { - -void SAL_CALL cppu_requestThreadWorker( void *pVoid ) -{ - ::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid; - - pThread->run(); - pThread->onTerminated(); -} - -} namespace cppu_threadpool { @@ -75,7 +63,7 @@ namespace cppu_threadpool { #endif } - void ThreadAdmin::add( ORequestThread *p ) + void ThreadAdmin::add( rtl::Reference< ORequestThread > const & p ) { MutexGuard aGuard( m_mutex ); if( m_disposed ) @@ -90,12 +78,19 @@ namespace cppu_threadpool { m_lst.push_back( p ); } - void ThreadAdmin::remove( ORequestThread * p ) + void ThreadAdmin::remove_locked( rtl::Reference< ORequestThread > const & p ) + { + ::std::list< rtl::Reference< ORequestThread > >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p ); + if( ii != m_lst.end() ) + { + m_lst.erase( ii ); + } + } + + void ThreadAdmin::remove( rtl::Reference< ORequestThread > const & p ) { MutexGuard aGuard( m_mutex ); - ::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p ); - OSL_ASSERT( ii != m_lst.end() ); - m_lst.erase( ii ); + remove_locked( p ); } void ThreadAdmin::join() @@ -104,62 +99,34 @@ namespace cppu_threadpool { MutexGuard aGuard( m_mutex ); m_disposed = true; } - ORequestThread *pCurrent; - do + for (;;) { - pCurrent = 0; + rtl::Reference< ORequestThread > pCurrent; { MutexGuard aGuard( m_mutex ); - if( ! m_lst.empty() ) + if( m_lst.empty() ) { - pCurrent = m_lst.front(); - pCurrent->setDeleteSelf( sal_False ); + break; } + pCurrent = m_lst.front(); + m_lst.pop_front(); } - if ( pCurrent ) - { - pCurrent->join(); - delete pCurrent; - } - } while( pCurrent ); - } - - struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin > - { - ThreadAdminHolder operator () () { - ThreadAdminHolder aRet(new ThreadAdmin()); - return aRet; + pCurrent->join(); } - }; - - ThreadAdminHolder& ThreadAdmin::getInstance() - { - return theThreadAdmin::get(); } // ---------------------------------------------------------------------------------- - ORequestThread::ORequestThread( JobQueue *pQueue, + ORequestThread::ORequestThread( ThreadPoolHolder const &aThreadPool, + JobQueue *pQueue, const ByteSequence &aThreadId, sal_Bool bAsynchron ) - : m_thread( 0 ) - , m_aThreadAdmin( ThreadAdmin::getInstance() ) + : m_aThreadPool( aThreadPool ) , m_pQueue( pQueue ) , m_aThreadId( aThreadId ) , m_bAsynchron( bAsynchron ) - , m_bDeleteSelf( sal_True ) - { - m_aThreadAdmin->add( this ); - } - - - ORequestThread::~ORequestThread() - { - if (m_thread != 0) - { - osl_destroyThread(m_thread); - } - } + {} + ORequestThread::~ORequestThread() {} void ORequestThread::setTask( JobQueue *pQueue, const ByteSequence &aThreadId, @@ -170,74 +137,81 @@ namespace cppu_threadpool { m_bAsynchron = bAsynchron; } - sal_Bool ORequestThread::create() + void ORequestThread::launch() { - OSL_ASSERT(m_thread == 0); // only one running thread per instance - - m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this); - if ( m_thread ) - { - osl_resumeThread( m_thread ); + // Assumption is that osl::Thread::create returns normally with a true + // return value iff it causes osl::Thread::run to start executing: + acquire(); + ThreadAdmin & rThreadAdmin = m_aThreadPool->getThreadAdmin(); + osl::ClearableMutexGuard g(rThreadAdmin.m_mutex); + rThreadAdmin.add( this ); + try { + if (!create()) { + throw std::runtime_error("osl::Thread::create failed"); + } + } catch (...) { + rThreadAdmin.remove_locked( this ); + g.clear(); + release(); + throw; } - - return m_thread != 0; - } - - void ORequestThread::join() - { - osl_joinWithThread( m_thread ); } void ORequestThread::onTerminated() { - m_aThreadAdmin->remove( this ); - if( m_bDeleteSelf ) - { - delete this; - } + m_aThreadPool->getThreadAdmin().remove( this ); + release(); } void ORequestThread::run() { - ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance(); - - while ( m_pQueue ) + try { - if( ! m_bAsynchron ) + while ( m_pQueue ) { - if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) + if( ! m_bAsynchron ) { - OSL_ASSERT( false ); + if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) + { + OSL_ASSERT( false ); + } } - } - while( ! m_pQueue->isEmpty() ) - { - // Note : Oneways should not get a disposable disposeid, - // It does not make sense to dispose a call in this state. - // That's way we put it an disposeid, that can't be used otherwise. - m_pQueue->enter( - sal::static_int_cast< sal_Int64 >( - reinterpret_cast< sal_IntPtr >(this)), - sal_True ); - - if( m_pQueue->isEmpty() ) + while( ! m_pQueue->isEmpty() ) { - theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); - // Note : revokeQueue might have failed because m_pQueue.isEmpty() - // may be false (race). + // Note : Oneways should not get a disposable disposeid, + // It does not make sense to dispose a call in this state. + // That's way we put it an disposeid, that can't be used otherwise. + m_pQueue->enter( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(this)), + sal_True ); + + if( m_pQueue->isEmpty() ) + { + m_aThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); + // Note : revokeQueue might have failed because m_pQueue.isEmpty() + // may be false (race). + } } - } - delete m_pQueue; - m_pQueue = 0; + delete m_pQueue; + m_pQueue = 0; - if( ! m_bAsynchron ) - { - uno_releaseIdFromCurrentThread(); - } + if( ! m_bAsynchron ) + { + uno_releaseIdFromCurrentThread(); + } - theThreadPool->waitInPool( this ); + m_aThreadPool->waitInPool( this ); + } + } + catch (...) + { + // Work around the problem that onTerminated is not called if run + // throws an exception: + onTerminated(); + throw; } } } diff --git a/cppu/source/threadpool/thread.hxx b/cppu/source/threadpool/thread.hxx index a3ea45aadaed..88f3d91f8722 100644 --- a/cppu/source/threadpool/thread.hxx +++ b/cppu/source/threadpool/thread.hxx @@ -28,63 +28,49 @@ #ifndef _CPPU_THREADPOOL_THREAD_HXX #define _CPPU_THREADPOOL_THREAD_HXX -#include <list> +#include <osl/thread.hxx> #include <sal/types.h> - -#include <osl/thread.h> +#include <salhelper/simplereferenceobject.hxx> #include "jobqueue.hxx" +#include "threadpool.hxx" namespace cppu_threadpool { class JobQueue; - class ThreadAdmin; - typedef boost::shared_ptr<ThreadAdmin> ThreadAdminHolder; //----------------------------------------- // private thread class for the threadpool // independent from vos //----------------------------------------- - class ORequestThread + class ORequestThread: + public salhelper::SimpleReferenceObject, public osl::Thread { public: - ORequestThread( JobQueue * , + ORequestThread( ThreadPoolHolder const &aThreadPool, + JobQueue * , const ::rtl::ByteSequence &aThreadId, sal_Bool bAsynchron ); - ~ORequestThread(); + virtual ~ORequestThread(); void setTask( JobQueue * , const ::rtl::ByteSequence & aThreadId , sal_Bool bAsynchron ); - sal_Bool create(); - void join(); - void onTerminated(); - void run(); - inline void setDeleteSelf( sal_Bool b ) - { m_bDeleteSelf = b; } + void launch(); + + static inline void * operator new(std::size_t size) + { return SimpleReferenceObject::operator new(size); } + + static inline void operator delete(void * pointer) + { SimpleReferenceObject::operator delete(pointer); } private: - oslThread m_thread; - ThreadAdminHolder m_aThreadAdmin; + virtual void SAL_CALL run(); + virtual void SAL_CALL onTerminated(); + + ThreadPoolHolder m_aThreadPool; JobQueue *m_pQueue; ::rtl::ByteSequence m_aThreadId; sal_Bool m_bAsynchron; - sal_Bool m_bDeleteSelf; - }; - - class ThreadAdmin - { - public: - ThreadAdmin(); - ~ThreadAdmin (); - static ThreadAdminHolder &getInstance(); - void add( ORequestThread * ); - void remove( ORequestThread * ); - void join(); - - private: - ::osl::Mutex m_mutex; - ::std::list< ORequestThread * > m_lst; - bool m_disposed; }; } // end cppu_threadpool diff --git a/cppu/source/threadpool/threadpool.cxx b/cppu/source/threadpool/threadpool.cxx index d14e26006e04..e4351cb9cc4a 100644 --- a/cppu/source/threadpool/threadpool.cxx +++ b/cppu/source/threadpool/threadpool.cxx @@ -109,15 +109,6 @@ namespace cppu_threadpool //------------------------------------------------------------------------------- - struct theThreadPool : - public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool > - { - ThreadPoolHolder operator () () { - ThreadPoolHolder aRet(new ThreadPool()); - return aRet; - } - }; - ThreadPool::ThreadPool() { m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance(); @@ -132,46 +123,24 @@ namespace cppu_threadpool } #endif } - ThreadPoolHolder ThreadPool::getInstance() - { - return theThreadPool::get(); - } - void ThreadPool::dispose( sal_Int64 nDisposeId ) { - if( nDisposeId ) - { - m_DisposedCallerAdmin->dispose( nDisposeId ); + m_DisposedCallerAdmin->dispose( nDisposeId ); - MutexGuard guard( m_mutex ); - for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ; - ii != m_mapQueue.end(); - ++ii) + MutexGuard guard( m_mutex ); + for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ; + ii != m_mapQueue.end(); + ++ii) + { + if( (*ii).second.first ) { - if( (*ii).second.first ) - { - (*ii).second.first->dispose( nDisposeId ); - } - if( (*ii).second.second ) - { - (*ii).second.second->dispose( nDisposeId ); - } + (*ii).second.first->dispose( nDisposeId ); } - } - else - { + if( (*ii).second.second ) { - MutexGuard guard( m_mutexWaitingThreadList ); - for( WaitingThreadList::iterator ii = m_lstThreads.begin() ; - ii != m_lstThreads.end() ; - ++ ii ) - { - // wake the threads up - osl_setCondition( (*ii)->condition ); - } + (*ii).second.second->dispose( nDisposeId ); } - ThreadAdmin::getInstance()->join(); } } @@ -185,7 +154,7 @@ namespace cppu_threadpool * a new request comes in, this thread is reused. This is done only to improve performance, * it is not required for threadpool functionality. ******************/ - void ThreadPool::waitInPool( ORequestThread * pThread ) + void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread ) { struct WaitingThread waitingThread; waitingThread.condition = osl_createCondition(); @@ -201,7 +170,7 @@ namespace cppu_threadpool { MutexGuard guard ( m_mutexWaitingThreadList ); - if( waitingThread.thread ) + if( waitingThread.thread.is() ) { // thread wasn't reused, remove it from the list WaitingThreadList::iterator ii = find( @@ -214,6 +183,21 @@ namespace cppu_threadpool osl_destroyCondition( waitingThread.condition ); } + void ThreadPool::joinWorkers() + { + { + MutexGuard guard( m_mutexWaitingThreadList ); + for( WaitingThreadList::iterator ii = m_lstThreads.begin() ; + ii != m_lstThreads.end() ; + ++ ii ) + { + // wake the threads up + osl_setCondition( (*ii)->condition ); + } + } + m_aThreadAdmin.join(); + } + void ThreadPool::createThread( JobQueue *pQueue , const ByteSequence &aThreadId, sal_Bool bAsynchron ) @@ -240,10 +224,9 @@ namespace cppu_threadpool if( bCreate ) { - ORequestThread *pThread = - new ORequestThread( pQueue , aThreadId, bAsynchron); - // deletes itself ! - pThread->create(); + rtl::Reference< ORequestThread > pThread( + new ORequestThread( this, pQueue , aThreadId, bAsynchron) ); + pThread->launch(); } } @@ -385,6 +368,12 @@ namespace cppu_threadpool } } +// All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life +// spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty +// (within the last uno_threadpool_destroy) all worker threads spawned by that +// ThreadPool instance are joined (which implies that uno_threadpool_destroy +// must never be called from a worker thread); afterwards, the next call to +// uno_threadpool_create (if any) will lead to a new ThreadPool instance. using namespace cppu_threadpool; @@ -415,27 +404,47 @@ struct _uno_ThreadPool sal_Int32 dummy; }; +namespace { + +ThreadPoolHolder getThreadPool( uno_ThreadPool hPool ) +{ + MutexGuard guard( Mutex::getGlobalMutex() ); + assert( g_pThreadpoolHashSet != 0 ); + ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) ); + assert( i != g_pThreadpoolHashSet->end() ); + return i->second; +} + +} + extern "C" uno_ThreadPool SAL_CALL uno_threadpool_create() SAL_THROW_EXTERN_C() { MutexGuard guard( Mutex::getGlobalMutex() ); + ThreadPoolHolder p; if( ! g_pThreadpoolHashSet ) { g_pThreadpoolHashSet = new ThreadpoolHashSet(); + p = new ThreadPool; + } + else + { + assert( !g_pThreadpoolHashSet->empty() ); + p = g_pThreadpoolHashSet->begin()->second; } // Just ensure that the handle is unique in the process (via heap) uno_ThreadPool h = new struct _uno_ThreadPool; - g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) ); + g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) ); return h; } extern "C" void SAL_CALL -uno_threadpool_attach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C() +uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() { sal_Sequence *pThreadId = 0; uno_getIdOfCurrentThread( &pThreadId ); - ThreadPool::getInstance()->prepare( pThreadId ); + getThreadPool( hPool )->prepare( pThreadId ); rtl_byte_sequence_release( pThreadId ); uno_releaseIdFromCurrentThread(); } @@ -447,7 +456,7 @@ uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob ) sal_Sequence *pThreadId = 0; uno_getIdOfCurrentThread( &pThreadId ); *ppJob = - ThreadPool::getInstance()->enter( + getThreadPool( hPool )->enter( pThreadId, sal::static_int_cast< sal_Int64 >( reinterpret_cast< sal_IntPtr >(hPool)) ); @@ -463,19 +472,19 @@ uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C() extern "C" void SAL_CALL uno_threadpool_putJob( - SAL_UNUSED_PARAMETER uno_ThreadPool, + uno_ThreadPool hPool, sal_Sequence *pThreadId, void *pJob, void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ), sal_Bool bIsOneway ) SAL_THROW_EXTERN_C() { - ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest ); + getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest ); } extern "C" void SAL_CALL uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() { - ThreadPool::getInstance()->dispose( + getThreadPool(hPool)->dispose( sal::static_int_cast< sal_Int64 >( reinterpret_cast< sal_IntPtr >(hPool)) ); } @@ -483,9 +492,8 @@ uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() extern "C" void SAL_CALL uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() { - assert(hPool != 0); - - ThreadPool::getInstance()->destroy( + ThreadPoolHolder p( getThreadPool(hPool) ); + p->destroy( sal::static_int_cast< sal_Int64 >( reinterpret_cast< sal_IntPtr >(hPool)) ); @@ -510,7 +518,7 @@ uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() if( empty ) { - uno_threadpool_dispose( 0 ); + p->joinWorkers(); } } diff --git a/cppu/source/threadpool/threadpool.hxx b/cppu/source/threadpool/threadpool.hxx index 8b64ed18d682..6c186271b5fc 100644 --- a/cppu/source/threadpool/threadpool.hxx +++ b/cppu/source/threadpool/threadpool.hxx @@ -25,11 +25,19 @@ * for a copy of the LGPLv3 License. * ************************************************************************/ + +#ifndef INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX +#define INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX + +#include <list> + #include <boost/unordered_map.hpp> #include <osl/conditn.h> #include <rtl/byteseq.hxx> +#include <rtl/ref.hxx> +#include <salhelper/simplereferenceobject.hxx> #include <boost/shared_ptr.hpp> @@ -74,7 +82,7 @@ namespace cppu_threadpool { struct WaitingThread { oslCondition condition; - ORequestThread *thread; + rtl::Reference< ORequestThread > thread; }; typedef ::std::list < struct ::cppu_threadpool::WaitingThread * > WaitingThreadList; @@ -98,15 +106,32 @@ namespace cppu_threadpool { DisposedCallerList m_lst; }; + class ThreadAdmin + { + public: + ThreadAdmin(); + ~ThreadAdmin (); + + void add( rtl::Reference< ORequestThread > const & ); + void remove( rtl::Reference< ORequestThread > const & ); + void join(); + + void remove_locked( rtl::Reference< ORequestThread > const & ); + ::osl::Mutex m_mutex; + + private: + ::std::list< rtl::Reference< ORequestThread > > m_lst; + bool m_disposed; + }; + class ThreadPool; - typedef boost::shared_ptr<ThreadPool> ThreadPoolHolder; + typedef rtl::Reference<ThreadPool> ThreadPoolHolder; - class ThreadPool + class ThreadPool: public salhelper::SimpleReferenceObject { public: ThreadPool(); ~ThreadPool(); - static ThreadPoolHolder getInstance(); void dispose( sal_Int64 nDisposeId ); void destroy( sal_Int64 nDisposeId ); @@ -124,7 +149,12 @@ namespace cppu_threadpool { ********/ sal_Bool revokeQueue( const ByteSequence & aThreadId , sal_Bool bAsynchron ); - void waitInPool( ORequestThread *pThread ); + void waitInPool( rtl::Reference< ORequestThread > const & pThread ); + + void joinWorkers(); + + ThreadAdmin & getThreadAdmin() { return m_aThreadAdmin; } + private: void createThread( JobQueue *pQueue, const ByteSequence &aThreadId, sal_Bool bAsynchron); @@ -136,8 +166,11 @@ namespace cppu_threadpool { WaitingThreadList m_lstThreads; DisposedCallerAdminHolder m_DisposedCallerAdmin; + ThreadAdmin m_aThreadAdmin; }; } // end namespace cppu_threadpool +#endif + /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ |