summaryrefslogtreecommitdiff
path: root/package
diff options
context:
space:
mode:
Diffstat (limited to 'package')
-rw-r--r--package/Library_package2.mk1
-rw-r--r--package/inc/ThreadedDeflater.hxx62
-rw-r--r--package/inc/ZipOutputEntry.hxx68
-rw-r--r--package/source/zipapi/ThreadedDeflater.cxx181
-rw-r--r--package/source/zipapi/ZipOutputEntry.cxx195
-rw-r--r--package/source/zippackage/ZipPackageStream.cxx30
6 files changed, 477 insertions, 60 deletions
diff --git a/package/Library_package2.mk b/package/Library_package2.mk
index 75a15f0e0d08..195c87f9ff4b 100644
--- a/package/Library_package2.mk
+++ b/package/Library_package2.mk
@@ -53,6 +53,7 @@ $(eval $(call gb_Library_add_exception_objects,package2,\
package/source/zipapi/Deflater \
package/source/zipapi/Inflater \
package/source/zipapi/sha1context \
+ package/source/zipapi/ThreadedDeflater \
package/source/zipapi/XBufferedThreadedStream \
package/source/zipapi/XUnbufferedStream \
package/source/zipapi/ZipEnumeration \
diff --git a/package/inc/ThreadedDeflater.hxx b/package/inc/ThreadedDeflater.hxx
new file mode 100644
index 000000000000..90801700a37e
--- /dev/null
+++ b/package/inc/ThreadedDeflater.hxx
@@ -0,0 +1,62 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+
+#ifndef INCLUDED_PACKAGE_THREADEDDEFLATER_HXX
+#define INCLUDED_PACKAGE_THREADEDDEFLATER_HXX
+
+#include <com/sun/star/uno/Sequence.hxx>
+#include <package/packagedllapi.hxx>
+#include <comphelper/threadpool.hxx>
+#include <atomic>
+#include <memory>
+
+namespace ZipUtils
+{
+/// Parallel compression a stream using the libz deflate algorithm.
+///
+/// Almost a replacement for the Deflater class. Call startDeflate() with the data,
+/// check with finished() or waitForTasks() and retrieve result with getOutput().
+/// The class will internally split into multiple threads.
+class ThreadedDeflater final
+{
+ class Task;
+ // Note: All this should be lock-less. Each task writes only to its part
+ // of the data, flags are atomic.
+ css::uno::Sequence<sal_Int8> inBuffer;
+ int zlibLevel;
+ std::shared_ptr<comphelper::ThreadTaskTag> threadTaskTag;
+ std::atomic<int> pendingTasksCount;
+ std::vector<std::vector<sal_Int8>> outBuffers;
+
+public:
+ // Unlike with Deflater class, bNoWrap is always true.
+ ThreadedDeflater(sal_Int32 nSetLevel);
+ ~ThreadedDeflater();
+ void startDeflate(const css::uno::Sequence<sal_Int8>& rBuffer);
+ void waitForTasks();
+ bool finished() const;
+ css::uno::Sequence<sal_Int8> getOutput() const;
+ void clear();
+};
+
+} // namespace
+
+#endif
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/inc/ZipOutputEntry.hxx b/package/inc/ZipOutputEntry.hxx
index af6528f04ea1..7234d890f4c2 100644
--- a/package/inc/ZipOutputEntry.hxx
+++ b/package/inc/ZipOutputEntry.hxx
@@ -35,11 +35,9 @@ struct ZipEntry;
class ZipPackageBuffer;
class ZipPackageStream;
-class ZipOutputEntry
+class ZipOutputEntryBase
{
protected:
- css::uno::Sequence< sal_Int8 > m_aDeflateBuffer;
- ZipUtils::Deflater m_aDeflater;
css::uno::Reference< css::uno::XComponentContext > m_xContext;
css::uno::Reference< css::io::XOutputStream > m_xOutStream;
@@ -53,10 +51,9 @@ protected:
bool const m_bEncryptCurrentEntry;
public:
- ZipOutputEntry(
- const css::uno::Reference< css::io::XOutputStream >& rxOutStream,
- const css::uno::Reference< css::uno::XComponentContext >& rxContext,
- ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt);
+ virtual ~ZipOutputEntryBase() = default;
+
+ virtual void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream) = 0;
ZipEntry* getZipEntry() { return m_pCurrentEntry; }
ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; }
@@ -64,7 +61,36 @@ public:
void closeEntry();
- void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream);
+protected:
+ ZipOutputEntryBase(
+ const css::uno::Reference< css::io::XOutputStream >& rxOutStream,
+ const css::uno::Reference< css::uno::XComponentContext >& rxContext,
+ ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt, bool checkStream);
+
+ // Inherited classes call this with deflated data buffer.
+ void processDeflated( const css::uno::Sequence< sal_Int8 >& deflateBuffer, sal_Int32 nLength );
+ // Inherited classes call this with the input buffer.
+ void processInput( const css::uno::Sequence< sal_Int8 >& rBuffer );
+
+ virtual void finishDeflater() = 0;
+ virtual sal_Int64 getDeflaterTotalIn() const = 0;
+ virtual sal_Int64 getDeflaterTotalOut() const = 0;
+ virtual void deflaterReset() = 0;
+ virtual bool isDeflaterFinished() const = 0;
+};
+
+// Normal non-threaded case.
+class ZipOutputEntry : public ZipOutputEntryBase
+{
+ css::uno::Sequence< sal_Int8 > m_aDeflateBuffer;
+ ZipUtils::Deflater m_aDeflater;
+
+public:
+ ZipOutputEntry(
+ const css::uno::Reference< css::io::XOutputStream >& rxOutStream,
+ const css::uno::Reference< css::uno::XComponentContext >& rxContext,
+ ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt);
+ void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream) override;
void write(const css::uno::Sequence< sal_Int8 >& rBuffer);
protected:
@@ -72,10 +98,15 @@ protected:
const css::uno::Reference< css::io::XOutputStream >& rxOutStream,
const css::uno::Reference< css::uno::XComponentContext >& rxContext,
ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt, bool checkStream);
+ virtual void finishDeflater() override;
+ virtual sal_Int64 getDeflaterTotalIn() const override;
+ virtual sal_Int64 getDeflaterTotalOut() const override;
+ virtual void deflaterReset() override;
+ virtual bool isDeflaterFinished() const override;
void doDeflate();
};
-// Class that runs the compression in a thread.
+// Class that runs the compression in a background thread.
class ZipOutputEntryInThread : public ZipOutputEntry
{
class Task;
@@ -103,6 +134,25 @@ private:
void setFinished() { m_bFinished = true; }
};
+// Class that synchronously runs the compression in multiple threads (using ThreadDeflater).
+class ZipOutputEntryParallel : public ZipOutputEntryBase
+{
+ sal_Int64 totalIn;
+ sal_Int64 totalOut;
+public:
+ ZipOutputEntryParallel(
+ const css::uno::Reference< css::io::XOutputStream >& rxOutStream,
+ const css::uno::Reference< css::uno::XComponentContext >& rxContext,
+ ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt);
+ void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream) override;
+protected:
+ virtual void finishDeflater() override;
+ virtual sal_Int64 getDeflaterTotalIn() const override;
+ virtual sal_Int64 getDeflaterTotalOut() const override;
+ virtual void deflaterReset() override;
+ virtual bool isDeflaterFinished() const override;
+};
+
#endif
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zipapi/ThreadedDeflater.cxx b/package/source/zipapi/ThreadedDeflater.cxx
new file mode 100644
index 000000000000..b136981b3bdb
--- /dev/null
+++ b/package/source/zipapi/ThreadedDeflater.cxx
@@ -0,0 +1,181 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+
+#include <ThreadedDeflater.hxx>
+#include <zlib.h>
+#include <com/sun/star/packages/zip/ZipConstants.hpp>
+#include <sal/log.hxx>
+
+using namespace com::sun::star::packages::zip::ZipConstants;
+using namespace com::sun::star;
+
+namespace ZipUtils
+{
+const sal_Int64 MaxBlockSize = 128 * 1024;
+
+// Parallel ZLIB compression using threads. The class internally splits the data into
+// blocks and spawns ThreadPool tasks to process them independently. This is achieved
+// in a similar way how pigz works, see comments from Mark Adler at
+// https://stackoverflow.com/questions/30294766/how-to-use-multiple-threads-for-zlib-compression
+// and
+// https://stackoverflow.com/questions/30794053/how-to-use-multiple-threads-for-zlib-compression-same-input-source
+
+// Everything here should be either read-only, or writing to distinct data, or atomic.
+
+class ThreadedDeflater::Task : public comphelper::ThreadTask
+{
+ z_stream stream;
+ ThreadedDeflater* deflater;
+ int sequence;
+ int blockSize;
+
+public:
+ Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_)
+ : comphelper::ThreadTask(deflater_->threadTaskTag)
+ , deflater(deflater_)
+ , sequence(sequence_)
+ , blockSize(blockSize_)
+ {
+ }
+
+private:
+ virtual void doWork() override;
+};
+
+ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel)
+ : zlibLevel(nSetLevel)
+ , threadTaskTag(comphelper::ThreadPool::createThreadTaskTag())
+ , pendingTasksCount(0)
+{
+}
+
+ThreadedDeflater::~ThreadedDeflater()
+{
+ waitForTasks();
+ clear();
+}
+
+void ThreadedDeflater::startDeflate(const uno::Sequence<sal_Int8>& rBuffer)
+{
+ inBuffer = rBuffer;
+ sal_Int64 size = inBuffer.getLength();
+ int tasksCount = (size + MaxBlockSize - 1) / MaxBlockSize;
+ tasksCount = std::max(tasksCount, 1);
+ pendingTasksCount = tasksCount;
+ outBuffers.resize(pendingTasksCount);
+ for (int sequence = 0; sequence < tasksCount; ++sequence)
+ {
+ sal_Int64 thisSize = std::min(MaxBlockSize, size);
+ size -= thisSize;
+ comphelper::ThreadPool::getSharedOptimalPool().pushTask(
+ std::make_unique<Task>(this, sequence, thisSize));
+ }
+ assert(size == 0);
+}
+
+bool ThreadedDeflater::finished() const { return pendingTasksCount == 0; }
+
+css::uno::Sequence<sal_Int8> ThreadedDeflater::getOutput() const
+{
+ assert(finished());
+ sal_Int64 totalSize = 0;
+ for (const auto& buffer : outBuffers)
+ totalSize += buffer.size();
+ uno::Sequence<sal_Int8> outBuffer(totalSize);
+ auto pos = outBuffer.begin();
+ for (const auto& buffer : outBuffers)
+ pos = std::copy(buffer.begin(), buffer.end(), pos);
+ return outBuffer;
+}
+
+void ThreadedDeflater::waitForTasks()
+{
+ comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+}
+
+void ThreadedDeflater::clear()
+{
+ assert(finished());
+ inBuffer = uno::Sequence<sal_Int8>();
+ outBuffers.clear();
+}
+
+#if defined Z_PREFIX
+#define deflateInit2 z_deflateInit2
+#define deflateBound z_deflateBound
+#define deflateSetDictionary z_deflateSetDictionary
+#define deflate z_deflate
+#define deflateEnd z_deflateEnd
+#endif
+
+void ThreadedDeflater::Task::doWork()
+{
+ stream.zalloc = nullptr;
+ stream.zfree = nullptr;
+ stream.opaque = nullptr;
+ // -MAX_WBITS means 32k window size and raw stream
+ if (deflateInit2(&stream, deflater->zlibLevel, Z_DEFLATED, -MAX_WBITS, DEF_MEM_LEVEL,
+ Z_DEFAULT_STRATEGY)
+ != Z_OK)
+ {
+ SAL_WARN("package.threadeddeflate", "deflateInit2() failed");
+ abort();
+ }
+ // Find out size for our output buffer to be large enough for deflate() needing to be called just once.
+ sal_Int64 outputMaxSize = deflateBound(&stream, blockSize);
+ // add extra size for Z_SYNC_FLUSH
+ outputMaxSize += 20;
+ deflater->outBuffers[sequence].resize(outputMaxSize);
+ sal_Int64 myInBufferStart = sequence * MaxBlockSize;
+ // zlib doesn't handle const properly
+ unsigned char* inBufferPtr = reinterpret_cast<unsigned char*>(
+ const_cast<signed char*>(deflater->inBuffer.getConstArray()));
+ if (sequence != 0)
+ {
+ // the window size is 32k, so set last 32k of previous data as the dictionary
+ assert(MAX_WBITS == 15);
+ assert(MaxBlockSize >= 32768);
+ deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+ }
+ stream.next_in = inBufferPtr + myInBufferStart;
+ stream.avail_in = blockSize;
+ stream.next_out = reinterpret_cast<unsigned char*>(deflater->outBuffers[sequence].data());
+ stream.avail_out = outputMaxSize;
+ bool last = sequence == int(deflater->outBuffers.size() - 1); // Last block?
+ // The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary,
+ // and since we use a raw stream, the data blocks then can be simply concatenated.
+ int res = deflate(&stream, last ? Z_FINISH : Z_SYNC_FLUSH);
+ assert(stream.avail_in == 0); // Check that everything has been deflated.
+ if (last ? res == Z_STREAM_END : res == Z_OK)
+ { // ok
+ sal_Int64 outSize = outputMaxSize - stream.avail_out;
+ deflater->outBuffers[sequence].resize(outSize);
+ --deflater->pendingTasksCount;
+ }
+ else
+ {
+ SAL_WARN("package.threadeddeflate", "deflate() failed");
+ abort();
+ }
+ deflateEnd(&stream);
+}
+
+} // namespace
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index 74281fd063dd..bee9d0aeb70c 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -27,6 +27,7 @@
#include <osl/diagnose.h>
#include <PackageConstants.hxx>
+#include <ThreadedDeflater.hxx>
#include <ZipEntry.hxx>
#include <ZipFile.hxx>
#include <ZipPackageBuffer.hxx>
@@ -41,16 +42,14 @@ using namespace com::sun::star::packages::zip::ZipConstants;
/** This class is used to deflate Zip entries
*/
-ZipOutputEntry::ZipOutputEntry(
+ZipOutputEntryBase::ZipOutputEntryBase(
const css::uno::Reference< css::io::XOutputStream >& rxOutput,
const uno::Reference< uno::XComponentContext >& rxContext,
ZipEntry& rEntry,
ZipPackageStream* pStream,
bool bEncrypt,
bool checkStream)
-: m_aDeflateBuffer(n_ConstBufferSize)
-, m_aDeflater(DEFAULT_COMPRESSION, true)
-, m_xContext(rxContext)
+: m_xContext(rxContext)
, m_xOutStream(rxOutput)
, m_pCurrentEntry(&rEntry)
, m_nDigested(0)
@@ -67,33 +66,21 @@ ZipOutputEntry::ZipOutputEntry(
}
}
-ZipOutputEntry::ZipOutputEntry(
- const css::uno::Reference< css::io::XOutputStream >& rxOutput,
- const uno::Reference< uno::XComponentContext >& rxContext,
- ZipEntry& rEntry,
- ZipPackageStream* pStream,
- bool bEncrypt)
-: ZipOutputEntry( rxOutput, rxContext, rEntry, pStream, bEncrypt, true)
-{
-}
-
-void ZipOutputEntry::closeEntry()
+void ZipOutputEntryBase::closeEntry()
{
- m_aDeflater.finish();
- while (!m_aDeflater.finished())
- doDeflate();
+ finishDeflater();
if ((m_pCurrentEntry->nFlag & 8) == 0)
{
- if (m_pCurrentEntry->nSize != m_aDeflater.getTotalIn())
+ if (m_pCurrentEntry->nSize != getDeflaterTotalIn())
{
OSL_FAIL("Invalid entry size");
}
- if (m_pCurrentEntry->nCompressedSize != m_aDeflater.getTotalOut())
+ if (m_pCurrentEntry->nCompressedSize != getDeflaterTotalOut())
{
// Different compression strategies make the merit of this
// test somewhat dubious
- m_pCurrentEntry->nCompressedSize = m_aDeflater.getTotalOut();
+ m_pCurrentEntry->nCompressedSize = getDeflaterTotalOut();
}
if (m_pCurrentEntry->nCrc != m_aCRC.getValue())
{
@@ -104,12 +91,12 @@ void ZipOutputEntry::closeEntry()
{
if ( !m_bEncryptCurrentEntry )
{
- m_pCurrentEntry->nSize = m_aDeflater.getTotalIn();
- m_pCurrentEntry->nCompressedSize = m_aDeflater.getTotalOut();
+ m_pCurrentEntry->nSize = getDeflaterTotalIn();
+ m_pCurrentEntry->nCompressedSize = getDeflaterTotalOut();
}
m_pCurrentEntry->nCrc = m_aCRC.getValue();
}
- m_aDeflater.reset();
+ deflaterReset();
m_aCRC.reset();
if (m_bEncryptCurrentEntry)
@@ -128,25 +115,11 @@ void ZipOutputEntry::closeEntry()
}
}
-void ZipOutputEntry::write( const Sequence< sal_Int8 >& rBuffer )
+void ZipOutputEntryBase::processDeflated( const uno::Sequence< sal_Int8 >& deflateBuffer, sal_Int32 nLength )
{
- if (!m_aDeflater.finished())
- {
- m_aDeflater.setInputSegment(rBuffer);
- while (!m_aDeflater.needsInput())
- doDeflate();
- if (!m_bEncryptCurrentEntry)
- m_aCRC.updateSegment(rBuffer, rBuffer.getLength());
- }
-}
-
-void ZipOutputEntry::doDeflate()
-{
- sal_Int32 nLength = m_aDeflater.doDeflateSegment(m_aDeflateBuffer, m_aDeflateBuffer.getLength());
-
if ( nLength > 0 )
{
- uno::Sequence< sal_Int8 > aTmpBuffer( m_aDeflateBuffer.getConstArray(), nLength );
+ uno::Sequence< sal_Int8 > aTmpBuffer( deflateBuffer.getConstArray(), nLength );
if ( m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() )
{
// Need to update our digest before encryption...
@@ -175,7 +148,7 @@ void ZipOutputEntry::doDeflate()
}
}
- if ( m_aDeflater.finished() && m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() )
+ if ( isDeflaterFinished() && m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() )
{
// FIXME64: sequence not 64bit safe.
uno::Sequence< sal_Int8 > aEncryptionBuffer = m_xCipherContext->finalizeCipherContextAndDispose();
@@ -191,6 +164,80 @@ void ZipOutputEntry::doDeflate()
}
}
+void ZipOutputEntryBase::processInput( const uno::Sequence< sal_Int8 >& rBuffer )
+{
+ if (!m_bEncryptCurrentEntry)
+ m_aCRC.updateSegment(rBuffer, rBuffer.getLength());
+}
+
+ZipOutputEntry::ZipOutputEntry(
+ const css::uno::Reference< css::io::XOutputStream >& rxOutput,
+ const uno::Reference< uno::XComponentContext >& rxContext,
+ ZipEntry& rEntry,
+ ZipPackageStream* pStream,
+ bool bEncrypt,
+ bool checkStream)
+: ZipOutputEntryBase(rxOutput, rxContext, rEntry, pStream, bEncrypt, checkStream)
+, m_aDeflateBuffer(n_ConstBufferSize)
+, m_aDeflater(DEFAULT_COMPRESSION, true)
+{
+}
+
+ZipOutputEntry::ZipOutputEntry(
+ const css::uno::Reference< css::io::XOutputStream >& rxOutput,
+ const uno::Reference< uno::XComponentContext >& rxContext,
+ ZipEntry& rEntry,
+ ZipPackageStream* pStream,
+ bool bEncrypt)
+: ZipOutputEntry( rxOutput, rxContext, rEntry, pStream, bEncrypt, true)
+{
+}
+
+void ZipOutputEntry::write( const Sequence< sal_Int8 >& rBuffer )
+{
+ if (!m_aDeflater.finished())
+ {
+ m_aDeflater.setInputSegment(rBuffer);
+ while (!m_aDeflater.needsInput())
+ doDeflate();
+ processInput(rBuffer);
+ }
+}
+
+void ZipOutputEntry::doDeflate()
+{
+ sal_Int32 nLength = m_aDeflater.doDeflateSegment(m_aDeflateBuffer, m_aDeflateBuffer.getLength());
+ processDeflated( m_aDeflateBuffer, nLength );
+}
+
+void ZipOutputEntry::finishDeflater()
+{
+ m_aDeflater.finish();
+ while (!m_aDeflater.finished())
+ doDeflate();
+}
+
+sal_Int64 ZipOutputEntry::getDeflaterTotalIn() const
+{
+ return m_aDeflater.getTotalIn();
+}
+
+sal_Int64 ZipOutputEntry::getDeflaterTotalOut() const
+{
+ return m_aDeflater.getTotalOut();
+}
+
+void ZipOutputEntry::deflaterReset()
+{
+ m_aDeflater.reset();
+}
+
+bool ZipOutputEntry::isDeflaterFinished() const
+{
+ return m_aDeflater.finished();
+}
+
+
ZipOutputEntryInThread::ZipOutputEntryInThread(
const uno::Reference< uno::XComponentContext >& rxContext,
ZipEntry& rEntry,
@@ -301,4 +348,70 @@ void ZipOutputEntry::writeStream(const uno::Reference< io::XInputStream >& xInSt
closeEntry();
}
+
+ZipOutputEntryParallel::ZipOutputEntryParallel(
+ const css::uno::Reference< css::io::XOutputStream >& rxOutput,
+ const uno::Reference< uno::XComponentContext >& rxContext,
+ ZipEntry& rEntry,
+ ZipPackageStream* pStream,
+ bool bEncrypt)
+: ZipOutputEntryBase(rxOutput, rxContext, rEntry, pStream, bEncrypt, true)
+, totalIn(0)
+, totalOut(0)
+{
+}
+
+void ZipOutputEntryParallel::writeStream(const uno::Reference< io::XInputStream >& xInStream)
+{
+ sal_Int64 toRead = xInStream->available();
+ uno::Sequence< sal_Int8 > inBuffer( toRead );
+ sal_Int64 read = xInStream->readBytes(inBuffer, toRead);
+ if (read < toRead)
+ inBuffer.realloc( read );
+ while( xInStream->available() > 0 )
+ { // We didn't get the full size from available().
+ uno::Sequence< sal_Int8 > buf( xInStream->available());
+ read = xInStream->readBytes( buf, xInStream->available());
+ sal_Int64 oldSize = inBuffer.getLength();
+ inBuffer.realloc( oldSize + read );
+ std::copy( buf.begin(), buf.end(), inBuffer.begin() + oldSize );
+ }
+ ZipUtils::ThreadedDeflater deflater( DEFAULT_COMPRESSION );
+ totalIn = inBuffer.getLength();
+ deflater.startDeflate( inBuffer );
+ processInput( inBuffer );
+ deflater.waitForTasks();
+ uno::Sequence< sal_Int8 > outBuffer = deflater.getOutput();
+ deflater.clear(); // release memory
+ totalOut = outBuffer.getLength();
+ processDeflated(outBuffer, outBuffer.getLength());
+ closeEntry();
+}
+
+void ZipOutputEntryParallel::finishDeflater()
+{
+ // ThreadedDeflater is called synchronously in one call, so nothing to do here.
+}
+
+sal_Int64 ZipOutputEntryParallel::getDeflaterTotalIn() const
+{
+ return totalIn;
+}
+
+sal_Int64 ZipOutputEntryParallel::getDeflaterTotalOut() const
+{
+ return totalOut;
+}
+
+void ZipOutputEntryParallel::deflaterReset()
+{
+ totalIn = 0;
+ totalOut = 0;
+}
+
+bool ZipOutputEntryParallel::isDeflaterFinished() const
+{
+ return true;
+}
+
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx
index e795776ab065..4692372c8fe5 100644
--- a/package/source/zippackage/ZipPackageStream.cxx
+++ b/package/source/zippackage/ZipPackageStream.cxx
@@ -35,6 +35,7 @@
#include <string.h>
#include <CRC32.hxx>
+#include <ThreadedDeflater.hxx>
#include <ZipOutputEntry.hxx>
#include <ZipOutputStream.hxx>
#include <ZipPackage.hxx>
@@ -496,7 +497,7 @@ bool ZipPackageStream::saveChild(
else if ( m_nStreamMode == PACKAGE_STREAM_RAW )
m_bRawStream = true;
- bool bParallelDeflate = false;
+ bool bBackgroundThreadDeflate = false;
bool bTransportOwnEncrStreamAsRaw = false;
// During the storing the original size of the stream can be changed
// TODO/LATER: get rid of this hack
@@ -758,17 +759,25 @@ bool ZipPackageStream::saveChild(
}
else
{
- // tdf#89236 Encrypting in parallel does not work
- bParallelDeflate = !bToBeEncrypted;
- // Do not deflate small streams in a thread. XSeekable's getLength()
+ // tdf#89236 Encrypting in a background thread does not work
+ bBackgroundThreadDeflate = !bToBeEncrypted;
+ // Do not deflate small streams using threads. XSeekable's getLength()
// gives the full size, XInputStream's available() may not be
// the full size, but it appears that at this point it usually is.
- if (xSeek.is() && xSeek->getLength() < 100000)
- bParallelDeflate = false;
- else if (xStream->available() < 100000)
- bParallelDeflate = false;
+ sal_Int64 estimatedSize = xSeek.is() ? xSeek->getLength() : xStream->available();
- if (bParallelDeflate)
+ if (estimatedSize > 1000000)
+ {
+ // Use ThreadDeflater which will split the stream into blocks and compress
+ // them in threads, but not in background (i.e. writeStream() will block).
+ // This is suitable for large data.
+ bBackgroundThreadDeflate = false;
+ rZipOut.writeLOC(pTempEntry, bToBeEncrypted);
+ ZipOutputEntryParallel aZipEntry(rZipOut.getStream(), m_xContext, *pTempEntry, this, bToBeEncrypted);
+ aZipEntry.writeStream(xStream);
+ rZipOut.rawCloseEntry(bToBeEncrypted);
+ }
+ else if (bBackgroundThreadDeflate && estimatedSize > 100000)
{
// tdf#93553 limit to a useful amount of pending tasks. Having way too many
// tasks pending may use a lot of memory. Take number of available
@@ -786,6 +795,7 @@ bool ZipPackageStream::saveChild(
}
else
{
+ bBackgroundThreadDeflate = false;
rZipOut.writeLOC(pTempEntry, bToBeEncrypted);
ZipOutputEntry aZipEntry(rZipOut.getStream(), m_xContext, *pTempEntry, this, bToBeEncrypted);
aZipEntry.writeStream(xStream);
@@ -823,7 +833,7 @@ bool ZipPackageStream::saveChild(
}
}
- if (bSuccess && !bParallelDeflate)
+ if (bSuccess && !bBackgroundThreadDeflate)
successfullyWritten(pTempEntry);
if ( aPropSet.hasElements()