diff options
-rw-r--r-- | binaryurp/source/bridge.cxx | 205 | ||||
-rw-r--r-- | binaryurp/source/bridge.hxx | 25 | ||||
-rw-r--r-- | binaryurp/source/bridgefactory.cxx | 2 | ||||
-rw-r--r-- | binaryurp/source/incomingrequest.cxx | 2 | ||||
-rw-r--r-- | binaryurp/source/reader.cxx | 2 | ||||
-rw-r--r-- | binaryurp/source/writer.cxx | 2 | ||||
-rw-r--r-- | cppu/Library_cppu.mk | 1 | ||||
-rw-r--r-- | cppu/inc/uno/threadpool.h | 8 | ||||
-rw-r--r-- | cppu/source/threadpool/thread.cxx | 184 | ||||
-rw-r--r-- | cppu/source/threadpool/thread.hxx | 52 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.cxx | 124 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.hxx | 43 | ||||
-rw-r--r-- | cpputools/source/unoexe/unoexe.cxx | 4 |
13 files changed, 372 insertions, 282 deletions
diff --git a/binaryurp/source/bridge.cxx b/binaryurp/source/bridge.cxx index 2d1f622369a3..fbb70a40c835 100644 --- a/binaryurp/source/bridge.cxx +++ b/binaryurp/source/bridge.cxx @@ -106,11 +106,9 @@ extern "C" void SAL_CALL freeProxyCallback( static_cast< Proxy * >(pProxy)->do_free(); } -void joinThread(salhelper::Thread * thread) { +bool isThread(salhelper::Thread * thread) { assert(thread != 0); - if (thread->getIdentifier() != osl::Thread::getCurrentIdentifier()) { - thread->join(); - } + return osl::Thread::getCurrentIdentifier() == thread->getIdentifier(); } class AttachThread: private boost::noncopyable { @@ -214,8 +212,8 @@ Bridge::Bridge( rtl::OUString( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.bridge.XProtocolProperties::commitChange"))), - threadPool_(0), currentContextMode_(false), proxies_(0), calls_(0), - normalCall_(false), activeCalls_(0), terminated_(false), + state_(STATE_INITIAL), threadPool_(0), currentContextMode_(false), + proxies_(0), calls_(0), normalCall_(false), activeCalls_(0), mode_(MODE_REQUESTED) { assert(factory.is() && connection.is()); @@ -239,11 +237,14 @@ void Bridge::start() { rtl::Reference< Writer > w(new Writer(this)); { osl::MutexGuard g(mutex_); - assert(threadPool_ == 0 && !writer_.is() && !reader_.is()); + assert( + state_ == STATE_INITIAL && threadPool_ == 0 && !writer_.is() && + !reader_.is()); threadPool_ = uno_threadpool_create(); assert(threadPool_ != 0); reader_ = r; writer_ = w; + state_ = STATE_STARTED; } // It is important to call reader_->launch() last here; both // Writer::execute and Reader::execute can call Bridge::terminate, but @@ -255,60 +256,117 @@ void Bridge::start() { r->launch(); } -void Bridge::terminate() { +void Bridge::terminate(bool final) { uno_ThreadPool tp; - rtl::Reference< Reader > r; - rtl::Reference< Writer > w; - Listeners ls; + // Make sure function-local variables (Stubs s, etc.) are destroyed before + // the final uno_threadpool_destroy/threadPool_ = 0: { - osl::MutexGuard g(mutex_); - if (terminated_) { - return; + rtl::Reference< Reader > r; + rtl::Reference< Writer > w; + bool joinW; + Listeners ls; + { + osl::ClearableMutexGuard g(mutex_); + switch (state_) { + case STATE_INITIAL: // via ~Bridge -> dispose -> terminate + case STATE_FINAL: + return; + case STATE_STARTED: + break; + case STATE_TERMINATED: + if (final) { + g.clear(); + terminated_.wait(); + { + osl::MutexGuard g2(mutex_); + tp = threadPool_; + threadPool_ = 0; + assert(!(reader_.is() && isThread(reader_.get()))); + std::swap(reader_, r); + assert(!(writer_.is() && isThread(writer_.get()))); + std::swap(writer_, w); + state_ = STATE_FINAL; + } + assert(!(r.is() && w.is())); + if (r.is()) { + r->join(); + } else if (w.is()) { + w->join(); + } + if (tp != 0) { + uno_threadpool_destroy(tp); + } + } + return; + } + tp = threadPool_; + assert(!(final && isThread(reader_.get()))); + if (!isThread(reader_.get())) { + std::swap(reader_, r); + } + w = writer_; + joinW = !isThread(writer_.get()); + assert(!final || joinW); + if (joinW) { + writer_.clear(); + } + ls.swap(listeners_); + state_ = final ? STATE_FINAL : STATE_TERMINATED; + } + try { + connection_->close(); + } catch (const css::io::IOException & e) { + SAL_INFO("binaryurp", "caught IO exception '" << e.Message << '\''); + } + assert(w.is()); + w->stop(); + if (r.is()) { + r->join(); + } + if (joinW) { + w->join(); + } + assert(tp != 0); + uno_threadpool_dispose(tp); + Stubs s; + { + osl::MutexGuard g(mutex_); + s.swap(stubs_); + } + for (Stubs::iterator i(s.begin()); i != s.end(); ++i) { + for (Stub::iterator j(i->second.begin()); j != i->second.end(); ++j) + { + SAL_INFO( + "binaryurp", + "stub '" << i->first << "', '" << toString(j->first) + << "' still mapped at Bridge::terminate"); + binaryUno_.get()->pExtEnv->revokeInterface( + binaryUno_.get()->pExtEnv, j->second.object.get()); + } + } + factory_->removeBridge(this); + for (Listeners::iterator i(ls.begin()); i != ls.end(); ++i) { + try { + (*i)->disposing( + css::lang::EventObject( + static_cast< cppu::OWeakObject * >(this))); + } catch (const css::uno::RuntimeException & e) { + SAL_WARN( + "binaryurp", + "caught runtime exception '" << e.Message << '\''); + } } - tp = threadPool_; - std::swap(reader_, r); - std::swap(writer_, w); - ls.swap(listeners_); - terminated_ = true; } - try { - connection_->close(); - } catch (const css::io::IOException & e) { - SAL_INFO("binaryurp", "caught IO exception '" << e.Message << '\''); - } - assert(w.is()); - w->stop(); - joinThread(r.get()); - joinThread(w.get()); - assert(tp != 0); - uno_threadpool_dispose(tp); - Stubs s; + if (final) { + uno_threadpool_destroy(tp); + } { osl::MutexGuard g(mutex_); - s.swap(stubs_); - } - for (Stubs::iterator i(s.begin()); i != s.end(); ++i) { - for (Stub::iterator j(i->second.begin()); j != i->second.end(); ++j) { - SAL_INFO( - "binaryurp", - "stub '" << i->first << "', '" << toString(j->first) - << "' still mapped at Bridge::terminate"); - binaryUno_.get()->pExtEnv->revokeInterface( - binaryUno_.get()->pExtEnv, j->second.object.get()); + if (final) { + threadPool_ = 0; } } - factory_->removeBridge(this); - for (Listeners::iterator i(ls.begin()); i != ls.end(); ++i) { - try { - (*i)->disposing( - css::lang::EventObject( - static_cast< cppu::OWeakObject * >(this))); - } catch (const css::uno::RuntimeException & e) { - SAL_WARN( - "binaryurp", "caught runtime exception '" << e.Message << '\''); - } - } - uno_threadpool_destroy(tp); + terminated_.set(); } css::uno::Reference< css::connection::XConnection > Bridge::getConnection() @@ -340,19 +398,14 @@ BinaryAny Bridge::mapCppToBinaryAny(css::uno::Any const & cppAny) { uno_ThreadPool Bridge::getThreadPool() { osl::MutexGuard g(mutex_); + checkDisposed(); assert(threadPool_ != 0); return threadPool_; } rtl::Reference< Writer > Bridge::getWriter() { osl::MutexGuard g(mutex_); - if (terminated_) { - throw css::lang::DisposedException( - rtl::OUString( - RTL_CONSTASCII_USTRINGPARAM( - "Binary URP bridge already disposed")), - static_cast< cppu::OWeakObject * >(this)); - } + checkDisposed(); assert(writer_.is()); return writer_; } @@ -822,9 +875,15 @@ bool Bridge::isCurrentContextMode() { } Bridge::~Bridge() { - if (getThreadPool() != 0) { - terminate(); +#if OSL_DEBUG_LEVEL > 0 + { + osl::MutexGuard g(mutex_); + SAL_WARN_IF( + state_ == STATE_STARTED || state_ == STATE_TERMINATED, "binaryurp", + "undisposed bridge, potential deadlock ahead"); } +#endif + dispose(); } css::uno::Reference< css::uno::XInterface > Bridge::getInstance( @@ -885,7 +944,11 @@ rtl::OUString Bridge::getDescription() throw (css::uno::RuntimeException) { } void Bridge::dispose() throw (css::uno::RuntimeException) { - terminate(); + // For terminate(true) not to deadlock, an external protocol must ensure + // that dispose is not called from a thread pool worker thread (that dispose + // is never called from the reader or writer thread is already ensured + // internally): + terminate(true); // OOo expects dispose to not return while there are still remote calls in // progress; an external protocol must ensure that dispose is not called // from within an incoming or outgoing remote call, as passive_.wait() would @@ -900,7 +963,8 @@ void Bridge::addEventListener( assert(xListener.is()); { osl::MutexGuard g(mutex_); - if (!terminated_) { + assert(state_ != STATE_INITIAL); + if (state_ == STATE_STARTED) { listeners_.push_back(xListener); return; } @@ -995,7 +1059,18 @@ void Bridge::terminateWhenUnused(bool unused) { // That the current thread considers the bridge unused implies that it // is not within an incoming or outgoing remote call (so calling // terminate cannot lead to deadlock): - terminate(); + terminate(false); + } +} + +void Bridge::checkDisposed() { + assert(state_ != STATE_INITIAL); + if (state_ != STATE_STARTED) { + throw css::lang::DisposedException( + rtl::OUString( + RTL_CONSTASCII_USTRINGPARAM( + "Binary URP bridge already disposed")), + static_cast< cppu::OWeakObject * >(this)); } } diff --git a/binaryurp/source/bridge.hxx b/binaryurp/source/bridge.hxx index 8d667897d253..3ffbfbaeb43b 100644 --- a/binaryurp/source/bridge.hxx +++ b/binaryurp/source/bridge.hxx @@ -93,8 +93,11 @@ public: void start(); // Internally waits for all incoming and outgoing remote calls to terminate, - // so must not be called from within such a call: - void terminate(); + // so must not be called from within such a call; when final is true, also + // joins all remaining threads (reader, writer, and worker threads from the + // thread pool), so must not be called with final set to true from such a + // thread: + void terminate(bool final); com::sun::star::uno::Reference< com::sun::star::connection::XConnection > getConnection() const; @@ -228,6 +231,9 @@ private: void terminateWhenUnused(bool unused); + // Must only be called with mutex_ locked: + void checkDisposed(); + typedef std::list< com::sun::star::uno::Reference< @@ -240,6 +246,8 @@ private: typedef std::map< rtl::OUString, Stub > Stubs; + enum State { STATE_INITIAL, STATE_STARTED, STATE_TERMINATED, STATE_FINAL }; + enum Mode { MODE_REQUESTED, MODE_REPLY_MINUS1, MODE_REPLY_0, MODE_REPLY_1, MODE_WAIT, MODE_NORMAL, MODE_NORMAL_WAIT }; @@ -259,8 +267,15 @@ private: com::sun::star::uno::TypeDescription protPropRequest_; com::sun::star::uno::TypeDescription protPropCommit_; OutgoingRequests outgoingRequests_; + osl::Condition passive_; + // to guarantee that passive_ is eventually set (to avoid deadlock, see + // dispose), activeCalls_ only counts those calls for which it can be + // guaranteed that incrementActiveCalls is indeed followed by + // decrementActiveCalls, without an intervening exception + osl::Condition terminated_; osl::Mutex mutex_; + State state_; Listeners listeners_; uno_ThreadPool threadPool_; rtl::Reference< Writer > writer_; @@ -271,12 +286,6 @@ private: std::size_t calls_; bool normalCall_; std::size_t activeCalls_; - osl::Condition passive_; - // to guarantee that passive_ is eventually set (to avoid deadlock, see - // dispose), activeCalls_ only counts those calls for which it can be - // guaranteed that incrementActiveCalls is indeed followed by - // decrementActiveCalls, without an intervening exception - bool terminated_; // Only accessed from reader_ thread: Mode mode_; diff --git a/binaryurp/source/bridgefactory.cxx b/binaryurp/source/bridgefactory.cxx index de7762f3d6ed..55a9a78ace25 100644 --- a/binaryurp/source/bridgefactory.cxx +++ b/binaryurp/source/bridgefactory.cxx @@ -210,7 +210,7 @@ static cppu::ImplementationEntry const services[] = { { &binaryurp::BridgeFactory::static_create, &binaryurp::BridgeFactory::static_getImplementationName, &binaryurp::BridgeFactory::static_getSupportedServiceNames, - &cppu::createSingleComponentFactory, 0, 0 }, + &cppu::createOneInstanceComponentFactory, 0, 0 }, { 0, 0, 0, 0, 0, 0 } }; diff --git a/binaryurp/source/incomingrequest.cxx b/binaryurp/source/incomingrequest.cxx index 431c88505ad1..83b0030623e7 100644 --- a/binaryurp/source/incomingrequest.cxx +++ b/binaryurp/source/incomingrequest.cxx @@ -123,7 +123,7 @@ void IncomingRequest::execute() const { } catch (const std::exception & e) { OSL_TRACE(OSL_LOG_PREFIX "caught C++ exception '%s'", e.what()); } - bridge_->terminate(); + bridge_->terminate(false); } else { if (isExc) { OSL_TRACE(OSL_LOG_PREFIX "oneway method raised exception"); diff --git a/binaryurp/source/reader.cxx b/binaryurp/source/reader.cxx index 5a4491efb61b..c59d92ae3dd9 100644 --- a/binaryurp/source/reader.cxx +++ b/binaryurp/source/reader.cxx @@ -149,7 +149,7 @@ void Reader::execute() { } catch (const std::exception & e) { SAL_WARN("binaryurp", "caught C++ exception '" << e.what() << '\''); } - bridge_->terminate(); + bridge_->terminate(false); } void Reader::readMessage(Unmarshal & unmarshal) { diff --git a/binaryurp/source/writer.cxx b/binaryurp/source/writer.cxx index e1b2291ff28a..0273fa65f4a3 100644 --- a/binaryurp/source/writer.cxx +++ b/binaryurp/source/writer.cxx @@ -194,7 +194,7 @@ void Writer::execute() { } catch (const std::exception & e) { OSL_TRACE(OSL_LOG_PREFIX "caught C++ exception '%s'", e.what()); } - bridge_->terminate(); + bridge_->terminate(false); } void Writer::sendRequest( diff --git a/cppu/Library_cppu.mk b/cppu/Library_cppu.mk index 78eae8f817ef..cc96d046fbc4 100644 --- a/cppu/Library_cppu.mk +++ b/cppu/Library_cppu.mk @@ -42,6 +42,7 @@ $(eval $(call gb_Library_add_defs,cppu,\ $(eval $(call gb_Library_use_libraries,cppu,\ sal \ + salhelper \ $(gb_STDLIBS) \ )) diff --git a/cppu/inc/uno/threadpool.h b/cppu/inc/uno/threadpool.h index 08c60cc3e44a..d12969d8c8fc 100644 --- a/cppu/inc/uno/threadpool.h +++ b/cppu/inc/uno/threadpool.h @@ -166,10 +166,6 @@ uno_threadpool_putJob( return immeadiatly with *ppJob == 0. @param hPool The handle to be disposed. - In case, hPool is 0, this function joins on all threads created - by the threadpool administration. This may e.g. used to ensure, that - no threads are inside the cppu library anymore, in case it needs to get - unloaded. This function is called i.e. by a bridge, that is forced to dispose itself. */ @@ -180,6 +176,10 @@ uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C(); /** Releases the previously with uno_threadpool_create() created handle. The handle thus becomes invalid. It is an error to use the handle after uno_threadpool_destroy(). + + A call to uno_threadpool_destroy can synchronously join on spawned worker + threads, so this function must never be called from such a worker thread. + @see uno_threadpool_create() */ CPPU_DLLPUBLIC void SAL_CALL diff --git a/cppu/source/threadpool/thread.cxx b/cppu/source/threadpool/thread.cxx index cc22a453c79d..12ea09a131ee 100644 --- a/cppu/source/threadpool/thread.cxx +++ b/cppu/source/threadpool/thread.cxx @@ -33,7 +33,6 @@ #include <com/sun/star/lang/DisposedException.hpp> #include <com/sun/star/uno/Reference.hxx> #include <com/sun/star/uno/XInterface.hpp> -#include <rtl/instance.hxx> #include <rtl/ustring.h> #include <rtl/ustring.hxx> @@ -48,17 +47,6 @@ namespace css = com::sun::star; } using namespace osl; -extern "C" { - -void SAL_CALL cppu_requestThreadWorker( void *pVoid ) -{ - ::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid; - - pThread->run(); - pThread->onTerminated(); -} - -} namespace cppu_threadpool { @@ -75,7 +63,7 @@ namespace cppu_threadpool { #endif } - void ThreadAdmin::add( ORequestThread *p ) + void ThreadAdmin::add( rtl::Reference< ORequestThread > const & p ) { MutexGuard aGuard( m_mutex ); if( m_disposed ) @@ -90,12 +78,19 @@ namespace cppu_threadpool { m_lst.push_back( p ); } - void ThreadAdmin::remove( ORequestThread * p ) + void ThreadAdmin::remove_locked( rtl::Reference< ORequestThread > const & p ) + { + ::std::list< rtl::Reference< ORequestThread > >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p ); + if( ii != m_lst.end() ) + { + m_lst.erase( ii ); + } + } + + void ThreadAdmin::remove( rtl::Reference< ORequestThread > const & p ) { MutexGuard aGuard( m_mutex ); - ::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p ); - OSL_ASSERT( ii != m_lst.end() ); - m_lst.erase( ii ); + remove_locked( p ); } void ThreadAdmin::join() @@ -104,62 +99,34 @@ namespace cppu_threadpool { MutexGuard aGuard( m_mutex ); m_disposed = true; } - ORequestThread *pCurrent; - do + for (;;) { - pCurrent = 0; + rtl::Reference< ORequestThread > pCurrent; { MutexGuard aGuard( m_mutex ); - if( ! m_lst.empty() ) + if( m_lst.empty() ) { - pCurrent = m_lst.front(); - pCurrent->setDeleteSelf( sal_False ); + break; } + pCurrent = m_lst.front(); + m_lst.pop_front(); } - if ( pCurrent ) - { - pCurrent->join(); - delete pCurrent; - } - } while( pCurrent ); - } - - struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin > - { - ThreadAdminHolder operator () () { - ThreadAdminHolder aRet(new ThreadAdmin()); - return aRet; + pCurrent->join(); } - }; - - ThreadAdminHolder& ThreadAdmin::getInstance() - { - return theThreadAdmin::get(); } // ---------------------------------------------------------------------------------- - ORequestThread::ORequestThread( JobQueue *pQueue, + ORequestThread::ORequestThread( ThreadPoolHolder const &aThreadPool, + JobQueue *pQueue, const ByteSequence &aThreadId, sal_Bool bAsynchron ) - : m_thread( 0 ) - , m_aThreadAdmin( ThreadAdmin::getInstance() ) + : m_aThreadPool( aThreadPool ) , m_pQueue( pQueue ) , m_aThreadId( aThreadId ) , m_bAsynchron( bAsynchron ) - , m_bDeleteSelf( sal_True ) - { - m_aThreadAdmin->add( this ); - } - - - ORequestThread::~ORequestThread() - { - if (m_thread != 0) - { - osl_destroyThread(m_thread); - } - } + {} + ORequestThread::~ORequestThread() {} void ORequestThread::setTask( JobQueue *pQueue, const ByteSequence &aThreadId, @@ -170,74 +137,81 @@ namespace cppu_threadpool { m_bAsynchron = bAsynchron; } - sal_Bool ORequestThread::create() + void ORequestThread::launch() { - OSL_ASSERT(m_thread == 0); // only one running thread per instance - - m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this); - if ( m_thread ) - { - osl_resumeThread( m_thread ); + // Assumption is that osl::Thread::create returns normally with a true + // return value iff it causes osl::Thread::run to start executing: + acquire(); + ThreadAdmin & rThreadAdmin = m_aThreadPool->getThreadAdmin(); + osl::ClearableMutexGuard g(rThreadAdmin.m_mutex); + rThreadAdmin.add( this ); + try { + if (!create()) { + throw std::runtime_error("osl::Thread::create failed"); + } + } catch (...) { + rThreadAdmin.remove_locked( this ); + g.clear(); + release(); + throw; } - - return m_thread != 0; - } - - void ORequestThread::join() - { - osl_joinWithThread( m_thread ); } void ORequestThread::onTerminated() { - m_aThreadAdmin->remove( this ); - if( m_bDeleteSelf ) - { - delete this; - } + m_aThreadPool->getThreadAdmin().remove( this ); + release(); } void ORequestThread::run() { - ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance(); - - while ( m_pQueue ) + try { - if( ! m_bAsynchron ) + while ( m_pQueue ) { - if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) + if( ! m_bAsynchron ) { - OSL_ASSERT( false ); + if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) + { + OSL_ASSERT( false ); + } } - } - while( ! m_pQueue->isEmpty() ) - { - // Note : Oneways should not get a disposable disposeid, - // It does not make sense to dispose a call in this state. - // That's way we put it an disposeid, that can't be used otherwise. - m_pQueue->enter( - sal::static_int_cast< sal_Int64 >( - reinterpret_cast< sal_IntPtr >(this)), - sal_True ); - - if( m_pQueue->isEmpty() ) + while( ! m_pQueue->isEmpty() ) { - theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); - // Note : revokeQueue might have failed because m_pQueue.isEmpty() - // may be false (race). + // Note : Oneways should not get a disposable disposeid, + // It does not make sense to dispose a call in this state. + // That's way we put it an disposeid, that can't be used otherwise. + m_pQueue->enter( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(this)), + sal_True ); + + if( m_pQueue->isEmpty() ) + { + m_aThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); + // Note : revokeQueue might have failed because m_pQueue.isEmpty() + // may be false (race). + } } - } - delete m_pQueue; - m_pQueue = 0; + delete m_pQueue; + m_pQueue = 0; - if( ! m_bAsynchron ) - { - uno_releaseIdFromCurrentThread(); - } + if( ! m_bAsynchron ) + { + uno_releaseIdFromCurrentThread(); + } - theThreadPool->waitInPool( this ); + m_aThreadPool->waitInPool( this ); + } + } + catch (...) + { + // Work around the problem that onTerminated is not called if run + // throws an exception: + onTerminated(); + throw; } } } diff --git a/cppu/source/threadpool/thread.hxx b/cppu/source/threadpool/thread.hxx index a3ea45aadaed..88f3d91f8722 100644 --- a/cppu/source/threadpool/thread.hxx +++ b/cppu/source/threadpool/thread.hxx @@ -28,63 +28,49 @@ #ifndef _CPPU_THREADPOOL_THREAD_HXX #define _CPPU_THREADPOOL_THREAD_HXX -#include <list> +#include <osl/thread.hxx> #include <sal/types.h> - -#include <osl/thread.h> +#include <salhelper/simplereferenceobject.hxx> #include "jobqueue.hxx" +#include "threadpool.hxx" namespace cppu_threadpool { class JobQueue; - class ThreadAdmin; - typedef boost::shared_ptr<ThreadAdmin> ThreadAdminHolder; //----------------------------------------- // private thread class for the threadpool // independent from vos //----------------------------------------- - class ORequestThread + class ORequestThread: + public salhelper::SimpleReferenceObject, public osl::Thread { public: - ORequestThread( JobQueue * , + ORequestThread( ThreadPoolHolder const &aThreadPool, + JobQueue * , const ::rtl::ByteSequence &aThreadId, sal_Bool bAsynchron ); - ~ORequestThread(); + virtual ~ORequestThread(); void setTask( JobQueue * , const ::rtl::ByteSequence & aThreadId , sal_Bool bAsynchron ); - sal_Bool create(); - void join(); - void onTerminated(); - void run(); - inline void setDeleteSelf( sal_Bool b ) - { m_bDeleteSelf = b; } + void launch(); + + static inline void * operator new(std::size_t size) + { return SimpleReferenceObject::operator new(size); } + + static inline void operator delete(void * pointer) + { SimpleReferenceObject::operator delete(pointer); } private: - oslThread m_thread; - ThreadAdminHolder m_aThreadAdmin; + virtual void SAL_CALL run(); + virtual void SAL_CALL onTerminated(); + + ThreadPoolHolder m_aThreadPool; JobQueue *m_pQueue; ::rtl::ByteSequence m_aThreadId; sal_Bool m_bAsynchron; - sal_Bool m_bDeleteSelf; - }; - - class ThreadAdmin - { - public: - ThreadAdmin(); - ~ThreadAdmin (); - static ThreadAdminHolder &getInstance(); - void add( ORequestThread * ); - void remove( ORequestThread * ); - void join(); - - private: - ::osl::Mutex m_mutex; - ::std::list< ORequestThread * > m_lst; - bool m_disposed; }; } // end cppu_threadpool diff --git a/cppu/source/threadpool/threadpool.cxx b/cppu/source/threadpool/threadpool.cxx index d14e26006e04..e4351cb9cc4a 100644 --- a/cppu/source/threadpool/threadpool.cxx +++ b/cppu/source/threadpool/threadpool.cxx @@ -109,15 +109,6 @@ namespace cppu_threadpool //------------------------------------------------------------------------------- - struct theThreadPool : - public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool > - { - ThreadPoolHolder operator () () { - ThreadPoolHolder aRet(new ThreadPool()); - return aRet; - } - }; - ThreadPool::ThreadPool() { m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance(); @@ -132,46 +123,24 @@ namespace cppu_threadpool } #endif } - ThreadPoolHolder ThreadPool::getInstance() - { - return theThreadPool::get(); - } - void ThreadPool::dispose( sal_Int64 nDisposeId ) { - if( nDisposeId ) - { - m_DisposedCallerAdmin->dispose( nDisposeId ); + m_DisposedCallerAdmin->dispose( nDisposeId ); - MutexGuard guard( m_mutex ); - for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ; - ii != m_mapQueue.end(); - ++ii) + MutexGuard guard( m_mutex ); + for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ; + ii != m_mapQueue.end(); + ++ii) + { + if( (*ii).second.first ) { - if( (*ii).second.first ) - { - (*ii).second.first->dispose( nDisposeId ); - } - if( (*ii).second.second ) - { - (*ii).second.second->dispose( nDisposeId ); - } + (*ii).second.first->dispose( nDisposeId ); } - } - else - { + if( (*ii).second.second ) { - MutexGuard guard( m_mutexWaitingThreadList ); - for( WaitingThreadList::iterator ii = m_lstThreads.begin() ; - ii != m_lstThreads.end() ; - ++ ii ) - { - // wake the threads up - osl_setCondition( (*ii)->condition ); - } + (*ii).second.second->dispose( nDisposeId ); } - ThreadAdmin::getInstance()->join(); } } @@ -185,7 +154,7 @@ namespace cppu_threadpool * a new request comes in, this thread is reused. This is done only to improve performance, * it is not required for threadpool functionality. ******************/ - void ThreadPool::waitInPool( ORequestThread * pThread ) + void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread ) { struct WaitingThread waitingThread; waitingThread.condition = osl_createCondition(); @@ -201,7 +170,7 @@ namespace cppu_threadpool { MutexGuard guard ( m_mutexWaitingThreadList ); - if( waitingThread.thread ) + if( waitingThread.thread.is() ) { // thread wasn't reused, remove it from the list WaitingThreadList::iterator ii = find( @@ -214,6 +183,21 @@ namespace cppu_threadpool osl_destroyCondition( waitingThread.condition ); } + void ThreadPool::joinWorkers() + { + { + MutexGuard guard( m_mutexWaitingThreadList ); + for( WaitingThreadList::iterator ii = m_lstThreads.begin() ; + ii != m_lstThreads.end() ; + ++ ii ) + { + // wake the threads up + osl_setCondition( (*ii)->condition ); + } + } + m_aThreadAdmin.join(); + } + void ThreadPool::createThread( JobQueue *pQueue , const ByteSequence &aThreadId, sal_Bool bAsynchron ) @@ -240,10 +224,9 @@ namespace cppu_threadpool if( bCreate ) { - ORequestThread *pThread = - new ORequestThread( pQueue , aThreadId, bAsynchron); - // deletes itself ! - pThread->create(); + rtl::Reference< ORequestThread > pThread( + new ORequestThread( this, pQueue , aThreadId, bAsynchron) ); + pThread->launch(); } } @@ -385,6 +368,12 @@ namespace cppu_threadpool } } +// All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life +// spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty +// (within the last uno_threadpool_destroy) all worker threads spawned by that +// ThreadPool instance are joined (which implies that uno_threadpool_destroy +// must never be called from a worker thread); afterwards, the next call to +// uno_threadpool_create (if any) will lead to a new ThreadPool instance. using namespace cppu_threadpool; @@ -415,27 +404,47 @@ struct _uno_ThreadPool sal_Int32 dummy; }; +namespace { + +ThreadPoolHolder getThreadPool( uno_ThreadPool hPool ) +{ + MutexGuard guard( Mutex::getGlobalMutex() ); + assert( g_pThreadpoolHashSet != 0 ); + ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) ); + assert( i != g_pThreadpoolHashSet->end() ); + return i->second; +} + +} + extern "C" uno_ThreadPool SAL_CALL uno_threadpool_create() SAL_THROW_EXTERN_C() { MutexGuard guard( Mutex::getGlobalMutex() ); + ThreadPoolHolder p; if( ! g_pThreadpoolHashSet ) { g_pThreadpoolHashSet = new ThreadpoolHashSet(); + p = new ThreadPool; + } + else + { + assert( !g_pThreadpoolHashSet->empty() ); + p = g_pThreadpoolHashSet->begin()->second; } // Just ensure that the handle is unique in the process (via heap) uno_ThreadPool h = new struct _uno_ThreadPool; - g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) ); + g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) ); return h; } extern "C" void SAL_CALL -uno_threadpool_attach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C() +uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() { sal_Sequence *pThreadId = 0; uno_getIdOfCurrentThread( &pThreadId ); - ThreadPool::getInstance()->prepare( pThreadId ); + getThreadPool( hPool )->prepare( pThreadId ); rtl_byte_sequence_release( pThreadId ); uno_releaseIdFromCurrentThread(); } @@ -447,7 +456,7 @@ uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob ) sal_Sequence *pThreadId = 0; uno_getIdOfCurrentThread( &pThreadId ); *ppJob = - ThreadPool::getInstance()->enter( + getThreadPool( hPool )->enter( pThreadId, sal::static_int_cast< sal_Int64 >( reinterpret_cast< sal_IntPtr >(hPool)) ); @@ -463,19 +472,19 @@ uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C() extern "C" void SAL_CALL uno_threadpool_putJob( - SAL_UNUSED_PARAMETER uno_ThreadPool, + uno_ThreadPool hPool, sal_Sequence *pThreadId, void *pJob, void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ), sal_Bool bIsOneway ) SAL_THROW_EXTERN_C() { - ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest ); + getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest ); } extern "C" void SAL_CALL uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() { - ThreadPool::getInstance()->dispose( + getThreadPool(hPool)->dispose( sal::static_int_cast< sal_Int64 >( reinterpret_cast< sal_IntPtr >(hPool)) ); } @@ -483,9 +492,8 @@ uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() extern "C" void SAL_CALL uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() { - assert(hPool != 0); - - ThreadPool::getInstance()->destroy( + ThreadPoolHolder p( getThreadPool(hPool) ); + p->destroy( sal::static_int_cast< sal_Int64 >( reinterpret_cast< sal_IntPtr >(hPool)) ); @@ -510,7 +518,7 @@ uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() if( empty ) { - uno_threadpool_dispose( 0 ); + p->joinWorkers(); } } diff --git a/cppu/source/threadpool/threadpool.hxx b/cppu/source/threadpool/threadpool.hxx index 8b64ed18d682..6c186271b5fc 100644 --- a/cppu/source/threadpool/threadpool.hxx +++ b/cppu/source/threadpool/threadpool.hxx @@ -25,11 +25,19 @@ * for a copy of the LGPLv3 License. * ************************************************************************/ + +#ifndef INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX +#define INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX + +#include <list> + #include <boost/unordered_map.hpp> #include <osl/conditn.h> #include <rtl/byteseq.hxx> +#include <rtl/ref.hxx> +#include <salhelper/simplereferenceobject.hxx> #include <boost/shared_ptr.hpp> @@ -74,7 +82,7 @@ namespace cppu_threadpool { struct WaitingThread { oslCondition condition; - ORequestThread *thread; + rtl::Reference< ORequestThread > thread; }; typedef ::std::list < struct ::cppu_threadpool::WaitingThread * > WaitingThreadList; @@ -98,15 +106,32 @@ namespace cppu_threadpool { DisposedCallerList m_lst; }; + class ThreadAdmin + { + public: + ThreadAdmin(); + ~ThreadAdmin (); + + void add( rtl::Reference< ORequestThread > const & ); + void remove( rtl::Reference< ORequestThread > const & ); + void join(); + + void remove_locked( rtl::Reference< ORequestThread > const & ); + ::osl::Mutex m_mutex; + + private: + ::std::list< rtl::Reference< ORequestThread > > m_lst; + bool m_disposed; + }; + class ThreadPool; - typedef boost::shared_ptr<ThreadPool> ThreadPoolHolder; + typedef rtl::Reference<ThreadPool> ThreadPoolHolder; - class ThreadPool + class ThreadPool: public salhelper::SimpleReferenceObject { public: ThreadPool(); ~ThreadPool(); - static ThreadPoolHolder getInstance(); void dispose( sal_Int64 nDisposeId ); void destroy( sal_Int64 nDisposeId ); @@ -124,7 +149,12 @@ namespace cppu_threadpool { ********/ sal_Bool revokeQueue( const ByteSequence & aThreadId , sal_Bool bAsynchron ); - void waitInPool( ORequestThread *pThread ); + void waitInPool( rtl::Reference< ORequestThread > const & pThread ); + + void joinWorkers(); + + ThreadAdmin & getThreadAdmin() { return m_aThreadAdmin; } + private: void createThread( JobQueue *pQueue, const ByteSequence &aThreadId, sal_Bool bAsynchron); @@ -136,8 +166,11 @@ namespace cppu_threadpool { WaitingThreadList m_lstThreads; DisposedCallerAdminHolder m_DisposedCallerAdmin; + ThreadAdmin m_aThreadAdmin; }; } // end namespace cppu_threadpool +#endif + /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cpputools/source/unoexe/unoexe.cxx b/cpputools/source/unoexe/unoexe.cxx index a522f2de53b7..0c218d1cc207 100644 --- a/cpputools/source/unoexe/unoexe.cxx +++ b/cpputools/source/unoexe/unoexe.cxx @@ -845,6 +845,10 @@ SAL_IMPLEMENT_MAIN() if (! xComp.is()) throw RuntimeException( OUString( RTL_CONSTASCII_USTRINGPARAM("bridge factory does not export interface \"com.sun.star.lang.XComponent\"!" ) ), Reference< XInterface >() ); ODisposingListener::waitFor( xComp ); + xComp->dispose(); + // explicitly dispose the remote bridge so that it joins + // on all spawned threads before process exit (see + // binaryurp/source/bridge.cxx for details) break; } } |