diff options
author | Vladimir Glazounov <vg@openoffice.org> | 2003-07-25 10:35:45 +0000 |
---|---|---|
committer | Vladimir Glazounov <vg@openoffice.org> | 2003-07-25 10:35:45 +0000 |
commit | 220076ebc5526142836f10473b4265da6c6ce683 (patch) | |
tree | 8567449aa3898a301c2a818460ec309f78a9134c | |
parent | 0fffd1b58f345d69792012c833f88f0582db7260 (diff) |
INTEGRATION: CWS abi3 (1.44.10); FILE MERGED
2003/07/23 14:22:51 abi 1.44.10.7: #110333# old method for nonremote schemata
2003/07/23 09:23:40 abi 1.44.10.6: #110335# introduced untimed waiting for non-remote protocols, doubling of wait time for first retry, and macro ABI_IS_ON_VACATION_AETSCH for easier recovery
2003/07/18 17:41:49 abi 1.44.10.5: #110335# Loading from input stream in the background now possible.
2003/07/16 09:46:30 abi 1.44.10.4: #110668# moved condition-object to comphelper
2003/07/15 14:59:24 abi 1.44.10.3: #110688# replaced thread unsafe default active data sink(SvRefBase member) with thread safe and posting the result trough the queue
2003/07/09 11:58:17 abi 1.44.10.2: #110688# forgotten to remove unneeded include-file
2003/07/09 11:55:47 abi 1.44.10.1: #110688# copy-and-paste error, holding a reference to content not necessary
-rw-r--r-- | unotools/source/ucbhelper/ucblockbytes.cxx | 534 |
1 files changed, 469 insertions, 65 deletions
diff --git a/unotools/source/ucbhelper/ucblockbytes.cxx b/unotools/source/ucbhelper/ucblockbytes.cxx index 82df14669f30..13e1669833c1 100644 --- a/unotools/source/ucbhelper/ucblockbytes.cxx +++ b/unotools/source/ucbhelper/ucblockbytes.cxx @@ -2,9 +2,9 @@ * * $RCSfile: ucblockbytes.cxx,v $ * - * $Revision: 1.44 $ + * $Revision: 1.45 $ * - * last change: $Author: vg $ $Date: 2003-05-26 08:26:22 $ + * last change: $Author: vg $ $Date: 2003-07-25 11:35:45 $ * * The Contents of this file are made available subject to the terms of * either of the following licenses @@ -59,10 +59,12 @@ * ************************************************************************/ -#include <unotools/condition.hxx> #include <unotools/ucblockbytes.hxx> #include <comphelper/processfactory.hxx> +#ifndef _COMPHELPER_CONDITION_HXX_ +#include <comphelper/condition.hxx> +#endif #ifndef _OSL_THREAD_HXX_ #include <osl/thread.hxx> #endif @@ -347,7 +349,12 @@ public: Reference < XContent >& xContent, Reference < XInteractionHandler >& xInteract, Reference < XProgressHandler >& xProgress, - const Command& rArg); + const Command& rArg + ) + throw( + ContentCreationException, + RuntimeException + ); ~Moderator(); @@ -361,6 +368,9 @@ public: PROGRESSUPDATE, PROGRESSPOP, + INPUTSTREAM, + STREAM, + RESULT, TIMEDOUT, COMMANDABORTED, @@ -372,12 +382,13 @@ public: class ConditionRes - : public Condition + : public comphelper::Condition { public: - ConditionRes(Moderator& aModerator) - : m_aModerator(aModerator) + ConditionRes(osl::Mutex& aMutex,Moderator& aModerator) + : comphelper::Condition(aMutex), + m_aModerator(aModerator) { } @@ -399,6 +410,7 @@ public: sal_Int32 ioErrorCode; }; + Result getResult(const sal_uInt32 milliSec); @@ -411,12 +423,13 @@ public: class ConditionRep - : public Condition + : public comphelper::Condition { public: - ConditionRep(Moderator& aModerator) - : m_aModerator(aModerator) + ConditionRep(osl::Mutex& aMutex,Moderator& aModerator) + : comphelper::Condition(aMutex), + m_aModerator(aModerator) { } @@ -442,6 +455,10 @@ public: void pop( ); + void setStream(const Reference< XStream >& aStream); + + void setInputStream(const Reference<XInputStream> &rxInputStream); + protected: @@ -451,6 +468,8 @@ protected: private: + osl::Mutex m_aMutex; + friend class ConditionRes; ConditionRes m_aRes; @@ -463,13 +482,143 @@ private: ConditionRep m_aRep; ReplyType m_aReplyType; - Reference < XContent > m_xContent; - bool m_bInteract; - bool m_bProgress; - const Command& m_rArg; + Command m_aArg; + ::ucb::Content m_aContent; }; +class ModeratorsActiveDataStreamer + : public ::cppu::WeakImplHelper1<XActiveDataStreamer> +{ +public: + + ModeratorsActiveDataStreamer(Moderator &theModerator); + + ~ModeratorsActiveDataStreamer(); + + // XActiveDataStreamer + virtual void SAL_CALL + setStream( + const Reference< XStream >& aStream + ) + throw( + RuntimeException + ); + + virtual Reference<XStream> SAL_CALL + getStream ( + void + ) throw( + RuntimeException + ) + { + osl::MutexGuard aGuard(m_aMutex); + return m_xStream; + } + + +private: + + Moderator& m_aModerator; + + osl::Mutex m_aMutex; + Reference<XStream> m_xStream; +}; + + + +class ModeratorsActiveDataSink + : public ::cppu::WeakImplHelper1<XActiveDataSink> +{ +public: + + ModeratorsActiveDataSink(Moderator &theModerator); + + ~ModeratorsActiveDataSink(); + + // XActiveDataSink. + virtual void SAL_CALL + setInputStream ( + const Reference<XInputStream> &rxInputStream + ) + throw( + RuntimeException + ); + + virtual Reference<XInputStream> SAL_CALL + getInputStream ( + void + ) throw( + RuntimeException + ) + { + osl::MutexGuard aGuard(m_aMutex); + return m_xStream; + } + + +private: + + Moderator& m_aModerator; + osl::Mutex m_aMutex; + Reference<XInputStream> m_xStream; +}; + + + +ModeratorsActiveDataSink::ModeratorsActiveDataSink(Moderator &theModerator) + : m_aModerator(theModerator) +{ +} + + +ModeratorsActiveDataSink::~ModeratorsActiveDataSink() +{ +} + +// XActiveDataSink. +void SAL_CALL +ModeratorsActiveDataSink::setInputStream ( + const Reference<XInputStream> &rxInputStream +) + throw( + RuntimeException + ) +{ + m_aModerator.setInputStream(rxInputStream); + osl::MutexGuard aGuard(m_aMutex); + m_xStream = rxInputStream; +} + + +ModeratorsActiveDataStreamer::ModeratorsActiveDataStreamer( + Moderator &theModerator +) + : m_aModerator(theModerator) +{ +} + + +ModeratorsActiveDataStreamer::~ModeratorsActiveDataStreamer() +{ +} + +// XActiveDataStreamer. +void SAL_CALL +ModeratorsActiveDataStreamer::setStream ( + const Reference<XStream> &rxStream +) + throw( + RuntimeException + ) +{ + m_aModerator.setStream(rxStream); + osl::MutexGuard aGuard(m_aMutex); + m_xStream = rxStream; +} + + + class ModeratorsInteractionHandler : public ::cppu::WeakImplHelper1<XInteractionHandler> { @@ -582,19 +731,63 @@ Moderator::Moderator( Reference < XProgressHandler >& xProgress, const Command& rArg ) - : m_aRes(*this), + throw( + ::com::sun::star::ucb::ContentCreationException, + ::com::sun::star::uno::RuntimeException + ) + : m_aMutex(), + + m_aRes(m_aMutex,*this), m_aResultType(NORESULT), m_aResult(), m_nIOErrorCode(0), - m_aRep(*this), + m_aRep(m_aMutex,*this), m_aReplyType(NOREPLY), - m_xContent(xContent), - m_bInteract(xInteract.is()), - m_bProgress(xProgress.is()), - m_rArg(rArg) + m_aArg(rArg), + m_aContent( + xContent, + new UcbTaskEnvironment( + xInteract.is() ? new ModeratorsInteractionHandler(*this) : 0, + xProgress.is() ? new ModeratorsProgressHandler(*this) : 0 + )) { + // now exchange the whole data sink stuff + // with a thread safe version + + Reference<XInterface> *pxSink; + + PostCommandArgument2 aPostArg; + OpenCommandArgument2 aOpenArg; + + int dec(2); + if(m_aArg.Argument >>= aPostArg) { + pxSink = &aPostArg.Sink; + dec = 0; + } + else if(m_aArg.Argument >>= aOpenArg) { + pxSink = &aOpenArg.Sink; + dec = 1; + } + + if(dec ==2) + throw ContentCreationException(); + + Reference < XActiveDataSink > xActiveSink(*pxSink,UNO_QUERY); + if(xActiveSink.is()) + *pxSink = Reference<XInterface>( + (cppu::OWeakObject*)new ModeratorsActiveDataSink(*this)); + + Reference<XActiveDataStreamer> xStreamer( *pxSink, UNO_QUERY ); + if ( xStreamer.is() ) + *pxSink = Reference<XInterface>( + (cppu::OWeakObject*)new ModeratorsActiveDataStreamer(*this)); + + if(dec == 0) + m_aArg.Argument <<= aPostArg; + else if(dec == 1) + m_aArg.Argument <<= aOpenArg; } @@ -607,7 +800,7 @@ Moderator::Result Moderator::getResult(const sal_uInt32 milliSec) { Result ret; try { - ConditionWaiter aWaiter(m_aRes,milliSec); + comphelper::ConditionWaiter aWaiter(m_aRes,milliSec); ret.type = m_aResultType; ret.result = m_aResult; ret.ioErrorCode = m_nIOErrorCode; @@ -615,7 +808,7 @@ Moderator::Result Moderator::getResult(const sal_uInt32 milliSec) // reset m_aResultType = NORESULT; } - catch(const ConditionWaiter::timedout&) + catch(const comphelper::ConditionWaiter::timedout&) { ret.type = TIMEDOUT; } @@ -626,7 +819,7 @@ Moderator::Result Moderator::getResult(const sal_uInt32 milliSec) void Moderator::setReply(ReplyType aReplyType ) { - ConditionModifier aMod(m_aRep); + comphelper::ConditionModifier aMod(m_aRep); m_aReplyType = aReplyType; } @@ -637,13 +830,13 @@ void Moderator::handle( const Reference<XInteractionRequest >& Request ) do { { - ConditionModifier aMod(m_aRes); + comphelper::ConditionModifier aMod(m_aRes); m_aResultType = INTERACTIONREQUEST; m_aResult <<= Request; } { - ConditionWaiter aWait(m_aRep); + comphelper::ConditionWaiter aWait(m_aRep); aReplyType = m_aReplyType; // reset @@ -671,24 +864,90 @@ void Moderator::handle( const Reference<XInteractionRequest >& Request ) void Moderator::push( const Any& Status ) { - ConditionModifier aMod(m_aRes); - m_aResultType = PROGRESSPUSH; - m_aResult = Status; + { + comphelper::ConditionModifier aMod(m_aRes); + m_aResultType = PROGRESSPUSH; + m_aResult = Status; + } + ReplyType aReplyType; + { + comphelper::ConditionWaiter aWait(m_aRep); + aReplyType = m_aReplyType; + m_aReplyType = NOREPLY; + } + if(aReplyType == EXIT) + setReply(EXIT); } void Moderator::update( const Any& Status ) { - ConditionModifier aMod(m_aRes); - m_aResultType = PROGRESSUPDATE; - m_aResult = Status; + { + comphelper::ConditionModifier aMod(m_aRes); + m_aResultType = PROGRESSUPDATE; + m_aResult = Status; + } + ReplyType aReplyType; + { + comphelper::ConditionWaiter aWait(m_aRep); + aReplyType = m_aReplyType; + m_aReplyType = NOREPLY; + } + if(aReplyType == EXIT) + setReply(EXIT); } void Moderator::pop( ) { - ConditionModifier aMod(m_aRes); - m_aResultType = PROGRESSPOP; + { + comphelper::ConditionModifier aMod(m_aRes); + m_aResultType = PROGRESSPOP; + } + ReplyType aReplyType; + { + comphelper::ConditionWaiter aWait(m_aRep); + aReplyType = m_aReplyType; + m_aReplyType = NOREPLY; + } + if(aReplyType == EXIT) + setReply(EXIT); +} + + +void Moderator::setStream(const Reference< XStream >& aStream) +{ + { + comphelper::ConditionModifier aMod(m_aRes); + m_aResultType = STREAM; + m_aResult <<= aStream; + } + ReplyType aReplyType; + { + comphelper::ConditionWaiter aWait(m_aRep); + aReplyType = m_aReplyType; + m_aReplyType = NOREPLY; + } + if(aReplyType == EXIT) + setReply(EXIT); +} + + +void Moderator::setInputStream(const Reference<XInputStream> &rxInputStream) +{ + { + comphelper::ConditionModifier aMod(m_aRes); + m_aResultType = INPUTSTREAM; + m_aResult <<= rxInputStream; + } + ReplyType aReplyType; + { + comphelper::ConditionWaiter aWait(m_aRep); + m_aReplyType = NOREPLY; + m_aReplyType = NOREPLY; + } + if(aReplyType == EXIT) + setReply(EXIT); } @@ -701,15 +960,7 @@ void SAL_CALL Moderator::run() try { - ::ucb::Content aContent( - m_xContent, - new UcbTaskEnvironment( - m_bInteract ? new ModeratorsInteractionHandler(*this) : 0, - m_bProgress ? new ModeratorsProgressHandler(*this) : 0 - ) - ); - - aResult = aContent.executeCommand(m_rArg.Name,m_rArg.Argument); + aResult = m_aContent.executeCommand(m_aArg.Name,m_aArg.Argument); aResultType = RESULT; } catch ( CommandAbortedException ) @@ -735,19 +986,20 @@ void SAL_CALL Moderator::run() } { - ConditionModifier aMod(m_aRes); + comphelper::ConditionModifier aMod(m_aRes); m_aResultType = aResultType; m_aResult = aResult; m_nIOErrorCode = nIOErrorCode; } - - ConditionWaiter aWaiter(m_aRep); } void SAL_CALL Moderator::onTerminated() { + { + comphelper::ConditionWaiter aWaiter(m_aRep); + } delete this; } @@ -757,6 +1009,15 @@ void SAL_CALL Moderator::onTerminated() but with handled timeout; */ +static sal_Bool _UCBOpenContentSync( + UcbLockBytesRef xLockBytes, + Reference < XContent > xContent, + const Command& rArg, + Reference < XInterface > xSink, + Reference < XInteractionHandler > xInteract, + Reference < XProgressHandler > xProgress, + UcbLockBytesHandlerRef xHandler ); + static sal_Bool UCBOpenContentSync( UcbLockBytesRef xLockBytes, @@ -774,8 +1035,20 @@ static sal_Bool UCBOpenContentSync( Reference<XContentIdentifier> xContId( xContent.is() ? xContent->getIdentifier() : 0 ); - if (xContId.is() && xContId->getContentProviderScheme().compareToAscii( - "http") != COMPARE_EQUAL ) + + rtl::OUString aScheme; + if(xContId.is()) + aScheme = xContId->getContentProviderScheme(); + + // now determine wether we use a timeout or not; + if( ! aScheme.equalsIgnoreAsciiCaseAscii("http") && + ! aScheme.equalsIgnoreAsciiCaseAscii("vnd.sun.star.webdav") && + ! aScheme.equalsIgnoreAsciiCaseAscii("ftp")) + return _UCBOpenContentSync( + xLockBytes,xContent,rArg,xSink,xInteract,xProgress,xHandler); + + if (aScheme.compareToAscii( + "http") != COMPARE_EQUAL ) xLockBytes->SetStreamValid_Impl(); Reference< XPropertiesChangeListener > xListener; @@ -791,37 +1064,74 @@ static sal_Bool UCBOpenContentSync( Any aResult; bool bException(false); bool bAborted(false); - - Moderator* pMod = new Moderator(xContent,xInteract,xProgress,rArg); - pMod->create(); - bool bResultAchieved(false); + Moderator* pMod = 0; + try { + pMod = new Moderator(xContent,xInteract,xProgress,rArg); + pMod->create(); + } catch(const ContentCreationException&) { + bResultAchieved = bException = true; + xLockBytes->SetError( ERRCODE_IO_GENERAL ); + } + + sal_uInt32 nTimeout(5000); // initially 5000 milliSec while(!bResultAchieved) { - // try to get the result for 5000 milli seconds - Moderator::Result res = pMod->getResult(5000); + Moderator::Result res; + // try to get the result for with timeout + res = pMod->getResult(nTimeout); switch(res.type) { - case Moderator::PROGRESSPUSH: + case Moderator::PROGRESSPUSH: { if(xProgress.is()) xProgress->push(res.result); + pMod->setReply(Moderator::REQUESTHANDLED); break; } - case Moderator::PROGRESSUPDATE: + case Moderator::PROGRESSUPDATE: { if(xProgress.is()) xProgress->update(res.result); + pMod->setReply(Moderator::REQUESTHANDLED); break; } - case Moderator::PROGRESSPOP: + case Moderator::PROGRESSPOP: { if(xProgress.is()) xProgress->pop(); + pMod->setReply(Moderator::REQUESTHANDLED); + break; + } + case Moderator::STREAM: + { + Reference<XStream> result; + if(res.result >>= result) { + Reference < XActiveDataStreamer > xStreamer( + xSink, UNO_QUERY + ); + + if(xStreamer.is()) + xStreamer->setStream(result); + } + pMod->setReply(Moderator::REQUESTHANDLED); + break; + } + case Moderator::INPUTSTREAM: + { + Reference<XInputStream> result; + res.result >>= result; + Reference < XActiveDataSink > xActiveSink( + xSink, UNO_QUERY + ); + + if(xActiveSink.is()) + xActiveSink->setInputStream(result); + pMod->setReply(Moderator::REQUESTHANDLED); break; } - case Moderator::TIMEDOUT: + case Moderator::TIMEDOUT: { Reference<XInteractionRetry> xRet; if(xInteract.is()) { @@ -866,7 +1176,7 @@ static sal_Bool UCBOpenContentSync( break; } - case Moderator::INTERACTIONREQUEST: + case Moderator::INTERACTIONREQUEST: { Reference<XInteractionRequest> Request; res.result >>= Request; @@ -874,25 +1184,25 @@ static sal_Bool UCBOpenContentSync( pMod->setReply(Moderator::REQUESTHANDLED); break; } - case Moderator::RESULT: + case Moderator::RESULT: { bResultAchieved = true; aResult = res.result; break; } - case Moderator::COMMANDABORTED: + case Moderator::COMMANDABORTED: { bAborted = true; xLockBytes->SetError( ERRCODE_ABORT ); break; } - case Moderator::COMMANDFAILED: + case Moderator::COMMANDFAILED: { bAborted = true; xLockBytes->SetError( ERRCODE_ABORT ); break; } - case Moderator::INTERACTIVEIO: + case Moderator::INTERACTIVEIO: { bException = true; if ( res.ioErrorCode == IOErrorCode_ACCESS_DENIED || @@ -906,13 +1216,13 @@ static sal_Bool UCBOpenContentSync( xLockBytes->SetError( ERRCODE_IO_GENERAL ); break; } - case Moderator::UNSUPPORTED: + case Moderator::UNSUPPORTED: { bException = true; xLockBytes->SetError( ERRCODE_IO_NOTSUPPORTED ); break; } - default: + default: { bException = true; xLockBytes->SetError( ERRCODE_IO_GENERAL ); @@ -922,9 +1232,10 @@ static sal_Bool UCBOpenContentSync( bResultAchieved |= bException; bResultAchieved |= bAborted; + if(nTimeout == 5000) nTimeout *= 2; } - pMod->setReply(Moderator::EXIT); + if(pMod) pMod->setReply(Moderator::EXIT); if ( bAborted || bException ) { @@ -952,6 +1263,99 @@ static sal_Bool UCBOpenContentSync( return ( bAborted || bException ); } +/** + Function for opening UCB contents synchronously + */ +static sal_Bool _UCBOpenContentSync( + UcbLockBytesRef xLockBytes, + Reference < XContent > xContent, + const Command& rArg, + Reference < XInterface > xSink, + Reference < XInteractionHandler > xInteract, + Reference < XProgressHandler > xProgress, + UcbLockBytesHandlerRef xHandler ) +{ + ::ucb::Content aContent( xContent, new UcbTaskEnvironment( xInteract, xProgress ) ); + Reference < XContentIdentifier > xIdent = xContent->getIdentifier(); + ::rtl::OUString aScheme = xIdent->getContentProviderScheme(); + + // http protocol must be handled in a special way: during the opening process the input stream may change + // only the last inputstream after notifying the document headers is valid + if ( aScheme.compareToAscii("http") != COMPARE_EQUAL ) + xLockBytes->SetStreamValid_Impl(); + + Reference< XPropertiesChangeListener > xListener = new UcbPropertiesChangeListener_Impl( xLockBytes ); + Reference< XPropertiesChangeNotifier > xProps ( xContent, UNO_QUERY ); + if ( xProps.is() ) + xProps->addPropertiesChangeListener( Sequence< ::rtl::OUString >(), xListener ); + + Any aResult; + bool bException = false; + bool bAborted = false; + + try + { + aResult = aContent.executeCommand( rArg.Name, rArg.Argument ); + } + catch ( CommandAbortedException ) + { + bAborted = true; + xLockBytes->SetError( ERRCODE_ABORT ); + } + catch ( CommandFailedException ) + { + bAborted = true; + xLockBytes->SetError( ERRCODE_ABORT ); + } + catch ( InteractiveIOException& r ) + { + bException = true; + if ( r.Code == IOErrorCode_ACCESS_DENIED || r.Code == IOErrorCode_LOCKING_VIOLATION ) + xLockBytes->SetError( ERRCODE_IO_ACCESSDENIED ); + else if ( r.Code == IOErrorCode_NOT_EXISTING ) + xLockBytes->SetError( ERRCODE_IO_NOTEXISTS ); + else if ( r.Code == IOErrorCode_CANT_READ ) + xLockBytes->SetError( ERRCODE_IO_CANTREAD ); + else + xLockBytes->SetError( ERRCODE_IO_GENERAL ); + } + catch ( UnsupportedDataSinkException& ) + { + bException = true; + xLockBytes->SetError( ERRCODE_IO_NOTSUPPORTED ); + } + catch ( Exception ) + { + bException = true; + xLockBytes->SetError( ERRCODE_IO_GENERAL ); + } + + if ( bAborted || bException ) + { + if( xHandler.Is() ) + xHandler->Handle( UcbLockBytesHandler::CANCEL, xLockBytes ); + + Reference < XActiveDataSink > xActiveSink( xSink, UNO_QUERY ); + if ( xActiveSink.is() ) + xActiveSink->setInputStream( Reference < XInputStream >() ); + + Reference < XActiveDataStreamer > xStreamer( xSink, UNO_QUERY ); + if ( xStreamer.is() ) + xStreamer->setStream( Reference < XStream >() ); + } + + Reference < XActiveDataControl > xControl( xSink, UNO_QUERY ); + if ( xControl.is() ) + xControl->terminate(); + + + if ( xProps.is() ) + xProps->removePropertiesChangeListener( Sequence< ::rtl::OUString >(), xListener ); + + return ( bAborted || bException ); +} + + //---------------------------------------------------------------------------- static void copyInputToOutput( const Reference< XInputStream >& aIn, const Reference< XOutputStream >& aOut ) { |