author     Luboš Luňák <l.lunak@collabora.com>    2019-05-25 12:28:27 +0200
committer  Luboš Luňák <l.lunak@collabora.com>    2019-05-28 12:28:01 +0200
commit     aa44e10942937452930255be156c1b29247ee969 (patch)
tree       db5258343c82c434d1026870285ae9f21727f915 /package/source
parent     7cd3f267cfbf3655f6a7a395b80560ecd22e15f7 (diff)
parallel deflate compression (one stream, multiple threads)
ZipPackageStream::saveChild() already had threaded compression, but that
still uses only one thread per stream. Many documents contain many streams
(where this is useful), but large documents often contain one huge
content.xml, which would then be compressed using just one thread. It is,
however, possible to do deflate in parallel on the same data, at the cost
of somewhat increased CPU usage (spread over threads).

This is handled separately from the background-thread path, as integrating
the two approaches would probably be needlessly complex (and since both
internally use ThreadPool, the tasks should often intermix and parallelize
anyway).

On my 4-core (8 HT threads) machine this reduces the compression time for
tdf#113042 from 3s to 1s.

Change-Id: Ifbc889a27966f97eb1ce2ce01c5fb0b151a1bdf8
Reviewed-on: https://gerrit.libreoffice.org/73032
Tested-by: Jenkins
Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
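The core of the change is a pigz-style block deflate. The following is a minimal,
single-threaded sketch of that technique, not code from this patch: the function
name deflateBlocks, the block-size constant and the buffers are illustrative, and
in the actual patch each block is compressed by a separate comphelper::ThreadPool
task (see ThreadedDeflater.cxx below).

#include <zlib.h>
#include <algorithm>
#include <cstdlib>
#include <vector>

// Illustrative block size; the patch uses 128 KiB per task.
const size_t kBlockSize = 128 * 1024;

std::vector<unsigned char> deflateBlocks(const unsigned char* data, size_t size, int level)
{
    std::vector<unsigned char> out;
    size_t blocks = std::max<size_t>((size + kBlockSize - 1) / kBlockSize, 1);
    for (size_t i = 0; i < blocks; ++i)
    {
        z_stream strm{};
        // -MAX_WBITS: raw deflate stream, 32k window, no zlib header/trailer; 8 = default mem level.
        if (deflateInit2(&strm, level, Z_DEFLATED, -MAX_WBITS, 8, Z_DEFAULT_STRATEGY) != Z_OK)
            abort();
        size_t offset = i * kBlockSize;
        size_t thisSize = std::min(kBlockSize, size - offset);
        if (i != 0) // prime the window with the last 32k of the preceding block
            deflateSetDictionary(&strm, data + offset - 32768, 32768);
        std::vector<unsigned char> buf(deflateBound(&strm, thisSize) + 20); // + room for the flush
        strm.next_in = const_cast<unsigned char*>(data + offset);
        strm.avail_in = thisSize;
        strm.next_out = buf.data();
        strm.avail_out = buf.size();
        bool last = (i == blocks - 1);
        // Z_SYNC_FLUSH byte-aligns each block, so the raw outputs can simply be concatenated.
        int res = deflate(&strm, last ? Z_FINISH : Z_SYNC_FLUSH);
        if (!(last ? res == Z_STREAM_END : res == Z_OK) || strm.avail_in != 0)
            abort();
        out.insert(out.end(), buf.data(), buf.data() + (buf.size() - strm.avail_out));
        deflateEnd(&strm);
    }
    return out;
}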
Diffstat (limited to 'package/source')
-rw-r--r--  package/source/zipapi/ThreadedDeflater.cxx      181
-rw-r--r--  package/source/zipapi/ZipOutputEntry.cxx        195
-rw-r--r--  package/source/zippackage/ZipPackageStream.cxx   30
3 files changed, 355 insertions, 51 deletions
diff --git a/package/source/zipapi/ThreadedDeflater.cxx b/package/source/zipapi/ThreadedDeflater.cxx
new file mode 100644
index 000000000000..b136981b3bdb
--- /dev/null
+++ b/package/source/zipapi/ThreadedDeflater.cxx
@@ -0,0 +1,181 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+
+#include <ThreadedDeflater.hxx>
+#include <zlib.h>
+#include <com/sun/star/packages/zip/ZipConstants.hpp>
+#include <sal/log.hxx>
+
+using namespace com::sun::star::packages::zip::ZipConstants;
+using namespace com::sun::star;
+
+namespace ZipUtils
+{
+const sal_Int64 MaxBlockSize = 128 * 1024;
+
+// Parallel ZLIB compression using threads. The class internally splits the data into
+// blocks and spawns ThreadPool tasks to process them independently. This is achieved
+// in a similar way to how pigz works; see the comments from Mark Adler at
+// https://stackoverflow.com/questions/30294766/how-to-use-multiple-threads-for-zlib-compression
+// and
+// https://stackoverflow.com/questions/30794053/how-to-use-multiple-threads-for-zlib-compression-same-input-source
+
+// Everything here should be either read-only, or writing to distinct data, or atomic.
+
+class ThreadedDeflater::Task : public comphelper::ThreadTask
+{
+ z_stream stream;
+ ThreadedDeflater* deflater;
+ int sequence;
+ int blockSize;
+
+public:
+ Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_)
+ : comphelper::ThreadTask(deflater_->threadTaskTag)
+ , deflater(deflater_)
+ , sequence(sequence_)
+ , blockSize(blockSize_)
+ {
+ }
+
+private:
+ virtual void doWork() override;
+};
+
+ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel)
+ : zlibLevel(nSetLevel)
+ , threadTaskTag(comphelper::ThreadPool::createThreadTaskTag())
+ , pendingTasksCount(0)
+{
+}
+
+ThreadedDeflater::~ThreadedDeflater()
+{
+ waitForTasks();
+ clear();
+}
+
+void ThreadedDeflater::startDeflate(const uno::Sequence<sal_Int8>& rBuffer)
+{
+ inBuffer = rBuffer;
+ sal_Int64 size = inBuffer.getLength();
+ int tasksCount = (size + MaxBlockSize - 1) / MaxBlockSize;
+ tasksCount = std::max(tasksCount, 1);
+ pendingTasksCount = tasksCount;
+ outBuffers.resize(pendingTasksCount);
+ for (int sequence = 0; sequence < tasksCount; ++sequence)
+ {
+ sal_Int64 thisSize = std::min(MaxBlockSize, size);
+ size -= thisSize;
+ comphelper::ThreadPool::getSharedOptimalPool().pushTask(
+ std::make_unique<Task>(this, sequence, thisSize));
+ }
+ assert(size == 0);
+}
+
+bool ThreadedDeflater::finished() const { return pendingTasksCount == 0; }
+
+css::uno::Sequence<sal_Int8> ThreadedDeflater::getOutput() const
+{
+ assert(finished());
+ sal_Int64 totalSize = 0;
+ for (const auto& buffer : outBuffers)
+ totalSize += buffer.size();
+ uno::Sequence<sal_Int8> outBuffer(totalSize);
+ auto pos = outBuffer.begin();
+ for (const auto& buffer : outBuffers)
+ pos = std::copy(buffer.begin(), buffer.end(), pos);
+ return outBuffer;
+}
+
+void ThreadedDeflater::waitForTasks()
+{
+ comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+}
+
+void ThreadedDeflater::clear()
+{
+ assert(finished());
+ inBuffer = uno::Sequence<sal_Int8>();
+ outBuffers.clear();
+}
+
+#if defined Z_PREFIX
+#define deflateInit2 z_deflateInit2
+#define deflateBound z_deflateBound
+#define deflateSetDictionary z_deflateSetDictionary
+#define deflate z_deflate
+#define deflateEnd z_deflateEnd
+#endif
+
+void ThreadedDeflater::Task::doWork()
+{
+ stream.zalloc = nullptr;
+ stream.zfree = nullptr;
+ stream.opaque = nullptr;
+ // -MAX_WBITS means 32k window size and raw stream
+ if (deflateInit2(&stream, deflater->zlibLevel, Z_DEFLATED, -MAX_WBITS, DEF_MEM_LEVEL,
+ Z_DEFAULT_STRATEGY)
+ != Z_OK)
+ {
+ SAL_WARN("package.threadeddeflate", "deflateInit2() failed");
+ abort();
+ }
+ // Compute an output buffer size large enough that deflate() needs to be called just once.
+ sal_Int64 outputMaxSize = deflateBound(&stream, blockSize);
+ // add extra size for Z_SYNC_FLUSH
+ outputMaxSize += 20;
+ deflater->outBuffers[sequence].resize(outputMaxSize);
+ sal_Int64 myInBufferStart = sequence * MaxBlockSize;
+ // zlib doesn't handle const properly
+ unsigned char* inBufferPtr = reinterpret_cast<unsigned char*>(
+ const_cast<signed char*>(deflater->inBuffer.getConstArray()));
+ if (sequence != 0)
+ {
+ // the window size is 32k, so set last 32k of previous data as the dictionary
+ assert(MAX_WBITS == 15);
+ assert(MaxBlockSize >= 32768);
+ deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+ }
+ stream.next_in = inBufferPtr + myInBufferStart;
+ stream.avail_in = blockSize;
+ stream.next_out = reinterpret_cast<unsigned char*>(deflater->outBuffers[sequence].data());
+ stream.avail_out = outputMaxSize;
+ bool last = sequence == int(deflater->outBuffers.size() - 1); // Last block?
+ // The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary,
+ // and since we use a raw stream, the data blocks then can be simply concatenated.
+ int res = deflate(&stream, last ? Z_FINISH : Z_SYNC_FLUSH);
+ assert(stream.avail_in == 0); // Check that everything has been deflated.
+ if (last ? res == Z_STREAM_END : res == Z_OK)
+ { // ok
+ sal_Int64 outSize = outputMaxSize - stream.avail_out;
+ deflater->outBuffers[sequence].resize(outSize);
+ --deflater->pendingTasksCount;
+ }
+ else
+ {
+ SAL_WARN("package.threadeddeflate", "deflate() failed");
+ abort();
+ }
+ deflateEnd(&stream);
+}
+
+} // namespace
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
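For reference, a hedged sketch (not part of the patch) of how a caller is expected
to drive the new class; it mirrors how ZipOutputEntryParallel::writeStream() in the
next file of this diff uses it. compressBuffer() and its parameter are illustrative
names only.

#include <ThreadedDeflater.hxx>
#include <com/sun/star/packages/zip/ZipConstants.hpp>

using namespace com::sun::star;
using namespace com::sun::star::packages::zip::ZipConstants;

uno::Sequence<sal_Int8> compressBuffer(const uno::Sequence<sal_Int8>& inBuffer)
{
    ZipUtils::ThreadedDeflater deflater(DEFAULT_COMPRESSION);
    deflater.startDeflate(inBuffer);  // spawns one ThreadPool task per 128 KiB block
    deflater.waitForTasks();          // blocks until every block has been compressed
    uno::Sequence<sal_Int8> outBuffer = deflater.getOutput(); // concatenated raw deflate data
    deflater.clear();                 // release the input and per-block output buffers
    return outBuffer;
}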
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index 74281fd063dd..bee9d0aeb70c 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -27,6 +27,7 @@
#include <osl/diagnose.h>
#include <PackageConstants.hxx>
+#include <ThreadedDeflater.hxx>
#include <ZipEntry.hxx>
#include <ZipFile.hxx>
#include <ZipPackageBuffer.hxx>
@@ -41,16 +42,14 @@ using namespace com::sun::star::packages::zip::ZipConstants;
/** This class is used to deflate Zip entries
*/
-ZipOutputEntry::ZipOutputEntry(
+ZipOutputEntryBase::ZipOutputEntryBase(
const css::uno::Reference< css::io::XOutputStream >& rxOutput,
const uno::Reference< uno::XComponentContext >& rxContext,
ZipEntry& rEntry,
ZipPackageStream* pStream,
bool bEncrypt,
bool checkStream)
-: m_aDeflateBuffer(n_ConstBufferSize)
-, m_aDeflater(DEFAULT_COMPRESSION, true)
-, m_xContext(rxContext)
+: m_xContext(rxContext)
, m_xOutStream(rxOutput)
, m_pCurrentEntry(&rEntry)
, m_nDigested(0)
@@ -67,33 +66,21 @@ ZipOutputEntry::ZipOutputEntry(
}
}
-ZipOutputEntry::ZipOutputEntry(
- const css::uno::Reference< css::io::XOutputStream >& rxOutput,
- const uno::Reference< uno::XComponentContext >& rxContext,
- ZipEntry& rEntry,
- ZipPackageStream* pStream,
- bool bEncrypt)
-: ZipOutputEntry( rxOutput, rxContext, rEntry, pStream, bEncrypt, true)
-{
-}
-
-void ZipOutputEntry::closeEntry()
+void ZipOutputEntryBase::closeEntry()
{
- m_aDeflater.finish();
- while (!m_aDeflater.finished())
- doDeflate();
+ finishDeflater();
if ((m_pCurrentEntry->nFlag & 8) == 0)
{
- if (m_pCurrentEntry->nSize != m_aDeflater.getTotalIn())
+ if (m_pCurrentEntry->nSize != getDeflaterTotalIn())
{
OSL_FAIL("Invalid entry size");
}
- if (m_pCurrentEntry->nCompressedSize != m_aDeflater.getTotalOut())
+ if (m_pCurrentEntry->nCompressedSize != getDeflaterTotalOut())
{
// Different compression strategies make the merit of this
// test somewhat dubious
- m_pCurrentEntry->nCompressedSize = m_aDeflater.getTotalOut();
+ m_pCurrentEntry->nCompressedSize = getDeflaterTotalOut();
}
if (m_pCurrentEntry->nCrc != m_aCRC.getValue())
{
@@ -104,12 +91,12 @@ void ZipOutputEntry::closeEntry()
{
if ( !m_bEncryptCurrentEntry )
{
- m_pCurrentEntry->nSize = m_aDeflater.getTotalIn();
- m_pCurrentEntry->nCompressedSize = m_aDeflater.getTotalOut();
+ m_pCurrentEntry->nSize = getDeflaterTotalIn();
+ m_pCurrentEntry->nCompressedSize = getDeflaterTotalOut();
}
m_pCurrentEntry->nCrc = m_aCRC.getValue();
}
- m_aDeflater.reset();
+ deflaterReset();
m_aCRC.reset();
if (m_bEncryptCurrentEntry)
@@ -128,25 +115,11 @@ void ZipOutputEntry::closeEntry()
}
}
-void ZipOutputEntry::write( const Sequence< sal_Int8 >& rBuffer )
+void ZipOutputEntryBase::processDeflated( const uno::Sequence< sal_Int8 >& deflateBuffer, sal_Int32 nLength )
{
- if (!m_aDeflater.finished())
- {
- m_aDeflater.setInputSegment(rBuffer);
- while (!m_aDeflater.needsInput())
- doDeflate();
- if (!m_bEncryptCurrentEntry)
- m_aCRC.updateSegment(rBuffer, rBuffer.getLength());
- }
-}
-
-void ZipOutputEntry::doDeflate()
-{
- sal_Int32 nLength = m_aDeflater.doDeflateSegment(m_aDeflateBuffer, m_aDeflateBuffer.getLength());
-
if ( nLength > 0 )
{
- uno::Sequence< sal_Int8 > aTmpBuffer( m_aDeflateBuffer.getConstArray(), nLength );
+ uno::Sequence< sal_Int8 > aTmpBuffer( deflateBuffer.getConstArray(), nLength );
if ( m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() )
{
// Need to update our digest before encryption...
@@ -175,7 +148,7 @@ void ZipOutputEntry::doDeflate()
}
}
- if ( m_aDeflater.finished() && m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() )
+ if ( isDeflaterFinished() && m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() )
{
// FIXME64: sequence not 64bit safe.
uno::Sequence< sal_Int8 > aEncryptionBuffer = m_xCipherContext->finalizeCipherContextAndDispose();
@@ -191,6 +164,80 @@ void ZipOutputEntry::doDeflate()
}
}
+void ZipOutputEntryBase::processInput( const uno::Sequence< sal_Int8 >& rBuffer )
+{
+ if (!m_bEncryptCurrentEntry)
+ m_aCRC.updateSegment(rBuffer, rBuffer.getLength());
+}
+
+ZipOutputEntry::ZipOutputEntry(
+ const css::uno::Reference< css::io::XOutputStream >& rxOutput,
+ const uno::Reference< uno::XComponentContext >& rxContext,
+ ZipEntry& rEntry,
+ ZipPackageStream* pStream,
+ bool bEncrypt,
+ bool checkStream)
+: ZipOutputEntryBase(rxOutput, rxContext, rEntry, pStream, bEncrypt, checkStream)
+, m_aDeflateBuffer(n_ConstBufferSize)
+, m_aDeflater(DEFAULT_COMPRESSION, true)
+{
+}
+
+ZipOutputEntry::ZipOutputEntry(
+ const css::uno::Reference< css::io::XOutputStream >& rxOutput,
+ const uno::Reference< uno::XComponentContext >& rxContext,
+ ZipEntry& rEntry,
+ ZipPackageStream* pStream,
+ bool bEncrypt)
+: ZipOutputEntry( rxOutput, rxContext, rEntry, pStream, bEncrypt, true)
+{
+}
+
+void ZipOutputEntry::write( const Sequence< sal_Int8 >& rBuffer )
+{
+ if (!m_aDeflater.finished())
+ {
+ m_aDeflater.setInputSegment(rBuffer);
+ while (!m_aDeflater.needsInput())
+ doDeflate();
+ processInput(rBuffer);
+ }
+}
+
+void ZipOutputEntry::doDeflate()
+{
+ sal_Int32 nLength = m_aDeflater.doDeflateSegment(m_aDeflateBuffer, m_aDeflateBuffer.getLength());
+ processDeflated( m_aDeflateBuffer, nLength );
+}
+
+void ZipOutputEntry::finishDeflater()
+{
+ m_aDeflater.finish();
+ while (!m_aDeflater.finished())
+ doDeflate();
+}
+
+sal_Int64 ZipOutputEntry::getDeflaterTotalIn() const
+{
+ return m_aDeflater.getTotalIn();
+}
+
+sal_Int64 ZipOutputEntry::getDeflaterTotalOut() const
+{
+ return m_aDeflater.getTotalOut();
+}
+
+void ZipOutputEntry::deflaterReset()
+{
+ m_aDeflater.reset();
+}
+
+bool ZipOutputEntry::isDeflaterFinished() const
+{
+ return m_aDeflater.finished();
+}
+
+
ZipOutputEntryInThread::ZipOutputEntryInThread(
const uno::Reference< uno::XComponentContext >& rxContext,
ZipEntry& rEntry,
@@ -301,4 +348,70 @@ void ZipOutputEntry::writeStream(const uno::Reference< io::XInputStream >& xInSt
closeEntry();
}
+
+ZipOutputEntryParallel::ZipOutputEntryParallel(
+ const css::uno::Reference< css::io::XOutputStream >& rxOutput,
+ const uno::Reference< uno::XComponentContext >& rxContext,
+ ZipEntry& rEntry,
+ ZipPackageStream* pStream,
+ bool bEncrypt)
+: ZipOutputEntryBase(rxOutput, rxContext, rEntry, pStream, bEncrypt, true)
+, totalIn(0)
+, totalOut(0)
+{
+}
+
+void ZipOutputEntryParallel::writeStream(const uno::Reference< io::XInputStream >& xInStream)
+{
+ sal_Int64 toRead = xInStream->available();
+ uno::Sequence< sal_Int8 > inBuffer( toRead );
+ sal_Int64 read = xInStream->readBytes(inBuffer, toRead);
+ if (read < toRead)
+ inBuffer.realloc( read );
+ while( xInStream->available() > 0 )
+ { // We didn't get the full size from available().
+ uno::Sequence< sal_Int8 > buf( xInStream->available());
+ read = xInStream->readBytes( buf, xInStream->available());
+ sal_Int64 oldSize = inBuffer.getLength();
+ inBuffer.realloc( oldSize + read );
+ std::copy( buf.begin(), buf.end(), inBuffer.begin() + oldSize );
+ }
+ ZipUtils::ThreadedDeflater deflater( DEFAULT_COMPRESSION );
+ totalIn = inBuffer.getLength();
+ deflater.startDeflate( inBuffer );
+ processInput( inBuffer );
+ deflater.waitForTasks();
+ uno::Sequence< sal_Int8 > outBuffer = deflater.getOutput();
+ deflater.clear(); // release memory
+ totalOut = outBuffer.getLength();
+ processDeflated(outBuffer, outBuffer.getLength());
+ closeEntry();
+}
+
+void ZipOutputEntryParallel::finishDeflater()
+{
+ // ThreadedDeflater is called synchronously in one call, so nothing to do here.
+}
+
+sal_Int64 ZipOutputEntryParallel::getDeflaterTotalIn() const
+{
+ return totalIn;
+}
+
+sal_Int64 ZipOutputEntryParallel::getDeflaterTotalOut() const
+{
+ return totalOut;
+}
+
+void ZipOutputEntryParallel::deflaterReset()
+{
+ totalIn = 0;
+ totalOut = 0;
+}
+
+bool ZipOutputEntryParallel::isDeflaterFinished() const
+{
+ return true;
+}
+
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx
index e795776ab065..4692372c8fe5 100644
--- a/package/source/zippackage/ZipPackageStream.cxx
+++ b/package/source/zippackage/ZipPackageStream.cxx
@@ -35,6 +35,7 @@
#include <string.h>
#include <CRC32.hxx>
+#include <ThreadedDeflater.hxx>
#include <ZipOutputEntry.hxx>
#include <ZipOutputStream.hxx>
#include <ZipPackage.hxx>
@@ -496,7 +497,7 @@ bool ZipPackageStream::saveChild(
else if ( m_nStreamMode == PACKAGE_STREAM_RAW )
m_bRawStream = true;
- bool bParallelDeflate = false;
+ bool bBackgroundThreadDeflate = false;
bool bTransportOwnEncrStreamAsRaw = false;
// During the storing the original size of the stream can be changed
// TODO/LATER: get rid of this hack
@@ -758,17 +759,25 @@ bool ZipPackageStream::saveChild(
}
else
{
- // tdf#89236 Encrypting in parallel does not work
- bParallelDeflate = !bToBeEncrypted;
- // Do not deflate small streams in a thread. XSeekable's getLength()
+ // tdf#89236 Encrypting in a background thread does not work
+ bBackgroundThreadDeflate = !bToBeEncrypted;
+ // Do not deflate small streams using threads. XSeekable's getLength()
// gives the full size, XInputStream's available() may not be
// the full size, but it appears that at this point it usually is.
- if (xSeek.is() && xSeek->getLength() < 100000)
- bParallelDeflate = false;
- else if (xStream->available() < 100000)
- bParallelDeflate = false;
+ sal_Int64 estimatedSize = xSeek.is() ? xSeek->getLength() : xStream->available();
- if (bParallelDeflate)
+ if (estimatedSize > 1000000)
+ {
+ // Use ThreadedDeflater, which will split the stream into blocks and compress
+ // them in threads, but not in background (i.e. writeStream() will block).
+ // This is suitable for large data.
+ bBackgroundThreadDeflate = false;
+ rZipOut.writeLOC(pTempEntry, bToBeEncrypted);
+ ZipOutputEntryParallel aZipEntry(rZipOut.getStream(), m_xContext, *pTempEntry, this, bToBeEncrypted);
+ aZipEntry.writeStream(xStream);
+ rZipOut.rawCloseEntry(bToBeEncrypted);
+ }
+ else if (bBackgroundThreadDeflate && estimatedSize > 100000)
{
// tdf#93553 limit to a useful amount of pending tasks. Having way too many
// tasks pending may use a lot of memory. Take number of available
@@ -786,6 +795,7 @@ bool ZipPackageStream::saveChild(
}
else
{
+ bBackgroundThreadDeflate = false;
rZipOut.writeLOC(pTempEntry, bToBeEncrypted);
ZipOutputEntry aZipEntry(rZipOut.getStream(), m_xContext, *pTempEntry, this, bToBeEncrypted);
aZipEntry.writeStream(xStream);
@@ -823,7 +833,7 @@ bool ZipPackageStream::saveChild(
}
}
- if (bSuccess && !bParallelDeflate)
+ if (bSuccess && !bBackgroundThreadDeflate)
successfullyWritten(pTempEntry);
if ( aPropSet.hasElements()