path: root/package/source
diff options
authorDennis Francis <>2020-01-11 11:51:34 +0530
committerDennis Francis <>2020-01-13 12:11:44 +0100
commit353d4528b8ad8abca9a13f3016632e42bab7afde (patch)
treea83950338ad79e77594638d9dd60bd073fdd2115 /package/source
parent9dce33e6943dec5ff111802ec3e7c338abf56592 (diff)
tdf#125662: do parallel-zip in batches
In this approach the input stream is read one batch (of constant size) at a time and each batch is compressed by ThreadedDeflater. After we are done with a batch, the deflated buffer is processed straightaway (directed to file backed storage). Change-Id: I2d42f86cf5898e4d746836d94bf6009a8d3b0230 Reviewed-on: Tested-by: Jenkins Reviewed-by: Luboš Luňák <>
Diffstat (limited to 'package/source')
2 files changed, 89 insertions, 61 deletions
diff --git a/package/source/zipapi/ThreadedDeflater.cxx b/package/source/zipapi/ThreadedDeflater.cxx
index 19bbda01bbb7..73725c580c02 100644
--- a/package/source/zipapi/ThreadedDeflater.cxx
+++ b/package/source/zipapi/ThreadedDeflater.cxx
@@ -44,14 +44,19 @@ class ThreadedDeflater::Task : public comphelper::ThreadTask
ThreadedDeflater* deflater;
int sequence;
int blockSize;
+ bool firstTask : 1;
+ bool lastTask : 1;
- Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_)
+ Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_, bool firstTask_,
+ bool lastTask_)
: comphelper::ThreadTask(deflater_->threadTaskTag)
, stream()
, deflater(deflater_)
, sequence(sequence_)
, blockSize(blockSize_)
+ , firstTask(firstTask_)
+ , lastTask(lastTask_)
@@ -61,58 +66,83 @@ private:
ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel)
: threadTaskTag(comphelper::ThreadPool::createThreadTaskTag())
+ , totalIn(0)
+ , totalOut(0)
, zlibLevel(nSetLevel)
- , pendingTasksCount(0)
-ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE
- waitForTasks();
- clear();
+ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE { clear(); }
-void ThreadedDeflater::startDeflate(const uno::Sequence<sal_Int8>& rBuffer)
+void ThreadedDeflater::deflateWrite(
+ const css::uno::Reference<css::io::XInputStream>& xInStream,
+ std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
+ std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc)
- inBuffer = rBuffer;
- sal_Int64 size = inBuffer.getLength();
- int tasksCount = (size + MaxBlockSize - 1) / MaxBlockSize;
- tasksCount = std::max(tasksCount, 1);
- pendingTasksCount = tasksCount;
- outBuffers.resize(pendingTasksCount);
- for (int sequence = 0; sequence < tasksCount; ++sequence)
+ sal_Int64 nThreadCount = comphelper::ThreadPool::getSharedOptimalPool().getWorkerCount();
+ sal_Int64 batchSize = MaxBlockSize * nThreadCount;
+ inBuffer.realloc(batchSize);
+ prevDataBlock.realloc(MaxBlockSize);
+ outBuffers.resize(nThreadCount);
+ maProcessOutputFunc = aProcessOutputFunc;
+ bool firstTask = true;
+ while (xInStream->available() > 0)
- sal_Int64 thisSize = std::min(MaxBlockSize, size);
- size -= thisSize;
- comphelper::ThreadPool::getSharedOptimalPool().pushTask(
- std::make_unique<Task>(this, sequence, thisSize));
+ sal_Int64 inputBytes = xInStream->readBytes(inBuffer, batchSize);
+ aProcessInputFunc(inBuffer, inputBytes);
+ totalIn += inputBytes;
+ int sequence = 0;
+ bool lastBatch = xInStream->available() <= 0;
+ sal_Int64 bytesPending = inputBytes;
+ while (bytesPending > 0)
+ {
+ sal_Int64 taskSize = std::min(MaxBlockSize, bytesPending);
+ bytesPending -= taskSize;
+ bool lastTask = lastBatch && !bytesPending;
+ comphelper::ThreadPool::getSharedOptimalPool().pushTask(
+ std::make_unique<Task>(this, sequence++, taskSize, firstTask, lastTask));
+ if (firstTask)
+ firstTask = false;
+ }
+ assert(bytesPending == 0);
+ comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+ if (!lastBatch)
+ {
+ assert(inputBytes == batchSize);
+ std::copy_n(inBuffer.begin() + (batchSize - MaxBlockSize), MaxBlockSize,
+ prevDataBlock.begin());
+ }
+ processDeflatedBuffers();
- assert(size == 0);
-bool ThreadedDeflater::finished() const { return pendingTasksCount == 0; }
-css::uno::Sequence<sal_Int8> ThreadedDeflater::getOutput() const
+void ThreadedDeflater::processDeflatedBuffers()
- assert(finished());
- sal_Int64 totalSize = 0;
+ sal_Int64 batchOutputSize = 0;
for (const auto& buffer : outBuffers)
- totalSize += buffer.size();
- uno::Sequence<sal_Int8> outBuffer(totalSize);
+ batchOutputSize += buffer.size();
+ css::uno::Sequence<sal_Int8> outBuffer(batchOutputSize);
auto pos = outBuffer.begin();
- for (const auto& buffer : outBuffers)
+ for (auto& buffer : outBuffers)
+ {
pos = std::copy(buffer.begin(), buffer.end(), pos);
- return outBuffer;
+ buffer.clear();
+ }
-void ThreadedDeflater::waitForTasks()
- comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+ maProcessOutputFunc(outBuffer, batchOutputSize);
+ totalOut += batchOutputSize;
void ThreadedDeflater::clear()
- assert(finished());
inBuffer = uno::Sequence<sal_Int8>();
@@ -147,27 +177,35 @@ void ThreadedDeflater::Task::doWork()
// zlib doesn't handle const properly
unsigned char* inBufferPtr = reinterpret_cast<unsigned char*>(
const_cast<signed char*>(deflater->inBuffer.getConstArray()));
- if (sequence != 0)
+ if (!firstTask)
// the window size is 32k, so set last 32k of previous data as the dictionary
assert(MAX_WBITS == 15);
assert(MaxBlockSize >= 32768);
- deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+ if (sequence > 0)
+ {
+ deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+ }
+ else
+ {
+ unsigned char* prevBufferPtr = reinterpret_cast<unsigned char*>(
+ const_cast<signed char*>(deflater->prevDataBlock.getConstArray()));
+ deflateSetDictionary(&stream, prevBufferPtr + MaxBlockSize - 32768, 32768);
+ }
stream.next_in = inBufferPtr + myInBufferStart;
stream.avail_in = blockSize;
stream.next_out = reinterpret_cast<unsigned char*>(deflater->outBuffers[sequence].data());
stream.avail_out = outputMaxSize;
- bool last = sequence == int(deflater->outBuffers.size() - 1); // Last block?
// The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary,
// and since we use a raw stream, the data blocks then can be simply concatenated.
- int res = deflate(&stream, last ? Z_FINISH : Z_SYNC_FLUSH);
+ int res = deflate(&stream, lastTask ? Z_FINISH : Z_SYNC_FLUSH);
assert(stream.avail_in == 0); // Check that everything has been deflated.
- if (last ? res == Z_STREAM_END : res == Z_OK)
+ if (lastTask ? res == Z_STREAM_END : res == Z_OK)
{ // ok
sal_Int64 outSize = outputMaxSize - stream.avail_out;
- --deflater->pendingTasksCount;
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index bee9d0aeb70c..f08e687c43a4 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -363,28 +363,18 @@ ZipOutputEntryParallel::ZipOutputEntryParallel(
void ZipOutputEntryParallel::writeStream(const uno::Reference< io::XInputStream >& xInStream)
- sal_Int64 toRead = xInStream->available();
- uno::Sequence< sal_Int8 > inBuffer( toRead );
- sal_Int64 read = xInStream->readBytes(inBuffer, toRead);
- if (read < toRead)
- inBuffer.realloc( read );
- while( xInStream->available() > 0 )
- { // We didn't get the full size from available().
- uno::Sequence< sal_Int8 > buf( xInStream->available());
- read = xInStream->readBytes( buf, xInStream->available());
- sal_Int64 oldSize = inBuffer.getLength();
- inBuffer.realloc( oldSize + read );
- std::copy( buf.begin(), buf.end(), inBuffer.begin() + oldSize );
- }
ZipUtils::ThreadedDeflater deflater( DEFAULT_COMPRESSION );
- totalIn = inBuffer.getLength();
- deflater.startDeflate( inBuffer );
- processInput( inBuffer );
- deflater.waitForTasks();
- uno::Sequence< sal_Int8 > outBuffer = deflater.getOutput();
- deflater.clear(); // release memory
- totalOut = outBuffer.getLength();
- processDeflated(outBuffer, outBuffer.getLength());
+ deflater.deflateWrite(xInStream,
+ [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
+ if (!m_bEncryptCurrentEntry)
+ m_aCRC.updateSegment(rBuffer, nLen);
+ },
+ [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
+ processDeflated(rBuffer, nLen);
+ }
+ );
+ totalIn = deflater.getTotalIn();
+ totalOut = deflater.getTotalOut();