summaryrefslogtreecommitdiff
path: root/package
diff options
context:
space:
mode:
Diffstat (limited to 'package')
-rw-r--r--package/inc/ZipOutputEntry.hxx10
-rw-r--r--package/inc/ZipOutputStream.hxx10
-rw-r--r--package/source/zipapi/ZipOutputEntry.cxx6
-rw-r--r--package/source/zipapi/ZipOutputStream.cxx100
-rw-r--r--package/source/zippackage/ZipPackageStream.cxx9
5 files changed, 106 insertions, 29 deletions
diff --git a/package/inc/ZipOutputEntry.hxx b/package/inc/ZipOutputEntry.hxx
index 450cc39b9bc8..cac0c2ffa584 100644
--- a/package/inc/ZipOutputEntry.hxx
+++ b/package/inc/ZipOutputEntry.hxx
@@ -28,6 +28,7 @@
#include <package/Deflater.hxx>
#include <CRC32.hxx>
+#include <atomic>
struct ZipEntry;
class ZipPackageBuffer;
@@ -35,6 +36,9 @@ class ZipPackageStream;
class ZipOutputEntry
{
+ // allow only DeflateThread to change m_bFinished using setFinished()
+ friend class DeflateThread;
+
css::uno::Sequence< sal_Int8 > m_aDeflateBuffer;
ZipUtils::Deflater m_aDeflater;
css::uno::Reference< css::uno::XComponentContext > m_xContext;
@@ -48,8 +52,9 @@ class ZipOutputEntry
CRC32 m_aCRC;
ZipEntry *m_pCurrentEntry;
sal_Int16 m_nDigested;
- bool m_bEncryptCurrentEntry;
ZipPackageStream* m_pCurrentStream;
+ bool m_bEncryptCurrentEntry;
+ std::atomic<bool> m_bFinished;
public:
ZipOutputEntry(
@@ -78,7 +83,10 @@ public:
void closeEntry();
void write(const css::uno::Sequence< sal_Int8 >& rBuffer);
+ bool isFinished() const { return m_bFinished; }
+
private:
+ void setFinished() { m_bFinished = true; }
void doDeflate();
};
diff --git a/package/inc/ZipOutputStream.hxx b/package/inc/ZipOutputStream.hxx
index 16740095bd92..f995469b29e6 100644
--- a/package/inc/ZipOutputStream.hxx
+++ b/package/inc/ZipOutputStream.hxx
@@ -69,6 +69,16 @@ private:
throw(css::io::IOException, css::uno::RuntimeException);
void writeEXT( const ZipEntry &rEntry )
throw(css::io::IOException, css::uno::RuntimeException);
+
+ // ScheduledThread handling helpers
+ void consumeScheduledThreadEntry(ZipOutputEntry* pCandidate);
+ void consumeFinishedScheduledThreadEntries();
+ void consumeAllScheduledThreadEntries();
+
+public:
+ void reduceScheduledThreadsToGivenNumberOrLess(
+ sal_Int32 nThreads,
+ sal_Int32 nWaitTimeInTenthSeconds);
};
#endif
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index 47ecbb4622af..6f983d372bfd 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -55,8 +55,9 @@ ZipOutputEntry::ZipOutputEntry(
, m_xOutStream(rxOutput)
, m_pCurrentEntry(&rEntry)
, m_nDigested(0)
-, m_bEncryptCurrentEntry(bEncrypt)
, m_pCurrentStream(pStream)
+, m_bEncryptCurrentEntry(bEncrypt)
+, m_bFinished(false)
{
assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
assert(m_xOutStream.is());
@@ -77,8 +78,9 @@ ZipOutputEntry::ZipOutputEntry(
, m_xContext(rxContext)
, m_pCurrentEntry(&rEntry)
, m_nDigested(0)
-, m_bEncryptCurrentEntry(bEncrypt)
, m_pCurrentStream(pStream)
+, m_bEncryptCurrentEntry(bEncrypt)
+, m_bFinished(false)
{
assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
if (m_bEncryptCurrentEntry)
diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx
index 7d8378261666..8cd8b9056989 100644
--- a/package/source/zipapi/ZipOutputStream.cxx
+++ b/package/source/zipapi/ZipOutputStream.cxx
@@ -93,42 +93,90 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt )
m_pCurrentEntry = nullptr;
}
-void ZipOutputStream::finish()
- throw(IOException, RuntimeException)
+void ZipOutputStream::consumeScheduledThreadEntry(ZipOutputEntry* pCandidate)
{
- assert(!m_aZipList.empty() && "Zip file must have at least one entry!");
+ //Any exceptions thrown in the threads were caught and stored for now
+ ::css::uno::Any aCaughtException(pCandidate->getParallelDeflateException());
+ if (aCaughtException.hasValue())
+ ::cppu::throwException(aCaughtException);
- // Wait for all threads to finish & write
- m_rSharedThreadPool.waitUntilEmpty();
- for (size_t i = 0; i < m_aEntries.size(); i++)
+ writeLOC(pCandidate->getZipEntry(), pCandidate->isEncrypt());
+
+ sal_Int32 nRead;
+ uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize);
+ uno::Reference< io::XInputStream > xInput = pCandidate->getData();
+ do
{
- //Any exceptions thrown in the threads were caught and stored for now
- ::css::uno::Any aCaughtException(m_aEntries[i]->getParallelDeflateException());
- if (aCaughtException.hasValue())
- ::cppu::throwException(aCaughtException);
+ nRead = xInput->readBytes(aSequence, n_ConstBufferSize);
+ if (nRead < n_ConstBufferSize)
+ aSequence.realloc(nRead);
- writeLOC(m_aEntries[i]->getZipEntry(), m_aEntries[i]->isEncrypt());
+ rawWrite(aSequence);
+ }
+ while (nRead == n_ConstBufferSize);
+ xInput.clear();
- sal_Int32 nRead;
- uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize);
- uno::Reference< io::XInputStream > xInput = m_aEntries[i]->getData();
- do
- {
- nRead = xInput->readBytes(aSequence, n_ConstBufferSize);
- if (nRead < n_ConstBufferSize)
- aSequence.realloc(nRead);
+ rawCloseEntry(pCandidate->isEncrypt());
+
+ pCandidate->getZipPackageStream()->successfullyWritten(pCandidate->getZipEntry());
+ pCandidate->deleteBufferFile();
+ delete pCandidate;
+}
- rawWrite(aSequence);
+void ZipOutputStream::consumeFinishedScheduledThreadEntries()
+{
+ std::vector< ZipOutputEntry* > aNonFinishedEntries;
+
+ for(auto aIter = m_aEntries.begin(); aIter != m_aEntries.end(); ++aIter)
+ {
+ if((*aIter)->isFinished())
+ {
+ consumeScheduledThreadEntry(*aIter);
}
- while (nRead == n_ConstBufferSize);
- xInput.clear();
+ else
+ {
+ aNonFinishedEntries.push_back(*aIter);
+ }
+ }
+
+ // always reset to non-consumed entries
+ m_aEntries = aNonFinishedEntries;
+}
- rawCloseEntry(m_aEntries[i]->isEncrypt());
+void ZipOutputStream::consumeAllScheduledThreadEntries()
+{
+ while(!m_aEntries.empty())
+ {
+ ZipOutputEntry* pCandidate = m_aEntries.back();
+ m_aEntries.pop_back();
+ consumeScheduledThreadEntry(pCandidate);
+ }
+}
+
+void ZipOutputStream::reduceScheduledThreadsToGivenNumberOrLess(sal_Int32 nThreads, sal_Int32 nWaitTimeInTenthSeconds)
+{
+ while(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads)
+ {
+ consumeFinishedScheduledThreadEntries();
- m_aEntries[i]->getZipPackageStream()->successfullyWritten(m_aEntries[i]->getZipEntry());
- m_aEntries[i]->deleteBufferFile();
- delete m_aEntries[i];
+ if(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads)
+ {
+ const TimeValue aTimeValue(0, 100000 * nWaitTimeInTenthSeconds);
+ osl_waitThread(&aTimeValue);
+ }
}
+}
+
+void ZipOutputStream::finish()
+ throw(IOException, RuntimeException)
+{
+ assert(!m_aZipList.empty() && "Zip file must have at least one entry!");
+
+ // Wait for all threads to finish & write
+ m_rSharedThreadPool.waitUntilEmpty();
+
+ // consume all processed entries
+ consumeAllScheduledThreadEntries();
sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition());
for (size_t i = 0; i < m_aZipList.size(); i++)
diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx
index 2fbcfdba19ff..a183cf651796 100644
--- a/package/source/zippackage/ZipPackageStream.cxx
+++ b/package/source/zippackage/ZipPackageStream.cxx
@@ -54,6 +54,7 @@
#include <rtl/random.h>
#include <PackageConstants.hxx>
+#include <thread>
using namespace com::sun::star::packages::zip::ZipConstants;
using namespace com::sun::star::packages::zip;
@@ -478,6 +479,7 @@ private:
deflateZipEntry(mpEntry, mxInStream);
mxInStream.clear();
mpEntry->closeBufferFile();
+ mpEntry->setFinished();
}
catch (const uno::Exception&)
{
@@ -824,6 +826,13 @@ bool ZipPackageStream::saveChild(
if (bParallelDeflate)
{
+ // tdf#93553 limit to a useful amount of threads. Taking number of available
+ // cores and allow 4-times the amount for having the queue well filled. The
+ // 2nd pparameter is the time to wait beweeen cleanups in 10th of a second.
+ // Both values may be added to the configuration settings if needed.
+ static sal_Int32 nAllowedThreads(std::max(std::thread::hardware_concurrency(), 1U) * 4);
+ rZipOut.reduceScheduledThreadsToGivenNumberOrLess(nAllowedThreads, 1);
+
// Start a new thread deflating this zip entry
ZipOutputEntry *pZipEntry = new ZipOutputEntry(
m_xContext, *pTempEntry, this, bToBeEncrypted);