diff options
-rw-r--r-- | include/sal/log-areas.dox | 1 | ||||
-rw-r--r-- | package/Library_package2.mk | 1 | ||||
-rw-r--r-- | package/inc/ThreadedDeflater.hxx | 62 | ||||
-rw-r--r-- | package/inc/ZipOutputEntry.hxx | 68 | ||||
-rw-r--r-- | package/source/zipapi/ThreadedDeflater.cxx | 181 | ||||
-rw-r--r-- | package/source/zipapi/ZipOutputEntry.cxx | 195 | ||||
-rw-r--r-- | package/source/zippackage/ZipPackageStream.cxx | 30 |
7 files changed, 478 insertions, 60 deletions
diff --git a/include/sal/log-areas.dox b/include/sal/log-areas.dox index 1f007b0c2944..8d9a2c68ea92 100644 --- a/include/sal/log-areas.dox +++ b/include/sal/log-areas.dox @@ -334,6 +334,7 @@ certain functionality. @li @c package @li @c package.xstor +@li @c package.threadeddeflate @section sdext diff --git a/package/Library_package2.mk b/package/Library_package2.mk index 75a15f0e0d08..195c87f9ff4b 100644 --- a/package/Library_package2.mk +++ b/package/Library_package2.mk @@ -53,6 +53,7 @@ $(eval $(call gb_Library_add_exception_objects,package2,\ package/source/zipapi/Deflater \ package/source/zipapi/Inflater \ package/source/zipapi/sha1context \ + package/source/zipapi/ThreadedDeflater \ package/source/zipapi/XBufferedThreadedStream \ package/source/zipapi/XUnbufferedStream \ package/source/zipapi/ZipEnumeration \ diff --git a/package/inc/ThreadedDeflater.hxx b/package/inc/ThreadedDeflater.hxx new file mode 100644 index 000000000000..90801700a37e --- /dev/null +++ b/package/inc/ThreadedDeflater.hxx @@ -0,0 +1,62 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . 
+ */ +#ifndef INCLUDED_PACKAGE_THREADEDDEFLATER_HXX +#define INCLUDED_PACKAGE_THREADEDDEFLATER_HXX + +#include <com/sun/star/uno/Sequence.hxx> +#include <package/packagedllapi.hxx> +#include <comphelper/threadpool.hxx> +#include <atomic> +#include <memory> + +namespace ZipUtils +{ +/// Parallel compression of a stream using the libz deflate algorithm. +/// +/// Almost a replacement for the Deflater class. Call startDeflate() with the data, +/// check with finished() or waitForTasks() and retrieve result with getOutput(). +/// The class will internally split into multiple threads. +class ThreadedDeflater final +{ + class Task; + // Note: All this should be lock-less. Each task writes only to its part + // of the data, flags are atomic. + css::uno::Sequence<sal_Int8> inBuffer; + int zlibLevel; + std::shared_ptr<comphelper::ThreadTaskTag> threadTaskTag; + std::atomic<int> pendingTasksCount; + std::vector<std::vector<sal_Int8>> outBuffers; + +public: + // Unlike with Deflater class, bNoWrap is always true. 
+ ThreadedDeflater(sal_Int32 nSetLevel); + ~ThreadedDeflater(); + void startDeflate(const css::uno::Sequence<sal_Int8>& rBuffer); + void waitForTasks(); + bool finished() const; + css::uno::Sequence<sal_Int8> getOutput() const; + void clear(); +}; + +} // namespace + +#endif + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/package/inc/ZipOutputEntry.hxx b/package/inc/ZipOutputEntry.hxx index af6528f04ea1..7234d890f4c2 100644 --- a/package/inc/ZipOutputEntry.hxx +++ b/package/inc/ZipOutputEntry.hxx @@ -35,11 +35,9 @@ struct ZipEntry; class ZipPackageBuffer; class ZipPackageStream; -class ZipOutputEntry +class ZipOutputEntryBase { protected: - css::uno::Sequence< sal_Int8 > m_aDeflateBuffer; - ZipUtils::Deflater m_aDeflater; css::uno::Reference< css::uno::XComponentContext > m_xContext; css::uno::Reference< css::io::XOutputStream > m_xOutStream; @@ -53,10 +51,9 @@ protected: bool const m_bEncryptCurrentEntry; public: - ZipOutputEntry( - const css::uno::Reference< css::io::XOutputStream >& rxOutStream, - const css::uno::Reference< css::uno::XComponentContext >& rxContext, - ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt); + virtual ~ZipOutputEntryBase() = default; + + virtual void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream) = 0; ZipEntry* getZipEntry() { return m_pCurrentEntry; } ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; } @@ -64,7 +61,36 @@ public: void closeEntry(); - void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream); +protected: + ZipOutputEntryBase( + const css::uno::Reference< css::io::XOutputStream >& rxOutStream, + const css::uno::Reference< css::uno::XComponentContext >& rxContext, + ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt, bool checkStream); + + // Inherited classes call this with deflated data buffer. 
+ void processDeflated( const css::uno::Sequence< sal_Int8 >& deflateBuffer, sal_Int32 nLength ); + // Inherited classes call this with the input buffer. + void processInput( const css::uno::Sequence< sal_Int8 >& rBuffer ); + + virtual void finishDeflater() = 0; + virtual sal_Int64 getDeflaterTotalIn() const = 0; + virtual sal_Int64 getDeflaterTotalOut() const = 0; + virtual void deflaterReset() = 0; + virtual bool isDeflaterFinished() const = 0; +}; + +// Normal non-threaded case. +class ZipOutputEntry : public ZipOutputEntryBase +{ + css::uno::Sequence< sal_Int8 > m_aDeflateBuffer; + ZipUtils::Deflater m_aDeflater; + +public: + ZipOutputEntry( + const css::uno::Reference< css::io::XOutputStream >& rxOutStream, + const css::uno::Reference< css::uno::XComponentContext >& rxContext, + ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt); + void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream) override; void write(const css::uno::Sequence< sal_Int8 >& rBuffer); protected: @@ -72,10 +98,15 @@ protected: const css::uno::Reference< css::io::XOutputStream >& rxOutStream, const css::uno::Reference< css::uno::XComponentContext >& rxContext, ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt, bool checkStream); + virtual void finishDeflater() override; + virtual sal_Int64 getDeflaterTotalIn() const override; + virtual sal_Int64 getDeflaterTotalOut() const override; + virtual void deflaterReset() override; + virtual bool isDeflaterFinished() const override; void doDeflate(); }; -// Class that runs the compression in a thread. +// Class that runs the compression in a background thread. class ZipOutputEntryInThread : public ZipOutputEntry { class Task; @@ -103,6 +134,25 @@ private: void setFinished() { m_bFinished = true; } }; +// Class that synchronously runs the compression in multiple threads (using ThreadedDeflater). 
+class ZipOutputEntryParallel : public ZipOutputEntryBase +{ + sal_Int64 totalIn; + sal_Int64 totalOut; +public: + ZipOutputEntryParallel( + const css::uno::Reference< css::io::XOutputStream >& rxOutStream, + const css::uno::Reference< css::uno::XComponentContext >& rxContext, + ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt); + void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream) override; +protected: + virtual void finishDeflater() override; + virtual sal_Int64 getDeflaterTotalIn() const override; + virtual sal_Int64 getDeflaterTotalOut() const override; + virtual void deflaterReset() override; + virtual bool isDeflaterFinished() const override; +}; + #endif /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/package/source/zipapi/ThreadedDeflater.cxx b/package/source/zipapi/ThreadedDeflater.cxx new file mode 100644 index 000000000000..b136981b3bdb --- /dev/null +++ b/package/source/zipapi/ThreadedDeflater.cxx @@ -0,0 +1,181 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . 
+ */ + +#include <ThreadedDeflater.hxx> +#include <zlib.h> +#include <com/sun/star/packages/zip/ZipConstants.hpp> +#include <sal/log.hxx> + +using namespace com::sun::star::packages::zip::ZipConstants; +using namespace com::sun::star; + +namespace ZipUtils +{ +const sal_Int64 MaxBlockSize = 128 * 1024; + +// Parallel ZLIB compression using threads. The class internally splits the data into +// blocks and spawns ThreadPool tasks to process them independently. This is achieved +// in a similar way how pigz works, see comments from Mark Adler at +// https://stackoverflow.com/questions/30294766/how-to-use-multiple-threads-for-zlib-compression +// and +// https://stackoverflow.com/questions/30794053/how-to-use-multiple-threads-for-zlib-compression-same-input-source + +// Everything here should be either read-only, or writing to distinct data, or atomic. + +class ThreadedDeflater::Task : public comphelper::ThreadTask +{ + z_stream stream; + ThreadedDeflater* deflater; + int sequence; + int blockSize; + +public: + Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_) + : comphelper::ThreadTask(deflater_->threadTaskTag) + , deflater(deflater_) + , sequence(sequence_) + , blockSize(blockSize_) + { + } + +private: + virtual void doWork() override; +}; + +ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel) + : zlibLevel(nSetLevel) + , threadTaskTag(comphelper::ThreadPool::createThreadTaskTag()) + , pendingTasksCount(0) +{ +} + +ThreadedDeflater::~ThreadedDeflater() +{ + waitForTasks(); + clear(); +} + +void ThreadedDeflater::startDeflate(const uno::Sequence<sal_Int8>& rBuffer) +{ + inBuffer = rBuffer; + sal_Int64 size = inBuffer.getLength(); + int tasksCount = (size + MaxBlockSize - 1) / MaxBlockSize; + tasksCount = std::max(tasksCount, 1); + pendingTasksCount = tasksCount; + outBuffers.resize(pendingTasksCount); + for (int sequence = 0; sequence < tasksCount; ++sequence) + { + sal_Int64 thisSize = std::min(MaxBlockSize, size); + size -= thisSize; + 
comphelper::ThreadPool::getSharedOptimalPool().pushTask( + std::make_unique<Task>(this, sequence, thisSize)); + } + assert(size == 0); +} + +bool ThreadedDeflater::finished() const { return pendingTasksCount == 0; } + +css::uno::Sequence<sal_Int8> ThreadedDeflater::getOutput() const +{ + assert(finished()); + sal_Int64 totalSize = 0; + for (const auto& buffer : outBuffers) + totalSize += buffer.size(); + uno::Sequence<sal_Int8> outBuffer(totalSize); + auto pos = outBuffer.begin(); + for (const auto& buffer : outBuffers) + pos = std::copy(buffer.begin(), buffer.end(), pos); + return outBuffer; +} + +void ThreadedDeflater::waitForTasks() +{ + comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag); +} + +void ThreadedDeflater::clear() +{ + assert(finished()); + inBuffer = uno::Sequence<sal_Int8>(); + outBuffers.clear(); +} + +#if defined Z_PREFIX +#define deflateInit2 z_deflateInit2 +#define deflateBound z_deflateBound +#define deflateSetDictionary z_deflateSetDictionary +#define deflate z_deflate +#define deflateEnd z_deflateEnd +#endif + +void ThreadedDeflater::Task::doWork() +{ + stream.zalloc = nullptr; + stream.zfree = nullptr; + stream.opaque = nullptr; + // -MAX_WBITS means 32k window size and raw stream + if (deflateInit2(&stream, deflater->zlibLevel, Z_DEFLATED, -MAX_WBITS, DEF_MEM_LEVEL, + Z_DEFAULT_STRATEGY) + != Z_OK) + { + SAL_WARN("package.threadeddeflate", "deflateInit2() failed"); + abort(); + } + // Find out size for our output buffer to be large enough for deflate() needing to be called just once. 
+ sal_Int64 outputMaxSize = deflateBound(&stream, blockSize); + // add extra size for Z_SYNC_FLUSH + outputMaxSize += 20; + deflater->outBuffers[sequence].resize(outputMaxSize); + sal_Int64 myInBufferStart = sequence * MaxBlockSize; + // zlib doesn't handle const properly + unsigned char* inBufferPtr = reinterpret_cast<unsigned char*>( + const_cast<signed char*>(deflater->inBuffer.getConstArray())); + if (sequence != 0) + { + // the window size is 32k, so set last 32k of previous data as the dictionary + assert(MAX_WBITS == 15); + assert(MaxBlockSize >= 32768); + deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768); + } + stream.next_in = inBufferPtr + myInBufferStart; + stream.avail_in = blockSize; + stream.next_out = reinterpret_cast<unsigned char*>(deflater->outBuffers[sequence].data()); + stream.avail_out = outputMaxSize; + bool last = sequence == int(deflater->outBuffers.size() - 1); // Last block? + // The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary, + // and since we use a raw stream, the data blocks then can be simply concatenated. + int res = deflate(&stream, last ? Z_FINISH : Z_SYNC_FLUSH); + assert(stream.avail_in == 0); // Check that everything has been deflated. + if (last ? 
res == Z_STREAM_END : res == Z_OK) + { // ok + sal_Int64 outSize = outputMaxSize - stream.avail_out; + deflater->outBuffers[sequence].resize(outSize); + --deflater->pendingTasksCount; + } + else + { + SAL_WARN("package.threadeddeflate", "deflate() failed"); + abort(); + } + deflateEnd(&stream); +} + +} // namespace + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx index 74281fd063dd..bee9d0aeb70c 100644 --- a/package/source/zipapi/ZipOutputEntry.cxx +++ b/package/source/zipapi/ZipOutputEntry.cxx @@ -27,6 +27,7 @@ #include <osl/diagnose.h> #include <PackageConstants.hxx> +#include <ThreadedDeflater.hxx> #include <ZipEntry.hxx> #include <ZipFile.hxx> #include <ZipPackageBuffer.hxx> @@ -41,16 +42,14 @@ using namespace com::sun::star::packages::zip::ZipConstants; /** This class is used to deflate Zip entries */ -ZipOutputEntry::ZipOutputEntry( +ZipOutputEntryBase::ZipOutputEntryBase( const css::uno::Reference< css::io::XOutputStream >& rxOutput, const uno::Reference< uno::XComponentContext >& rxContext, ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt, bool checkStream) -: m_aDeflateBuffer(n_ConstBufferSize) -, m_aDeflater(DEFAULT_COMPRESSION, true) -, m_xContext(rxContext) +: m_xContext(rxContext) , m_xOutStream(rxOutput) , m_pCurrentEntry(&rEntry) , m_nDigested(0) @@ -67,33 +66,21 @@ ZipOutputEntry::ZipOutputEntry( } } -ZipOutputEntry::ZipOutputEntry( - const css::uno::Reference< css::io::XOutputStream >& rxOutput, - const uno::Reference< uno::XComponentContext >& rxContext, - ZipEntry& rEntry, - ZipPackageStream* pStream, - bool bEncrypt) -: ZipOutputEntry( rxOutput, rxContext, rEntry, pStream, bEncrypt, true) -{ -} - -void ZipOutputEntry::closeEntry() +void ZipOutputEntryBase::closeEntry() { - m_aDeflater.finish(); - while (!m_aDeflater.finished()) - doDeflate(); + finishDeflater(); if ((m_pCurrentEntry->nFlag & 8) == 0) { - if (m_pCurrentEntry->nSize != 
m_aDeflater.getTotalIn()) + if (m_pCurrentEntry->nSize != getDeflaterTotalIn()) { OSL_FAIL("Invalid entry size"); } - if (m_pCurrentEntry->nCompressedSize != m_aDeflater.getTotalOut()) + if (m_pCurrentEntry->nCompressedSize != getDeflaterTotalOut()) { // Different compression strategies make the merit of this // test somewhat dubious - m_pCurrentEntry->nCompressedSize = m_aDeflater.getTotalOut(); + m_pCurrentEntry->nCompressedSize = getDeflaterTotalOut(); } if (m_pCurrentEntry->nCrc != m_aCRC.getValue()) { @@ -104,12 +91,12 @@ void ZipOutputEntry::closeEntry() { if ( !m_bEncryptCurrentEntry ) { - m_pCurrentEntry->nSize = m_aDeflater.getTotalIn(); - m_pCurrentEntry->nCompressedSize = m_aDeflater.getTotalOut(); + m_pCurrentEntry->nSize = getDeflaterTotalIn(); + m_pCurrentEntry->nCompressedSize = getDeflaterTotalOut(); } m_pCurrentEntry->nCrc = m_aCRC.getValue(); } - m_aDeflater.reset(); + deflaterReset(); m_aCRC.reset(); if (m_bEncryptCurrentEntry) @@ -128,25 +115,11 @@ void ZipOutputEntry::closeEntry() } } -void ZipOutputEntry::write( const Sequence< sal_Int8 >& rBuffer ) +void ZipOutputEntryBase::processDeflated( const uno::Sequence< sal_Int8 >& deflateBuffer, sal_Int32 nLength ) { - if (!m_aDeflater.finished()) - { - m_aDeflater.setInputSegment(rBuffer); - while (!m_aDeflater.needsInput()) - doDeflate(); - if (!m_bEncryptCurrentEntry) - m_aCRC.updateSegment(rBuffer, rBuffer.getLength()); - } -} - -void ZipOutputEntry::doDeflate() -{ - sal_Int32 nLength = m_aDeflater.doDeflateSegment(m_aDeflateBuffer, m_aDeflateBuffer.getLength()); - if ( nLength > 0 ) { - uno::Sequence< sal_Int8 > aTmpBuffer( m_aDeflateBuffer.getConstArray(), nLength ); + uno::Sequence< sal_Int8 > aTmpBuffer( deflateBuffer.getConstArray(), nLength ); if ( m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() ) { // Need to update our digest before encryption... 
@@ -175,7 +148,7 @@ void ZipOutputEntry::doDeflate() } } - if ( m_aDeflater.finished() && m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() ) + if ( isDeflaterFinished() && m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() ) { // FIXME64: sequence not 64bit safe. uno::Sequence< sal_Int8 > aEncryptionBuffer = m_xCipherContext->finalizeCipherContextAndDispose(); @@ -191,6 +164,80 @@ void ZipOutputEntry::doDeflate() } } +void ZipOutputEntryBase::processInput( const uno::Sequence< sal_Int8 >& rBuffer ) +{ + if (!m_bEncryptCurrentEntry) + m_aCRC.updateSegment(rBuffer, rBuffer.getLength()); +} + +ZipOutputEntry::ZipOutputEntry( + const css::uno::Reference< css::io::XOutputStream >& rxOutput, + const uno::Reference< uno::XComponentContext >& rxContext, + ZipEntry& rEntry, + ZipPackageStream* pStream, + bool bEncrypt, + bool checkStream) +: ZipOutputEntryBase(rxOutput, rxContext, rEntry, pStream, bEncrypt, checkStream) +, m_aDeflateBuffer(n_ConstBufferSize) +, m_aDeflater(DEFAULT_COMPRESSION, true) +{ +} + +ZipOutputEntry::ZipOutputEntry( + const css::uno::Reference< css::io::XOutputStream >& rxOutput, + const uno::Reference< uno::XComponentContext >& rxContext, + ZipEntry& rEntry, + ZipPackageStream* pStream, + bool bEncrypt) +: ZipOutputEntry( rxOutput, rxContext, rEntry, pStream, bEncrypt, true) +{ +} + +void ZipOutputEntry::write( const Sequence< sal_Int8 >& rBuffer ) +{ + if (!m_aDeflater.finished()) + { + m_aDeflater.setInputSegment(rBuffer); + while (!m_aDeflater.needsInput()) + doDeflate(); + processInput(rBuffer); + } +} + +void ZipOutputEntry::doDeflate() +{ + sal_Int32 nLength = m_aDeflater.doDeflateSegment(m_aDeflateBuffer, m_aDeflateBuffer.getLength()); + processDeflated( m_aDeflateBuffer, nLength ); +} + +void ZipOutputEntry::finishDeflater() +{ + m_aDeflater.finish(); + while (!m_aDeflater.finished()) + doDeflate(); +} + +sal_Int64 ZipOutputEntry::getDeflaterTotalIn() const +{ + return m_aDeflater.getTotalIn(); 
+} + +sal_Int64 ZipOutputEntry::getDeflaterTotalOut() const +{ + return m_aDeflater.getTotalOut(); +} + +void ZipOutputEntry::deflaterReset() +{ + m_aDeflater.reset(); +} + +bool ZipOutputEntry::isDeflaterFinished() const +{ + return m_aDeflater.finished(); +} + + ZipOutputEntryInThread::ZipOutputEntryInThread( const uno::Reference< uno::XComponentContext >& rxContext, ZipEntry& rEntry, @@ -301,4 +348,70 @@ void ZipOutputEntry::writeStream(const uno::Reference< io::XInputStream >& xInSt closeEntry(); } + +ZipOutputEntryParallel::ZipOutputEntryParallel( + const css::uno::Reference< css::io::XOutputStream >& rxOutput, + const uno::Reference< uno::XComponentContext >& rxContext, + ZipEntry& rEntry, + ZipPackageStream* pStream, + bool bEncrypt) +: ZipOutputEntryBase(rxOutput, rxContext, rEntry, pStream, bEncrypt, true) +, totalIn(0) +, totalOut(0) +{ +} + +void ZipOutputEntryParallel::writeStream(const uno::Reference< io::XInputStream >& xInStream) +{ + sal_Int64 toRead = xInStream->available(); + uno::Sequence< sal_Int8 > inBuffer( toRead ); + sal_Int64 read = xInStream->readBytes(inBuffer, toRead); + if (read < toRead) + inBuffer.realloc( read ); + while( xInStream->available() > 0 ) + { // We didn't get the full size from available(). 
+ uno::Sequence< sal_Int8 > buf( xInStream->available()); + read = xInStream->readBytes( buf, xInStream->available()); + sal_Int64 oldSize = inBuffer.getLength(); + inBuffer.realloc( oldSize + read ); + std::copy( buf.begin(), buf.end(), inBuffer.begin() + oldSize ); + } + ZipUtils::ThreadedDeflater deflater( DEFAULT_COMPRESSION ); + totalIn = inBuffer.getLength(); + deflater.startDeflate( inBuffer ); + processInput( inBuffer ); + deflater.waitForTasks(); + uno::Sequence< sal_Int8 > outBuffer = deflater.getOutput(); + deflater.clear(); // release memory + totalOut = outBuffer.getLength(); + processDeflated(outBuffer, outBuffer.getLength()); + closeEntry(); +} + +void ZipOutputEntryParallel::finishDeflater() +{ + // ThreadedDeflater is called synchronously in one call, so nothing to do here. +} + +sal_Int64 ZipOutputEntryParallel::getDeflaterTotalIn() const +{ + return totalIn; +} + +sal_Int64 ZipOutputEntryParallel::getDeflaterTotalOut() const +{ + return totalOut; +} + +void ZipOutputEntryParallel::deflaterReset() +{ + totalIn = 0; + totalOut = 0; +} + +bool ZipOutputEntryParallel::isDeflaterFinished() const +{ + return true; +} + /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx index e795776ab065..4692372c8fe5 100644 --- a/package/source/zippackage/ZipPackageStream.cxx +++ b/package/source/zippackage/ZipPackageStream.cxx @@ -35,6 +35,7 @@ #include <string.h> #include <CRC32.hxx> +#include <ThreadedDeflater.hxx> #include <ZipOutputEntry.hxx> #include <ZipOutputStream.hxx> #include <ZipPackage.hxx> @@ -496,7 +497,7 @@ bool ZipPackageStream::saveChild( else if ( m_nStreamMode == PACKAGE_STREAM_RAW ) m_bRawStream = true; - bool bParallelDeflate = false; + bool bBackgroundThreadDeflate = false; bool bTransportOwnEncrStreamAsRaw = false; // During the storing the original size of the stream can be changed // TODO/LATER: get rid of this hack @@ -758,17 +759,25 
@@ bool ZipPackageStream::saveChild( } else { - // tdf#89236 Encrypting in parallel does not work - bParallelDeflate = !bToBeEncrypted; - // Do not deflate small streams in a thread. XSeekable's getLength() + // tdf#89236 Encrypting in a background thread does not work + bBackgroundThreadDeflate = !bToBeEncrypted; + // Do not deflate small streams using threads. XSeekable's getLength() // gives the full size, XInputStream's available() may not be // the full size, but it appears that at this point it usually is. - if (xSeek.is() && xSeek->getLength() < 100000) - bParallelDeflate = false; - else if (xStream->available() < 100000) - bParallelDeflate = false; + sal_Int64 estimatedSize = xSeek.is() ? xSeek->getLength() : xStream->available(); - if (bParallelDeflate) + if (estimatedSize > 1000000) + { + // Use ThreadedDeflater which will split the stream into blocks and compress + // them in threads, but not in background (i.e. writeStream() will block). + // This is suitable for large data. + bBackgroundThreadDeflate = false; + rZipOut.writeLOC(pTempEntry, bToBeEncrypted); + ZipOutputEntryParallel aZipEntry(rZipOut.getStream(), m_xContext, *pTempEntry, this, bToBeEncrypted); + aZipEntry.writeStream(xStream); + rZipOut.rawCloseEntry(bToBeEncrypted); + } + else if (bBackgroundThreadDeflate && estimatedSize > 100000) { // tdf#93553 limit to a useful amount of pending tasks. Having way too many // tasks pending may use a lot of memory. Take number of available @@ -786,6 +795,7 @@ bool ZipPackageStream::saveChild( } else { + bBackgroundThreadDeflate = false; rZipOut.writeLOC(pTempEntry, bToBeEncrypted); ZipOutputEntry aZipEntry(rZipOut.getStream(), m_xContext, *pTempEntry, this, bToBeEncrypted); aZipEntry.writeStream(xStream); @@ -823,7 +833,7 @@ bool ZipPackageStream::saveChild( } } - if (bSuccess && !bParallelDeflate) + if (bSuccess && !bBackgroundThreadDeflate) successfullyWritten(pTempEntry); if ( aPropSet.hasElements() |