summaryrefslogtreecommitdiff
path: root/package/source
diff options
context:
space:
mode:
Diffstat (limited to 'package/source')
-rw-r--r--package/source/zipapi/ZipOutputEntry.cxx6
-rw-r--r--package/source/zipapi/ZipOutputStream.cxx100
-rw-r--r--package/source/zippackage/ZipPackageStream.cxx9
3 files changed, 87 insertions, 28 deletions
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index 47ecbb4622af..6f983d372bfd 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -55,8 +55,9 @@ ZipOutputEntry::ZipOutputEntry(
, m_xOutStream(rxOutput)
, m_pCurrentEntry(&rEntry)
, m_nDigested(0)
-, m_bEncryptCurrentEntry(bEncrypt)
, m_pCurrentStream(pStream)
+, m_bEncryptCurrentEntry(bEncrypt)
+, m_bFinished(false)
{
assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
assert(m_xOutStream.is());
@@ -77,8 +78,9 @@ ZipOutputEntry::ZipOutputEntry(
, m_xContext(rxContext)
, m_pCurrentEntry(&rEntry)
, m_nDigested(0)
-, m_bEncryptCurrentEntry(bEncrypt)
, m_pCurrentStream(pStream)
+, m_bEncryptCurrentEntry(bEncrypt)
+, m_bFinished(false)
{
assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
if (m_bEncryptCurrentEntry)
diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx
index 7d8378261666..8cd8b9056989 100644
--- a/package/source/zipapi/ZipOutputStream.cxx
+++ b/package/source/zipapi/ZipOutputStream.cxx
@@ -93,42 +93,90 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt )
m_pCurrentEntry = nullptr;
}
-void ZipOutputStream::finish()
- throw(IOException, RuntimeException)
+void ZipOutputStream::consumeScheduledThreadEntry(ZipOutputEntry* pCandidate)
{
- assert(!m_aZipList.empty() && "Zip file must have at least one entry!");
+ //Any exceptions thrown in the threads were caught and stored for now
+ ::css::uno::Any aCaughtException(pCandidate->getParallelDeflateException());
+ if (aCaughtException.hasValue())
+ ::cppu::throwException(aCaughtException);
- // Wait for all threads to finish & write
- m_rSharedThreadPool.waitUntilEmpty();
- for (size_t i = 0; i < m_aEntries.size(); i++)
+ writeLOC(pCandidate->getZipEntry(), pCandidate->isEncrypt());
+
+ sal_Int32 nRead;
+ uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize);
+ uno::Reference< io::XInputStream > xInput = pCandidate->getData();
+ do
{
- //Any exceptions thrown in the threads were caught and stored for now
- ::css::uno::Any aCaughtException(m_aEntries[i]->getParallelDeflateException());
- if (aCaughtException.hasValue())
- ::cppu::throwException(aCaughtException);
+ nRead = xInput->readBytes(aSequence, n_ConstBufferSize);
+ if (nRead < n_ConstBufferSize)
+ aSequence.realloc(nRead);
- writeLOC(m_aEntries[i]->getZipEntry(), m_aEntries[i]->isEncrypt());
+ rawWrite(aSequence);
+ }
+ while (nRead == n_ConstBufferSize);
+ xInput.clear();
- sal_Int32 nRead;
- uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize);
- uno::Reference< io::XInputStream > xInput = m_aEntries[i]->getData();
- do
- {
- nRead = xInput->readBytes(aSequence, n_ConstBufferSize);
- if (nRead < n_ConstBufferSize)
- aSequence.realloc(nRead);
+ rawCloseEntry(pCandidate->isEncrypt());
+
+ pCandidate->getZipPackageStream()->successfullyWritten(pCandidate->getZipEntry());
+ pCandidate->deleteBufferFile();
+ delete pCandidate;
+}
- rawWrite(aSequence);
+void ZipOutputStream::consumeFinishedScheduledThreadEntries()
+{
+ std::vector< ZipOutputEntry* > aNonFinishedEntries;
+
+ for(auto aIter = m_aEntries.begin(); aIter != m_aEntries.end(); ++aIter)
+ {
+ if((*aIter)->isFinished())
+ {
+ consumeScheduledThreadEntry(*aIter);
}
- while (nRead == n_ConstBufferSize);
- xInput.clear();
+ else
+ {
+ aNonFinishedEntries.push_back(*aIter);
+ }
+ }
+
+ // always reset to non-consumed entries
+ m_aEntries = aNonFinishedEntries;
+}
- rawCloseEntry(m_aEntries[i]->isEncrypt());
+void ZipOutputStream::consumeAllScheduledThreadEntries()
+{
+ while(!m_aEntries.empty())
+ {
+ ZipOutputEntry* pCandidate = m_aEntries.back();
+ m_aEntries.pop_back();
+ consumeScheduledThreadEntry(pCandidate);
+ }
+}
+
+void ZipOutputStream::reduceScheduledThreadsToGivenNumberOrLess(sal_Int32 nThreads, sal_Int32 nWaitTimeInTenthSeconds)
+{
+ while(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads)
+ {
+ consumeFinishedScheduledThreadEntries();
- m_aEntries[i]->getZipPackageStream()->successfullyWritten(m_aEntries[i]->getZipEntry());
- m_aEntries[i]->deleteBufferFile();
- delete m_aEntries[i];
+ if(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads)
+ {
+ const TimeValue aTimeValue(0, 100000 * nWaitTimeInTenthSeconds);
+ osl_waitThread(&aTimeValue);
+ }
}
+}
+
+void ZipOutputStream::finish()
+ throw(IOException, RuntimeException)
+{
+ assert(!m_aZipList.empty() && "Zip file must have at least one entry!");
+
+ // Wait for all threads to finish & write
+ m_rSharedThreadPool.waitUntilEmpty();
+
+ // consume all processed entries
+ consumeAllScheduledThreadEntries();
sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition());
for (size_t i = 0; i < m_aZipList.size(); i++)
diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx
index 2fbcfdba19ff..a183cf651796 100644
--- a/package/source/zippackage/ZipPackageStream.cxx
+++ b/package/source/zippackage/ZipPackageStream.cxx
@@ -54,6 +54,7 @@
#include <rtl/random.h>
#include <PackageConstants.hxx>
+#include <thread>
using namespace com::sun::star::packages::zip::ZipConstants;
using namespace com::sun::star::packages::zip;
@@ -478,6 +479,7 @@ private:
deflateZipEntry(mpEntry, mxInStream);
mxInStream.clear();
mpEntry->closeBufferFile();
+ mpEntry->setFinished();
}
catch (const uno::Exception&)
{
@@ -824,6 +826,13 @@ bool ZipPackageStream::saveChild(
if (bParallelDeflate)
{
+ // tdf#93553 limit to a useful amount of threads. Taking number of available
+ // cores and allow 4-times the amount for having the queue well filled. The
+ // 2nd pparameter is the time to wait beweeen cleanups in 10th of a second.
+ // Both values may be added to the configuration settings if needed.
+ static sal_Int32 nAllowedThreads(std::max(std::thread::hardware_concurrency(), 1U) * 4);
+ rZipOut.reduceScheduledThreadsToGivenNumberOrLess(nAllowedThreads, 1);
+
// Start a new thread deflating this zip entry
ZipOutputEntry *pZipEntry = new ZipOutputEntry(
m_xContext, *pTempEntry, this, bToBeEncrypted);