diff options
author | Armin Le Grand <Armin.Le.Grand@cib.de> | 2016-03-16 15:14:13 +0100 |
---|---|---|
committer | Armin Le Grand <Armin.Le.Grand@cib.de> | 2016-03-22 08:56:08 +0000 |
commit | 7e2ea27e5d56f5cf767a6718a0f5edc28e24af14 (patch) | |
tree | 63673e8a3f21ef7438fdb46edfa627e4d1aa7050 /package/source | |
parent | 0f0cea28c75a6565c7803b54536d4a8720ead160 (diff) |
tdf#93553 limit parallelism at zip save time to useful amount
At ODT export time writing and zipping comtained data packages is nicely
parallelized, but not limited to an upper bounds of threads to use.
Together with memory consumption this makes ressource usage and runtime
behaviour bad to crashing (mostly on 32bit).
I have now limited the processing dependent on the number of available
cores to get a good processing/ressource ratio. The result uses much less
memory, is faster and runs on 32bit systems.
Change-Id: I8bd516a9a0cefd644f5d7001214bc717f29770ab
Reviewed-on: https://gerrit.libreoffice.org/23305
Tested-by: Jenkins <ci@libreoffice.org>
Reviewed-by: Noel Grandin <noelgrandin@gmail.com>
Diffstat (limited to 'package/source')
-rw-r--r-- | package/source/zipapi/ZipOutputEntry.cxx | 6 | ||||
-rw-r--r-- | package/source/zipapi/ZipOutputStream.cxx | 100 | ||||
-rw-r--r-- | package/source/zippackage/ZipPackageStream.cxx | 9 |
3 files changed, 87 insertions, 28 deletions
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx index 47ecbb4622af..6f983d372bfd 100644 --- a/package/source/zipapi/ZipOutputEntry.cxx +++ b/package/source/zipapi/ZipOutputEntry.cxx @@ -55,8 +55,9 @@ ZipOutputEntry::ZipOutputEntry( , m_xOutStream(rxOutput) , m_pCurrentEntry(&rEntry) , m_nDigested(0) -, m_bEncryptCurrentEntry(bEncrypt) , m_pCurrentStream(pStream) +, m_bEncryptCurrentEntry(bEncrypt) +, m_bFinished(false) { assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries"); assert(m_xOutStream.is()); @@ -77,8 +78,9 @@ ZipOutputEntry::ZipOutputEntry( , m_xContext(rxContext) , m_pCurrentEntry(&rEntry) , m_nDigested(0) -, m_bEncryptCurrentEntry(bEncrypt) , m_pCurrentStream(pStream) +, m_bEncryptCurrentEntry(bEncrypt) +, m_bFinished(false) { assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries"); if (m_bEncryptCurrentEntry) diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx index 7d8378261666..8cd8b9056989 100644 --- a/package/source/zipapi/ZipOutputStream.cxx +++ b/package/source/zipapi/ZipOutputStream.cxx @@ -93,42 +93,90 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt ) m_pCurrentEntry = nullptr; } -void ZipOutputStream::finish() - throw(IOException, RuntimeException) +void ZipOutputStream::consumeScheduledThreadEntry(ZipOutputEntry* pCandidate) { - assert(!m_aZipList.empty() && "Zip file must have at least one entry!"); + //Any exceptions thrown in the threads were caught and stored for now + ::css::uno::Any aCaughtException(pCandidate->getParallelDeflateException()); + if (aCaughtException.hasValue()) + ::cppu::throwException(aCaughtException); - // Wait for all threads to finish & write - m_rSharedThreadPool.waitUntilEmpty(); - for (size_t i = 0; i < m_aEntries.size(); i++) + writeLOC(pCandidate->getZipEntry(), pCandidate->isEncrypt()); + + sal_Int32 nRead; + uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize); + uno::Reference< io::XInputStream > xInput = pCandidate->getData(); + do { - //Any exceptions thrown in the threads were caught and stored for now - ::css::uno::Any aCaughtException(m_aEntries[i]->getParallelDeflateException()); - if (aCaughtException.hasValue()) - ::cppu::throwException(aCaughtException); + nRead = xInput->readBytes(aSequence, n_ConstBufferSize); + if (nRead < n_ConstBufferSize) + aSequence.realloc(nRead); - writeLOC(m_aEntries[i]->getZipEntry(), m_aEntries[i]->isEncrypt()); + rawWrite(aSequence); + } + while (nRead == n_ConstBufferSize); + xInput.clear(); - sal_Int32 nRead; - uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize); - uno::Reference< io::XInputStream > xInput = m_aEntries[i]->getData(); - do - { - nRead = xInput->readBytes(aSequence, n_ConstBufferSize); - if (nRead < n_ConstBufferSize) - aSequence.realloc(nRead); + rawCloseEntry(pCandidate->isEncrypt()); + + pCandidate->getZipPackageStream()->successfullyWritten(pCandidate->getZipEntry()); + pCandidate->deleteBufferFile(); + delete pCandidate; +} - rawWrite(aSequence); +void ZipOutputStream::consumeFinishedScheduledThreadEntries() +{ + std::vector< ZipOutputEntry* > aNonFinishedEntries; + + for(auto aIter = m_aEntries.begin(); aIter != m_aEntries.end(); ++aIter) + { + if((*aIter)->isFinished()) + { + consumeScheduledThreadEntry(*aIter); } - while (nRead == n_ConstBufferSize); - xInput.clear(); + else + { + aNonFinishedEntries.push_back(*aIter); + } + } + + // always reset to non-consumed entries + m_aEntries = aNonFinishedEntries; +} - rawCloseEntry(m_aEntries[i]->isEncrypt()); +void ZipOutputStream::consumeAllScheduledThreadEntries() +{ + while(!m_aEntries.empty()) + { + ZipOutputEntry* pCandidate = m_aEntries.back(); + m_aEntries.pop_back(); + consumeScheduledThreadEntry(pCandidate); + } +} + +void ZipOutputStream::reduceScheduledThreadsToGivenNumberOrLess(sal_Int32 nThreads, sal_Int32 nWaitTimeInTenthSeconds) +{ + while(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads) + { + consumeFinishedScheduledThreadEntries(); - m_aEntries[i]->getZipPackageStream()->successfullyWritten(m_aEntries[i]->getZipEntry()); - m_aEntries[i]->deleteBufferFile(); - delete m_aEntries[i]; + if(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads) + { + const TimeValue aTimeValue(0, 100000 * nWaitTimeInTenthSeconds); + osl_waitThread(&aTimeValue); + } } +} + +void ZipOutputStream::finish() + throw(IOException, RuntimeException) +{ + assert(!m_aZipList.empty() && "Zip file must have at least one entry!"); + + // Wait for all threads to finish & write + m_rSharedThreadPool.waitUntilEmpty(); + + // consume all processed entries + consumeAllScheduledThreadEntries(); sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition()); for (size_t i = 0; i < m_aZipList.size(); i++) diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx index 2fbcfdba19ff..a183cf651796 100644 --- a/package/source/zippackage/ZipPackageStream.cxx +++ b/package/source/zippackage/ZipPackageStream.cxx @@ -54,6 +54,7 @@ #include <rtl/random.h> #include <PackageConstants.hxx> +#include <thread> using namespace com::sun::star::packages::zip::ZipConstants; using namespace com::sun::star::packages::zip; @@ -478,6 +479,7 @@ private: deflateZipEntry(mpEntry, mxInStream); mxInStream.clear(); mpEntry->closeBufferFile(); + mpEntry->setFinished(); } catch (const uno::Exception&) { @@ -824,6 +826,13 @@ bool ZipPackageStream::saveChild( if (bParallelDeflate) { + // tdf#93553 limit to a useful amount of threads. Taking number of available + // cores and allow 4-times the amount for having the queue well filled. The + // 2nd pparameter is the time to wait beweeen cleanups in 10th of a second. + // Both values may be added to the configuration settings if needed. + static sal_Int32 nAllowedThreads(std::max(std::thread::hardware_concurrency(), 1U) * 4); + rZipOut.reduceScheduledThreadsToGivenNumberOrLess(nAllowedThreads, 1); + // Start a new thread deflating this zip entry ZipOutputEntry *pZipEntry = new ZipOutputEntry( m_xContext, *pTempEntry, this, bToBeEncrypted); |