/************************************************************************* * * $RCSfile: opump.cxx,v $ * * $Revision: 1.9 $ * * last change: $Author: jbu $ $Date: 2002-07-09 15:11:53 $ * * The Contents of this file are made available subject to the terms of * either of the following licenses * * - GNU Lesser General Public License Version 2.1 * - Sun Industry Standards Source License Version 1.1 * * Sun Microsystems Inc., October, 2000 * * GNU Lesser General Public License Version 2.1 * ============================================= * Copyright 2000 by Sun Microsystems, Inc. * 901 San Antonio Road, Palo Alto, CA 94303, USA * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software Foundation. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, * MA 02111-1307 USA * * * Sun Industry Standards Source License Version 1.1 * ================================================= * The contents of this file are subject to the Sun Industry Standards * Source License Version 1.1 (the "License"); You may not use this file * except in compliance with the License. You may obtain a copy of the * License at http://www.openoffice.org/license.html. * * Software provided under this License is provided on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, * WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS, * MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING. * See the License for the specific provisions governing your rights and * obligations concerning the Software. * * The Initial Developer of the Original Code is: Sun Microsystems, Inc. * * Copyright: 2000 by Sun Microsystems, Inc. * * All Rights Reserved. * * Contributor(s): _______________________________________ * * ************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace osl; using namespace std; using namespace rtl; using namespace cppu; using namespace com::sun::star::uno; using namespace com::sun::star::lang; using namespace com::sun::star::registry; using namespace com::sun::star::io; #include "factreg.hxx" namespace io_stm { class Pump : public WeakImplHelper5< XActiveDataSource, XActiveDataSink, XActiveDataControl, XConnectable, XServiceInfo > { Mutex m_aMutex; oslThread m_aThread; Reference< XConnectable > m_xPred; Reference< XConnectable > m_xSucc; Reference< XInputStream > m_xInput; Reference< XOutputStream > m_xOutput; OInterfaceContainerHelper m_cnt; sal_Bool m_closeFired; void run(); static void static_run( void* pObject ); void close(); void fireClose(); void fireStarted(); void fireTerminated(); void fireError( const Any &a ); public: Pump(); virtual ~Pump(); // XActiveDataSource virtual void SAL_CALL setOutputStream( const Reference< ::com::sun::star::io::XOutputStream >& xOutput ) throw(); virtual Reference< ::com::sun::star::io::XOutputStream > SAL_CALL getOutputStream() throw(); // XActiveDataSink virtual void SAL_CALL setInputStream( const Reference< ::com::sun::star::io::XInputStream >& xStream ) throw(); virtual Reference< ::com::sun::star::io::XInputStream > SAL_CALL getInputStream() throw(); // XActiveDataControl virtual void SAL_CALL addListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw(); virtual void SAL_CALL removeListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw(); virtual void SAL_CALL start() throw( RuntimeException ); virtual void SAL_CALL terminate() throw(); // XConnectable virtual void SAL_CALL setPredecessor( const Reference< ::com::sun::star::io::XConnectable >& xPred ) throw(); virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getPredecessor() throw(); virtual void SAL_CALL setSuccessor( const Reference< ::com::sun::star::io::XConnectable >& xSucc ) throw(); virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getSuccessor() throw(); public: // XServiceInfo virtual OUString SAL_CALL getImplementationName() throw( ); virtual Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw( ); virtual sal_Bool SAL_CALL supportsService(const OUString& ServiceName) throw( ); }; Pump::Pump() : m_aThread( 0 ), m_cnt( m_aMutex ), m_closeFired( sal_False ) { g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt ); } Pump::~Pump() { // exit gracefully if( m_aThread ) { osl_joinWithThread( m_aThread ); osl_destroyThread( m_aThread ); } g_moduleCount.modCnt.release( &g_moduleCount.modCnt ); } void Pump::fireError( const Any & exception ) { OInterfaceIteratorHelper iter( m_cnt ); while( iter.hasMoreElements() ) { try { static_cast< XStreamListener * > ( iter.next() )->error( exception ); } catch ( RuntimeException &e ) { OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); } } } void Pump::fireClose() { sal_Bool bFire = sal_False; { MutexGuard guard( m_aMutex ); if( ! m_closeFired ) { m_closeFired = sal_True; bFire = sal_True; } } if( bFire ) { OInterfaceIteratorHelper iter( m_cnt ); while( iter.hasMoreElements() ) { try { static_cast< XStreamListener * > ( iter.next() )->closed( ); } catch ( RuntimeException &e ) { OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); } } } } void Pump::fireStarted() { OInterfaceIteratorHelper iter( m_cnt ); while( iter.hasMoreElements() ) { try { static_cast< XStreamListener * > ( iter.next() )->started( ); } catch ( RuntimeException &e ) { OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); } } } void Pump::fireTerminated() { OInterfaceIteratorHelper iter( m_cnt ); while( iter.hasMoreElements() ) { try { static_cast< XStreamListener * > ( iter.next() )->terminated(); } catch ( RuntimeException &e ) { OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); } } } void Pump::close() { // close streams and release references Reference< XInputStream > rInput; Reference< XOutputStream > rOutput; { MutexGuard guard( m_aMutex ); rInput = m_xInput; m_xInput.clear(); rOutput = m_xOutput; m_xOutput.clear(); m_xSucc.clear(); m_xPred.clear(); } if( rInput.is() ) { try { rInput->closeInput(); } catch( Exception &e ) { // go down calm } } if( rOutput.is() ) { try { rOutput->closeOutput(); } catch( Exception &e ) { // go down calm } } } void Pump::static_run( void* pObject ) { ((Pump*)pObject)->run(); ((Pump*)pObject)->release(); } void Pump::run() { try { fireStarted(); try { Reference< XInputStream > rInput; Reference< XOutputStream > rOutput; { Guard< Mutex > aGuard( m_aMutex ); rInput = m_xInput; rOutput = m_xOutput; } if( ! rInput.is() ) { NotConnectedException exception( OUString::createFromAscii( "no input stream set" ) , Reference((OWeakObject*)this) ); throw exception; } Sequence< sal_Int8 > aData; long nBytes; while( nBytes = rInput->readSomeBytes( aData, 65536 ) ) { if( ! rOutput.is() ) { NotConnectedException exception( OUString::createFromAscii( "no output stream set" ) , Reference( (OWeakObject*)this) ); throw exception; } rOutput->writeBytes( aData ); osl_yieldThread(); } } catch ( IOException & e ) { fireError( makeAny( e ) ); } catch ( RuntimeException & e ) { fireError( makeAny( e ) ); } catch ( Exception & e ) { fireError( makeAny( e ) ); } close(); fireClose(); } catch ( com::sun::star::uno::Exception &e ) { // we are the last on the stack. // this is to avoid crashing the program, when e.g. a bridge crashes OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception", sMessage.getStr() ); } } // ------------------------------------------------------------ /* * XConnectable */ void Pump::setPredecessor( const Reference< XConnectable >& xPred ) throw() { Guard< Mutex > aGuard( m_aMutex ); m_xPred = xPred; } // ------------------------------------------------------------ Reference< XConnectable > Pump::getPredecessor() throw() { Guard< Mutex > aGuard( m_aMutex ); return m_xPred; } // ------------------------------------------------------------ void Pump::setSuccessor( const Reference< XConnectable >& xSucc ) throw() { Guard< Mutex > aGuard( m_aMutex ); m_xSucc = xSucc; } // ------------------------------------------------------------ Reference< XConnectable > Pump::getSuccessor() throw() { Guard< Mutex > aGuard( m_aMutex ); return m_xSucc; } // ----------------------------------------------------------------- /* * XActiveDataControl */ void Pump::addListener( const Reference< XStreamListener >& xListener ) throw() { m_cnt.addInterface( xListener ); } // ------------------------------------------------------------ void Pump::removeListener( const Reference< XStreamListener >& xListener ) throw() { m_cnt.removeInterface( xListener ); } // ------------------------------------------------------------ void Pump::start() throw( RuntimeException ) { Guard< Mutex > aGuard( m_aMutex ); m_aThread = osl_createSuspendedThread((oslWorkerFunction)Pump::static_run,this); if( m_aThread ) { // will be released by OPump::static_run acquire(); osl_resumeThread( m_aThread ); } else { throw RuntimeException( OUString( RTL_CONSTASCII_USTRINGPARAM( "Pump::start Couldn't create worker thread" )), *this); } } // ------------------------------------------------------------ void Pump::terminate() throw() { close(); // wait for the worker to die if( m_aThread ) osl_joinWithThread( m_aThread ); fireTerminated(); fireClose(); } // ------------------------------------------------------------ /* * XActiveDataSink */ void Pump::setInputStream( const Reference< XInputStream >& xStream ) throw() { Guard< Mutex > aGuard( m_aMutex ); m_xInput = xStream; Reference< XConnectable > xConnect( xStream, UNO_QUERY ); if( xConnect.is() ) xConnect->setSuccessor( this ); // data transfer starts in XActiveDataControl::start } // ------------------------------------------------------------ Reference< XInputStream > Pump::getInputStream() throw() { Guard< Mutex > aGuard( m_aMutex ); return m_xInput; } // ------------------------------------------------------------ /* * XActiveDataSource */ void Pump::setOutputStream( const Reference< XOutputStream >& xOut ) throw() { Guard< Mutex > aGuard( m_aMutex ); m_xOutput = xOut; Reference< XConnectable > xConnect( xOut, UNO_QUERY ); if( xConnect.is() ) xConnect->setPredecessor( this ); // data transfer starts in XActiveDataControl::start } // ------------------------------------------------------------ Reference< XOutputStream > Pump::getOutputStream() throw() { Guard< Mutex > aGuard( m_aMutex ); return m_xOutput; } // XServiceInfo OUString Pump::getImplementationName() throw( ) { return OPumpImpl_getImplementationName(); } // XServiceInfo sal_Bool Pump::supportsService(const OUString& ServiceName) throw( ) { Sequence< OUString > aSNL = getSupportedServiceNames(); const OUString * pArray = aSNL.getConstArray(); for( sal_Int32 i = 0; i < aSNL.getLength(); i++ ) if( pArray[i] == ServiceName ) return sal_True; return sal_False; } // XServiceInfo Sequence< OUString > Pump::getSupportedServiceNames(void) throw( ) { return OPumpImpl_getSupportedServiceNames(); } Reference< XInterface > SAL_CALL OPumpImpl_CreateInstance( const Reference< XComponentContext > & rSMgr ) throw (Exception) { return Reference< XInterface >( *new Pump ); } OUString OPumpImpl_getImplementationName() { return OUString( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.comp.io.Pump") ); } Sequence OPumpImpl_getSupportedServiceNames(void) { OUString s( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.io.Pump" ) ); Sequence< OUString > seq( &s , 1 ); return seq; } }