diff options
author | Matúš Kukan <matus.kukan@collabora.com> | 2014-10-21 15:17:13 +0200 |
---|---|---|
committer | Matúš Kukan <matus.kukan@collabora.com> | 2014-11-17 10:49:23 +0100 |
commit | fbf714b45625c50bb1c736ef231b5dbbab0016a1 (patch) | |
tree | 0e1a9e9002a8ce8ca46d7a7071f40c08ffea77e4 /package | |
parent | db5552631b13e5a1d330929cd5093bd0f9894ec8 (diff) |
package: Finally implement parallel zip entries deflating
For that:
1, create ZipPackageStream::successfullyWritten to be called after
the content is written
2, Do not take mutex when reading from WrapStreamForShare - threads should
be using different streams anyway, but there is only one common mutex. :-/
Change-Id: I90303e49206b19454dd4141e24cc8be29c433045
Diffstat (limited to 'package')
-rw-r--r-- | package/inc/ZipOutputEntry.hxx | 3 | ||||
-rw-r--r-- | package/inc/ZipOutputStream.hxx | 7 | ||||
-rw-r--r-- | package/inc/ZipPackageStream.hxx | 6 | ||||
-rw-r--r-- | package/source/zipapi/ZipOutputEntry.cxx | 3 | ||||
-rw-r--r-- | package/source/zipapi/ZipOutputStream.cxx | 37 | ||||
-rw-r--r-- | package/source/zippackage/ZipPackageStream.cxx | 133 | ||||
-rw-r--r-- | package/source/zippackage/wrapstreamforshare.cxx | 4 |
7 files changed, 121 insertions, 72 deletions
diff --git a/package/inc/ZipOutputEntry.hxx b/package/inc/ZipOutputEntry.hxx index c24d5a905bfe..9e396ce4dc7b 100644 --- a/package/inc/ZipOutputEntry.hxx +++ b/package/inc/ZipOutputEntry.hxx @@ -54,6 +54,9 @@ public: ~ZipOutputEntry(); css::uno::Sequence< sal_Int8 > getData(); + ZipEntry* getZipEntry() { return m_pCurrentEntry; } + ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; } + bool isEncrypt() { return m_bEncryptCurrentEntry; } void closeEntry(); void write(const css::uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nNewOffset, sal_Int32 nNewLength); diff --git a/package/inc/ZipOutputStream.hxx b/package/inc/ZipOutputStream.hxx index f11b8833d146..4e8e4ff150be 100644 --- a/package/inc/ZipOutputStream.hxx +++ b/package/inc/ZipOutputStream.hxx @@ -23,10 +23,12 @@ #include <com/sun/star/io/XOutputStream.hpp> #include <ByteChucker.hxx> +#include <comphelper/threadpool.hxx> #include <vector> struct ZipEntry; +class ZipOutputEntry; class ZipPackageStream; class ZipOutputStream @@ -35,14 +37,17 @@ class ZipOutputStream ::std::vector < ZipEntry * > m_aZipList; ByteChucker m_aChucker; - bool m_bFinished; ZipEntry *m_pCurrentEntry; + comphelper::ThreadPool &m_rSharedThreadPool; + std::vector< ZipOutputEntry* > m_aEntries; public: ZipOutputStream( const ::com::sun::star::uno::Reference< ::com::sun::star::io::XOutputStream > &xOStream ); ~ZipOutputStream(); + void addDeflatingThread( ZipOutputEntry *pEntry, comphelper::ThreadTask *pThreadTask ); + void writeLOC( ZipEntry *pEntry, bool bEncrypt = false ) throw(::com::sun::star::io::IOException, ::com::sun::star::uno::RuntimeException); void rawWrite( ::com::sun::star::uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nNewOffset, sal_Int32 nNewLength ) diff --git a/package/inc/ZipPackageStream.hxx b/package/inc/ZipPackageStream.hxx index 356d42b78f11..ff6d3db1f6f9 100644 --- a/package/inc/ZipPackageStream.hxx +++ b/package/inc/ZipPackageStream.hxx @@ -63,14 +63,13 @@ private: sal_uInt8 m_nStreamMode; sal_uInt32 m_nMagicalHackPos; sal_uInt32 m_nMagicalHackSize; + sal_Int64 m_nOwnStreamOrigSize; bool m_bHasSeekable; - bool m_bCompressedIsSetFromOutside; - bool m_bFromManifest; - bool m_bUseWinEncoding; + bool m_bRawStream; ::com::sun::star::uno::Reference< ::com::sun::star::io::XInputStream > GetOwnSeekStream(); @@ -138,6 +137,7 @@ public: void setZipEntryOnLoading( const ZipEntry &rInEntry); ::com::sun::star::uno::Reference< ::com::sun::star::io::XInputStream > SAL_CALL getRawData() throw(::com::sun::star::uno::RuntimeException); + void successfullyWritten( ZipEntry *pEntry ); static ::com::sun::star::uno::Sequence < sal_Int8 > static_getImplementationId(); diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx index f43b5c7e843e..a5fbe25eef61 100644 --- a/package/source/zipapi/ZipOutputEntry.cxx +++ b/package/source/zipapi/ZipOutputEntry.cxx @@ -47,14 +47,13 @@ ZipOutputEntry::ZipOutputEntry( const uno::Reference< uno::XComponentContext >& , m_pCurrentEntry(&rEntry) , m_nDigested(0) , m_bEncryptCurrentEntry(bEncrypt) -, m_pCurrentStream(NULL) +, m_pCurrentStream(pStream) { assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries"); if (m_bEncryptCurrentEntry) { m_xCipherContext = ZipFile::StaticGetCipher( rxContext, pStream->GetEncryptionData(), true ); m_xDigestContext = ZipFile::StaticGetDigestContextForChecksum( rxContext, pStream->GetEncryptionData() ); - m_pCurrentStream = pStream; } } diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx index c9b6e08cfad7..fcfe35f82070 100644 --- a/package/source/zipapi/ZipOutputStream.cxx +++ b/package/source/zipapi/ZipOutputStream.cxx @@ -27,6 +27,7 @@ #include <PackageConstants.hxx> #include <ZipEntry.hxx> +#include <ZipOutputEntry.hxx> #include <ZipPackageStream.hxx> using namespace com::sun::star; @@ -39,15 +40,13 @@ using namespace com::sun::star::packages::zip::ZipConstants; ZipOutputStream::ZipOutputStream( const uno::Reference < io::XOutputStream > &xOStream ) : m_xStream(xOStream) , m_aChucker(xOStream) -, m_bFinished(false) , m_pCurrentEntry(NULL) +, m_rSharedThreadPool(comphelper::ThreadPool::getSharedOptimalPool()) { } ZipOutputStream::~ZipOutputStream( void ) { - for (sal_Int32 i = 0, nEnd = m_aZipList.size(); i < nEnd; i++) - delete m_aZipList[i]; } void ZipOutputStream::setEntry( ZipEntry *pEntry ) @@ -66,6 +65,12 @@ void ZipOutputStream::setEntry( ZipEntry *pEntry ) } } +void ZipOutputStream::addDeflatingThread( ZipOutputEntry *pEntry, comphelper::ThreadTask *pThread ) +{ + m_rSharedThreadPool.pushTask(pThread); + m_aEntries.push_back(pEntry); +} + void ZipOutputStream::rawWrite( Sequence< sal_Int8 >& rBuffer, sal_Int32 /*nNewOffset*/, sal_Int32 nNewLength ) throw(IOException, RuntimeException) { @@ -85,21 +90,33 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt ) m_pCurrentEntry = NULL; } -void ZipOutputStream::finish( ) +void ZipOutputStream::finish() throw(IOException, RuntimeException) { - if (m_bFinished) - return; + assert(!m_aZipList.empty() && "Zip file must have at least one entry!"); + + // Wait for all threads to finish & write + m_rSharedThreadPool.waitUntilEmpty(); + for (size_t i = 0; i < m_aEntries.size(); i++) + { + writeLOC(m_aEntries[i]->getZipEntry(), m_aEntries[i]->isEncrypt()); + uno::Sequence< sal_Int8 > aCompressedData = m_aEntries[i]->getData(); + rawWrite(aCompressedData, 0, aCompressedData.getLength()); + rawCloseEntry(m_aEntries[i]->isEncrypt()); - if (m_aZipList.size() < 1) - OSL_FAIL("Zip file must have at least one entry!\n"); + m_aEntries[i]->getZipPackageStream()->successfullyWritten(m_aEntries[i]->getZipEntry()); + delete m_aEntries[i]; + } sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition()); - for (sal_Int32 i =0, nEnd = m_aZipList.size(); i < nEnd; i++) + for (size_t i = 0; i < m_aZipList.size(); i++) + { writeCEN( *m_aZipList[i] ); + delete m_aZipList[i]; + } writeEND( nOffset, static_cast < sal_Int32 > (m_aChucker.GetPosition()) - nOffset); - m_bFinished = true; m_xStream->flush(); + m_aZipList.clear(); } void ZipOutputStream::writeEND(sal_uInt32 nOffset, sal_uInt32 nLength) diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx index ca2ad01d0bb1..9f29a6800ecc 100644 --- a/package/source/zippackage/ZipPackageStream.cxx +++ b/package/source/zippackage/ZipPackageStream.cxx @@ -90,10 +90,12 @@ ZipPackageStream::ZipPackageStream ( ZipPackage & rNewPackage, , m_nStreamMode( PACKAGE_STREAM_NOTSET ) , m_nMagicalHackPos( 0 ) , m_nMagicalHackSize( 0 ) +, m_nOwnStreamOrigSize( 0 ) , m_bHasSeekable( false ) , m_bCompressedIsSetFromOutside( false ) , m_bFromManifest( false ) , m_bUseWinEncoding( false ) +, m_bRawStream( false ) { m_xContext = xContext; m_nFormat = nFormat; @@ -437,6 +439,35 @@ bool ZipPackageStream::ParsePackageRawStream() return true; } +class DeflateThread: public comphelper::ThreadTask +{ + ZipOutputEntry *mpEntry; + uno::Reference< io::XInputStream > mxInStream; + +public: + DeflateThread( ZipOutputEntry *pEntry, + const uno::Reference< io::XInputStream >& xInStream ) + : mpEntry(pEntry) + , mxInStream(xInStream) + {} + +private: + virtual void doWork() SAL_OVERRIDE + { + sal_Int32 nLength = 0; + uno::Sequence< sal_Int8 > aSeq(n_ConstBufferSize); + do + { + nLength = mxInStream->readBytes(aSeq, n_ConstBufferSize); + mpEntry->write(aSeq, 0, nLength); + } + while (nLength == n_ConstBufferSize); + mpEntry->closeEntry(); + + mxInStream.clear(); + } +}; + static void ImplSetStoredData( ZipEntry & rEntry, uno::Reference< io::XInputStream> & rStream ) { // It's very annoying that we have to do this, but lots of zip packages @@ -497,20 +528,21 @@ bool ZipPackageStream::saveChild( OSL_ENSURE( m_nStreamMode != PACKAGE_STREAM_NOTSET, "Unacceptable ZipPackageStream mode!" ); - bool bRawStream = false; + m_bRawStream = false; if ( m_nStreamMode == PACKAGE_STREAM_DETECT ) - bRawStream = ParsePackageRawStream(); + m_bRawStream = ParsePackageRawStream(); else if ( m_nStreamMode == PACKAGE_STREAM_RAW ) - bRawStream = true; + m_bRawStream = true; + bool bParallelDeflate = false; bool bTransportOwnEncrStreamAsRaw = false; // During the storing the original size of the stream can be changed // TODO/LATER: get rid of this hack - sal_Int64 nOwnStreamOrigSize = bRawStream ? m_nMagicalHackSize : aEntry.nSize; + m_nOwnStreamOrigSize = m_bRawStream ? m_nMagicalHackSize : aEntry.nSize; bool bUseNonSeekableAccess = false; uno::Reference < io::XInputStream > xStream; - if ( !IsPackageMember() && !bRawStream && !bToBeEncrypted && bToBeCompressed ) + if ( !IsPackageMember() && !m_bRawStream && !bToBeEncrypted && bToBeCompressed ) { // the stream is not a package member, not a raw stream, // it should not be encrypted and it should be compressed, @@ -540,11 +572,11 @@ bool ZipPackageStream::saveChild( { // If the stream is a raw one, then we should be positioned // at the beginning of the actual data - if ( !bToBeCompressed || bRawStream ) + if ( !bToBeCompressed || m_bRawStream ) { // The raw stream can neither be encrypted nor connected - OSL_ENSURE( !bRawStream || !(bToBeCompressed || bToBeEncrypted), "The stream is already encrypted!\n" ); - xSeek->seek ( bRawStream ? m_nMagicalHackPos : 0 ); + OSL_ENSURE( !m_bRawStream || !(bToBeCompressed || bToBeEncrypted), "The stream is already encrypted!\n" ); + xSeek->seek ( m_bRawStream ? m_nMagicalHackPos : 0 ); ImplSetStoredData ( *pTempEntry, xStream ); // TODO/LATER: Get rid of hacks related to switching of Flag Method and Size properties! @@ -553,7 +585,7 @@ bool ZipPackageStream::saveChild( { // this is the correct original size pTempEntry->nSize = xSeek->getLength(); - nOwnStreamOrigSize = pTempEntry->nSize; + m_nOwnStreamOrigSize = pTempEntry->nSize; } xSeek->seek ( 0 ); @@ -592,7 +624,7 @@ bool ZipPackageStream::saveChild( return bSuccess; } - if ( bToBeEncrypted || bRawStream || bTransportOwnEncrStreamAsRaw ) + if ( bToBeEncrypted || m_bRawStream || bTransportOwnEncrStreamAsRaw ) { if ( bToBeEncrypted && !bTransportOwnEncrStreamAsRaw ) { @@ -624,11 +656,11 @@ bool ZipPackageStream::saveChild( aPropSet[PKG_MNFST_ITERATION].Value <<= m_xBaseEncryptionData->m_nIterationCount; // Need to store the uncompressed size in the manifest - OSL_ENSURE( nOwnStreamOrigSize >= 0, "The stream size was not correctly initialized!\n" ); + OSL_ENSURE( m_nOwnStreamOrigSize >= 0, "The stream size was not correctly initialized!\n" ); aPropSet[PKG_MNFST_UCOMPSIZE].Name = sSizeProperty; - aPropSet[PKG_MNFST_UCOMPSIZE].Value <<= nOwnStreamOrigSize; + aPropSet[PKG_MNFST_UCOMPSIZE].Value <<= m_nOwnStreamOrigSize; - if ( bRawStream || bTransportOwnEncrStreamAsRaw ) + if ( m_bRawStream || bTransportOwnEncrStreamAsRaw ) { ::rtl::Reference< EncryptionData > xEncData = GetEncryptionData(); if ( !xEncData.is() ) @@ -651,7 +683,7 @@ bool ZipPackageStream::saveChild( // If the entry is already stored in the zip file in the format we // want for this write...copy it raw if ( !bUseNonSeekableAccess - && ( bRawStream || bTransportOwnEncrStreamAsRaw + && ( m_bRawStream || bTransportOwnEncrStreamAsRaw || ( IsPackageMember() && !bToBeEncrypted && ( ( aEntry.nMethod == DEFLATED && bToBeCompressed ) || ( aEntry.nMethod == STORED && !bToBeCompressed ) ) ) ) ) @@ -671,7 +703,7 @@ bool ZipPackageStream::saveChild( try { - if ( bRawStream ) + if ( m_bRawStream ) xStream->skipBytes( m_nMagicalHackPos ); ZipOutputStream::setEntry(pTempEntry); @@ -733,35 +765,29 @@ bool ZipPackageStream::saveChild( try { ZipOutputStream::setEntry(pTempEntry); - rZipOut.writeLOC(pTempEntry, bToBeEncrypted); // the entry is provided to the ZipOutputStream that will delete it pAutoTempEntry.release(); - sal_Int32 nLength; - uno::Sequence < sal_Int8 > aSeq (n_ConstBufferSize); if (pTempEntry->nMethod == STORED) { + sal_Int32 nLength; + uno::Sequence< sal_Int8 > aSeq(n_ConstBufferSize); + rZipOut.writeLOC(pTempEntry, bToBeEncrypted); do { nLength = xStream->readBytes(aSeq, n_ConstBufferSize); rZipOut.rawWrite(aSeq, 0, nLength); } while ( nLength == n_ConstBufferSize ); + rZipOut.rawCloseEntry(bToBeEncrypted); } else { - ZipOutputEntry aZipEntry(m_xContext, *pTempEntry, this, bToBeEncrypted); - do - { - nLength = xStream->readBytes(aSeq, n_ConstBufferSize); - aZipEntry.write(aSeq, 0, nLength); - } - while ( nLength == n_ConstBufferSize ); - aZipEntry.closeEntry(); - uno::Sequence< sal_Int8 > aCompressedData = aZipEntry.getData(); - rZipOut.rawWrite(aCompressedData, 0, aCompressedData.getLength()); + bParallelDeflate = true; + // Start a new thread deflating this zip entry + ZipOutputEntry *pZipEntry = new ZipOutputEntry(m_xContext, *pTempEntry, this, bToBeEncrypted); + rZipOut.addDeflatingThread( pZipEntry, new DeflateThread(pZipEntry, xStream) ); } - rZipOut.rawCloseEntry(bToBeEncrypted); } catch ( ZipException& ) { @@ -793,36 +819,39 @@ bool ZipPackageStream::saveChild( } } - if( bSuccess ) - { - if ( !IsPackageMember() ) - { - CloseOwnStreamIfAny(); - SetPackageMember ( true ); - } + if (bSuccess && !bParallelDeflate) + successfullyWritten(pTempEntry); - if ( bRawStream ) - { - // the raw stream was integrated and now behaves - // as usual encrypted stream - SetToBeEncrypted( true ); - } + if ( aPropSet.getLength() + && ( m_nFormat == embed::StorageFormats::PACKAGE || m_nFormat == embed::StorageFormats::OFOPXML ) ) + rManList.push_back( aPropSet ); - // Then copy it back afterwards... - ZipPackageFolder::copyZipEntry ( aEntry, *pTempEntry ); + return bSuccess; +} - // TODO/LATER: get rid of this hack ( the encrypted stream size property is changed during saving ) - if ( IsEncrypted() ) - setSize( nOwnStreamOrigSize ); +void ZipPackageStream::successfullyWritten( ZipEntry *pEntry ) +{ + if ( !IsPackageMember() ) + { + CloseOwnStreamIfAny(); + SetPackageMember ( true ); + } - aEntry.nOffset *= -1; + if ( m_bRawStream ) + { + // the raw stream was integrated and now behaves + // as usual encrypted stream + SetToBeEncrypted( true ); } - if ( aPropSet.getLength() - && ( m_nFormat == embed::StorageFormats::PACKAGE || m_nFormat == embed::StorageFormats::OFOPXML ) ) - rManList.push_back( aPropSet ); + // Then copy it back afterwards... + ZipPackageFolder::copyZipEntry( aEntry, *pEntry ); - return bSuccess; + // TODO/LATER: get rid of this hack ( the encrypted stream size property is changed during saving ) + if ( IsEncrypted() ) + setSize( m_nOwnStreamOrigSize ); + + aEntry.nOffset *= -1; } void ZipPackageStream::SetPackageMember( bool bNewValue ) diff --git a/package/source/zippackage/wrapstreamforshare.cxx b/package/source/zippackage/wrapstreamforshare.cxx index 4f737bfa2964..c74e4b2de18b 100644 --- a/package/source/zippackage/wrapstreamforshare.cxx +++ b/package/source/zippackage/wrapstreamforshare.cxx @@ -54,8 +54,6 @@ sal_Int32 SAL_CALL WrapStreamForShare::readBytes( uno::Sequence< sal_Int8 >& aDa io::IOException, uno::RuntimeException, std::exception ) { - ::osl::MutexGuard aGuard( m_rMutexRef->GetMutex() ); - if ( !m_xInStream.is() ) throw io::IOException(THROW_WHERE ); @@ -73,8 +71,6 @@ sal_Int32 SAL_CALL WrapStreamForShare::readSomeBytes( uno::Sequence< sal_Int8 >& io::IOException, uno::RuntimeException, std::exception ) { - ::osl::MutexGuard aGuard( m_rMutexRef->GetMutex() ); - if ( !m_xInStream.is() ) throw io::IOException(THROW_WHERE ); |