diff options
Diffstat (limited to 'package/source')
-rw-r--r-- | package/source/zipapi/ZipOutputEntry.cxx | 6 | ||||
-rw-r--r-- | package/source/zipapi/ZipOutputStream.cxx | 100 | ||||
-rw-r--r-- | package/source/zippackage/ZipPackageStream.cxx | 9 |
3 files changed, 87 insertions, 28 deletions
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx index 47ecbb4622af..6f983d372bfd 100644 --- a/package/source/zipapi/ZipOutputEntry.cxx +++ b/package/source/zipapi/ZipOutputEntry.cxx @@ -55,8 +55,9 @@ ZipOutputEntry::ZipOutputEntry( , m_xOutStream(rxOutput) , m_pCurrentEntry(&rEntry) , m_nDigested(0) -, m_bEncryptCurrentEntry(bEncrypt) , m_pCurrentStream(pStream) +, m_bEncryptCurrentEntry(bEncrypt) +, m_bFinished(false) { assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries"); assert(m_xOutStream.is()); @@ -77,8 +78,9 @@ ZipOutputEntry::ZipOutputEntry( , m_xContext(rxContext) , m_pCurrentEntry(&rEntry) , m_nDigested(0) -, m_bEncryptCurrentEntry(bEncrypt) , m_pCurrentStream(pStream) +, m_bEncryptCurrentEntry(bEncrypt) +, m_bFinished(false) { assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries"); if (m_bEncryptCurrentEntry) diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx index 7d8378261666..8cd8b9056989 100644 --- a/package/source/zipapi/ZipOutputStream.cxx +++ b/package/source/zipapi/ZipOutputStream.cxx @@ -93,42 +93,90 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt ) m_pCurrentEntry = nullptr; } -void ZipOutputStream::finish() - throw(IOException, RuntimeException) +void ZipOutputStream::consumeScheduledThreadEntry(ZipOutputEntry* pCandidate) { - assert(!m_aZipList.empty() && "Zip file must have at least one entry!"); + //Any exceptions thrown in the threads were caught and stored for now + ::css::uno::Any aCaughtException(pCandidate->getParallelDeflateException()); + if (aCaughtException.hasValue()) + ::cppu::throwException(aCaughtException); - // Wait for all threads to finish & write - m_rSharedThreadPool.waitUntilEmpty(); - for (size_t i = 0; i < m_aEntries.size(); i++) + writeLOC(pCandidate->getZipEntry(), pCandidate->isEncrypt()); + + sal_Int32 nRead; + uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize); + uno::Reference< io::XInputStream > xInput = pCandidate->getData(); + do { - //Any exceptions thrown in the threads were caught and stored for now - ::css::uno::Any aCaughtException(m_aEntries[i]->getParallelDeflateException()); - if (aCaughtException.hasValue()) - ::cppu::throwException(aCaughtException); + nRead = xInput->readBytes(aSequence, n_ConstBufferSize); + if (nRead < n_ConstBufferSize) + aSequence.realloc(nRead); - writeLOC(m_aEntries[i]->getZipEntry(), m_aEntries[i]->isEncrypt()); + rawWrite(aSequence); + } + while (nRead == n_ConstBufferSize); + xInput.clear(); - sal_Int32 nRead; - uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize); - uno::Reference< io::XInputStream > xInput = m_aEntries[i]->getData(); - do - { - nRead = xInput->readBytes(aSequence, n_ConstBufferSize); - if (nRead < n_ConstBufferSize) - aSequence.realloc(nRead); + rawCloseEntry(pCandidate->isEncrypt()); + + pCandidate->getZipPackageStream()->successfullyWritten(pCandidate->getZipEntry()); + pCandidate->deleteBufferFile(); + delete pCandidate; +} - rawWrite(aSequence); +void ZipOutputStream::consumeFinishedScheduledThreadEntries() +{ + std::vector< ZipOutputEntry* > aNonFinishedEntries; + + for(auto aIter = m_aEntries.begin(); aIter != m_aEntries.end(); ++aIter) + { + if((*aIter)->isFinished()) + { + consumeScheduledThreadEntry(*aIter); } - while (nRead == n_ConstBufferSize); - xInput.clear(); + else + { + aNonFinishedEntries.push_back(*aIter); + } + } + + // always reset to non-consumed entries + m_aEntries = aNonFinishedEntries; +} - rawCloseEntry(m_aEntries[i]->isEncrypt()); +void ZipOutputStream::consumeAllScheduledThreadEntries() +{ + while(!m_aEntries.empty()) + { + ZipOutputEntry* pCandidate = m_aEntries.back(); + m_aEntries.pop_back(); + consumeScheduledThreadEntry(pCandidate); + } +} + +void ZipOutputStream::reduceScheduledThreadsToGivenNumberOrLess(sal_Int32 nThreads, sal_Int32 nWaitTimeInTenthSeconds) +{ + while(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads) + { + consumeFinishedScheduledThreadEntries(); - m_aEntries[i]->getZipPackageStream()->successfullyWritten(m_aEntries[i]->getZipEntry()); - m_aEntries[i]->deleteBufferFile(); - delete m_aEntries[i]; + if(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads) + { + const TimeValue aTimeValue(0, 100000 * nWaitTimeInTenthSeconds); + osl_waitThread(&aTimeValue); + } } +} + +void ZipOutputStream::finish() + throw(IOException, RuntimeException) +{ + assert(!m_aZipList.empty() && "Zip file must have at least one entry!"); + + // Wait for all threads to finish & write + m_rSharedThreadPool.waitUntilEmpty(); + + // consume all processed entries + consumeAllScheduledThreadEntries(); sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition()); for (size_t i = 0; i < m_aZipList.size(); i++) diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx index 2fbcfdba19ff..a183cf651796 100644 --- a/package/source/zippackage/ZipPackageStream.cxx +++ b/package/source/zippackage/ZipPackageStream.cxx @@ -54,6 +54,7 @@ #include <rtl/random.h> #include <PackageConstants.hxx> +#include <thread> using namespace com::sun::star::packages::zip::ZipConstants; using namespace com::sun::star::packages::zip; @@ -478,6 +479,7 @@ private: deflateZipEntry(mpEntry, mxInStream); mxInStream.clear(); mpEntry->closeBufferFile(); + mpEntry->setFinished(); } catch (const uno::Exception&) { @@ -824,6 +826,13 @@ bool ZipPackageStream::saveChild( if (bParallelDeflate) { + // tdf#93553 limit to a useful amount of threads. Taking number of available + // cores and allow 4-times the amount for having the queue well filled. The + // 2nd pparameter is the time to wait beweeen cleanups in 10th of a second. + // Both values may be added to the configuration settings if needed. + static sal_Int32 nAllowedThreads(std::max(std::thread::hardware_concurrency(), 1U) * 4); + rZipOut.reduceScheduledThreadsToGivenNumberOrLess(nAllowedThreads, 1); + // Start a new thread deflating this zip entry ZipOutputEntry *pZipEntry = new ZipOutputEntry( m_xContext, *pTempEntry, this, bToBeEncrypted); |