summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--package/Library_package2.mk2
-rw-r--r--package/qa/cppunit/test_package.cxx116
-rw-r--r--package/source/zipapi/XBufferedThreadedStream.cxx200
-rw-r--r--package/source/zipapi/XBufferedThreadedStream.hxx79
-rw-r--r--package/source/zipapi/ZipFile.cxx10
5 files changed, 373 insertions, 34 deletions
diff --git a/package/Library_package2.mk b/package/Library_package2.mk
index 3096a976601f..0ff715e031c1 100644
--- a/package/Library_package2.mk
+++ b/package/Library_package2.mk
@@ -31,6 +31,7 @@ $(eval $(call gb_Library_use_libraries,package2,\
sal \
sax \
ucbhelper \
+ salhelper \
))
$(eval $(call gb_Library_use_externals,package2,\
@@ -51,6 +52,7 @@ $(eval $(call gb_Library_add_exception_objects,package2,\
package/source/zipapi/Deflater \
package/source/zipapi/Inflater \
package/source/zipapi/sha1context \
+ package/source/zipapi/XBufferedThreadedStream \
package/source/zipapi/XUnbufferedStream \
package/source/zipapi/ZipEnumeration \
package/source/zipapi/ZipFile \
diff --git a/package/qa/cppunit/test_package.cxx b/package/qa/cppunit/test_package.cxx
index 335f490ddaba..0e1f4778184d 100644
--- a/package/qa/cppunit/test_package.cxx
+++ b/package/qa/cppunit/test_package.cxx
@@ -27,19 +27,55 @@ namespace
public:
PackageTest() {}
+ virtual void setUp() override;
+
virtual bool load(const OUString &,
const OUString &rURL, const OUString &,
SfxFilterFlags, SotClipboardFormatId, unsigned int) override;
void test();
void testThreadedStreams();
+ void testBufferedThreadedStreams();
CPPUNIT_TEST_SUITE(PackageTest);
CPPUNIT_TEST(test);
CPPUNIT_TEST(testThreadedStreams);
+ CPPUNIT_TEST(testBufferedThreadedStreams);
CPPUNIT_TEST_SUITE_END();
+
+ private:
+ uno::Reference<container::XNameAccess> mxNA;
+ void verifyStreams( std::vector<std::vector<char>> &aBuffers );
};
+ void PackageTest::setUp()
+ {
+ BootstrapFixtureBase::setUp();
+ OUString aURL = m_directories.getURLFromSrc("/package/qa/cppunit/data/a2z.zip");
+
+ uno::Sequence<beans::NamedValue> aNVs(2);
+ aNVs[0].Name = "URL";
+ aNVs[0].Value <<= aURL;
+ aNVs[1].Name = "UseBufferedStream";
+ aNVs[1].Value <<= true;
+
+ uno::Sequence<uno::Any> aArgs(1);
+ aArgs[0] <<= aNVs;
+
+ uno::Reference<uno::XComponentContext> xCxt = comphelper::getProcessComponentContext();
+ uno::Reference<lang::XMultiComponentFactory> xSvcMgr = xCxt->getServiceManager();
+
+ uno::Reference<packages::zip::XZipFileAccess2> xZip(
+ xSvcMgr->createInstanceWithArgumentsAndContext(
+ "com.sun.star.packages.zip.ZipFileAccess", aArgs, xCxt),
+ uno::UNO_QUERY);
+
+ CPPUNIT_ASSERT(xZip.is());
+
+ mxNA = uno::Reference<container::XNameAccess>(xZip, uno::UNO_QUERY);
+ CPPUNIT_ASSERT(mxNA.is());
+ }
+
bool PackageTest::load(const OUString &,
const OUString &rURL, const OUString &,
SfxFilterFlags, SotClipboardFormatId, unsigned int)
@@ -62,6 +98,20 @@ namespace
m_directories.getURLFromSrc("/package/qa/cppunit/data/"));
}
+ void PackageTest::verifyStreams( std::vector<std::vector<char>> &aBuffers )
+ {
+ CPPUNIT_ASSERT_EQUAL(size_t(26), aBuffers.size());
+ auto itBuf = aBuffers.begin();
+
+ for (char c = 'a'; c <= 'z'; ++c, ++itBuf)
+ {
+ const std::vector<char>& rBuf = *itBuf;
+ CPPUNIT_ASSERT_EQUAL(size_t(1048576), rBuf.size()); // 1 MB each.
+ for (char check : rBuf)
+ CPPUNIT_ASSERT_EQUAL(c, check);
+ }
+ }
+
// TODO : This test currently doesn't fail even when you set
// UseBufferedStream to false. Look into this and replace it with a better
// test that actually fails when the aforementioned flag is set to false.
@@ -95,30 +145,6 @@ namespace
}
};
- OUString aURL = m_directories.getURLFromSrc("/package/qa/cppunit/data/a2z.zip");
-
- uno::Sequence<beans::NamedValue> aNVs(2);
- aNVs[0].Name = "URL";
- aNVs[0].Value <<= aURL;
- aNVs[1].Name = "UseBufferedStream";
- aNVs[1].Value <<= true;
-
- uno::Sequence<uno::Any> aArgs(1);
- aArgs[0] <<= aNVs;
-
- uno::Reference<uno::XComponentContext> xCxt = comphelper::getProcessComponentContext();
- uno::Reference<lang::XMultiComponentFactory> xSvcMgr = xCxt->getServiceManager();
-
- uno::Reference<packages::zip::XZipFileAccess2> xZip(
- xSvcMgr->createInstanceWithArgumentsAndContext(
- "com.sun.star.packages.zip.ZipFileAccess", aArgs, xCxt),
- uno::UNO_QUERY);
-
- CPPUNIT_ASSERT(xZip.is());
-
- uno::Reference<container::XNameAccess> xNA(xZip, uno::UNO_QUERY);
- CPPUNIT_ASSERT(xNA.is());
-
{
comphelper::ThreadPool aPool(4);
std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag();
@@ -132,26 +158,50 @@ namespace
aName += ".txt";
uno::Reference<io::XInputStream> xStrm;
- xNA->getByName(aName) >>= xStrm;
+ mxNA->getByName(aName) >>= xStrm;
CPPUNIT_ASSERT(xStrm.is());
aPool.pushTask(new Worker(pTag, xStrm, *itBuf));
}
aPool.waitUntilDone(pTag);
+ verifyStreams( aTestBuffers );
+ }
+ }
- // Verify the streams.
- CPPUNIT_ASSERT_EQUAL(size_t(26), aTestBuffers.size());
- itBuf = aTestBuffers.begin();
+ void PackageTest::testBufferedThreadedStreams()
+ {
+ std::vector<std::vector<char>> aTestBuffers(26);
+ auto itBuf = aTestBuffers.begin();
+ sal_Int32 nReadSize = 0;
- for (char c = 'a'; c <= 'z'; ++c, ++itBuf)
+ for (char c = 'a'; c <= 'z'; ++c, ++itBuf)
+ {
+ OUString aName(c);
+ aName += ".txt";
+
+ uno::Reference<io::XInputStream> xStrm;
+ //Size of each stream is 1mb (>10000) => XBufferedThreadedStream
+ mxNA->getByName(aName) >>= xStrm;
+
+ CPPUNIT_ASSERT(xStrm.is());
+ sal_Int32 nSize = xStrm->available();
+
+ uno::Sequence<sal_Int8> aBytes;
+ //Read chuncks of increasing size
+ nReadSize += 1024;
+
+ while (nSize > 0)
{
- const std::vector<char>& rBuf = *itBuf;
- CPPUNIT_ASSERT_EQUAL(size_t(1048576), rBuf.size()); // 1 MB each.
- for (char check : rBuf)
- CPPUNIT_ASSERT_EQUAL(c, check);
+ sal_Int32 nBytesRead = xStrm->readBytes(aBytes, nReadSize);
+ const sal_Int8* p = aBytes.getArray();
+ const sal_Int8* pEnd = p + nBytesRead;
+ std::copy(p, pEnd, std::back_inserter(*itBuf));
+ nSize -= nBytesRead;
}
}
+
+ verifyStreams( aTestBuffers );
}
CPPUNIT_TEST_SUITE_REGISTRATION(PackageTest);
diff --git a/package/source/zipapi/XBufferedThreadedStream.cxx b/package/source/zipapi/XBufferedThreadedStream.cxx
new file mode 100644
index 000000000000..59a89f9c64e1
--- /dev/null
+++ b/package/source/zipapi/XBufferedThreadedStream.cxx
@@ -0,0 +1,200 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include <XBufferedThreadedStream.hxx>
+#include <com/sun/star/packages/zip/ZipIOException.hpp>
+
+using namespace css::uno;
+using com::sun::star::packages::zip::ZipIOException;
+
+namespace {
+
+class UnzippingThread: public salhelper::Thread
+{
+ XBufferedThreadedStream &mxStream;
+public:
+ explicit UnzippingThread(XBufferedThreadedStream &xStream): Thread("Unzipping"), mxStream(xStream) {}
+private:
+ virtual void execute() override
+ {
+ try
+ {
+ mxStream.produce();
+ }
+ catch( const RuntimeException &e )
+ {
+ SAL_WARN("package", "RuntimeException from unbuffered Stream " << e.Message );
+ mxStream.saveException( new RuntimeException( e ) );
+ }
+ catch( const ZipIOException &e )
+ {
+ SAL_WARN("package", "ZipIOException from unbuffered Stream " << e.Message );
+ mxStream.saveException( new ZipIOException( e ) );
+ }
+ catch( const Exception &e )
+ {
+ SAL_WARN("package", "Unexpected exception " << e.Message );
+ mxStream.saveException( new Exception( e ) );
+ }
+
+ mxStream.setTerminateThread();
+ }
+};
+
+}
+
+XBufferedThreadedStream::XBufferedThreadedStream(
+ const Reference<XInputStream>& xSrcStream )
+: mxSrcStream( xSrcStream )
+, mnPos(0)
+, mnStreamSize( xSrcStream->available() )
+, mnOffset( 0 )
+, mxUnzippingThread( new UnzippingThread(*this) )
+, mbTerminateThread( false )
+, maSavedException( nullptr )
+{
+ mxUnzippingThread->launch();
+}
+
+XBufferedThreadedStream::~XBufferedThreadedStream()
+{
+ setTerminateThread();
+ mxUnzippingThread->join();
+}
+
+/**
+ * Reads from UnbufferedStream in a seperate thread and stores the buffer blocks
+ * in maPendingBuffers queue for further use.
+ */
+void XBufferedThreadedStream::produce()
+{
+ Buffer pProducedBuffer;
+ std::unique_lock<std::mutex> aGuard( maBufferProtector );
+ do
+ {
+ if( !maUsedBuffers.empty() )
+ {
+ pProducedBuffer = maUsedBuffers.front();
+ maUsedBuffers.pop();
+ }
+
+ aGuard.unlock();
+ mxSrcStream->readBytes( pProducedBuffer, nBufferSize );
+
+ aGuard.lock();
+ maPendingBuffers.push( pProducedBuffer );
+ maBufferConsumeResume.notify_one();
+ maBufferProduceResume.wait( aGuard, [&]{return canProduce(); } );
+
+ if( mbTerminateThread )
+ break;
+
+ } while( hasBytes() );
+}
+
+/**
+ * Fetches next available block from maPendingBuffers for use in Reading thread.
+ */
+const Buffer& XBufferedThreadedStream::getNextBlock()
+{
+ const sal_Int32 nBufSize = maInUseBuffer.getLength();
+ if( nBufSize <= 0 || mnOffset >= nBufSize )
+ {
+ std::unique_lock<std::mutex> aGuard( maBufferProtector );
+ if( mnOffset >= nBufSize )
+ maUsedBuffers.push( maInUseBuffer );
+
+ maBufferConsumeResume.wait( aGuard, [&]{return canConsume(); } );
+
+ if( maPendingBuffers.empty() )
+ {
+ maInUseBuffer = Buffer();
+ if( maSavedException )
+ throw *maSavedException;
+ }
+ else
+ {
+ maInUseBuffer = maPendingBuffers.front();
+ maPendingBuffers.pop();
+ mnOffset = 0;
+
+ if( maPendingBuffers.size() <= nBufferLowWater )
+ maBufferProduceResume.notify_one();
+ }
+ }
+
+ return maInUseBuffer;
+}
+
+void XBufferedThreadedStream::setTerminateThread()
+{
+ mbTerminateThread = true;
+ maBufferProduceResume.notify_one();
+ maBufferConsumeResume.notify_one();
+}
+
+sal_Int32 SAL_CALL XBufferedThreadedStream::readBytes( Sequence< sal_Int8 >& rData, sal_Int32 nBytesToRead )
+{
+ if( !hasBytes() )
+ return 0;
+
+ const sal_Int32 nAvailableSize = std::min<sal_Int32>( nBytesToRead, remainingSize() );
+ rData.realloc( nAvailableSize );
+ sal_Int32 i = 0, nPendingBytes = nAvailableSize;
+
+ while( nPendingBytes )
+ {
+ const Buffer &pBuffer = getNextBlock();
+ if( pBuffer.getLength() <= 0 )
+ {
+ rData.realloc( nAvailableSize - nPendingBytes );
+ return nAvailableSize - nPendingBytes;
+ }
+ const sal_Int32 limit = std::min<sal_Int32>( nPendingBytes, pBuffer.getLength() - mnOffset );
+
+ memcpy( &rData[i], &pBuffer[mnOffset], limit );
+
+ nPendingBytes -= limit;
+ mnOffset += limit;
+ mnPos += limit;
+ i += limit;
+ }
+
+ return nAvailableSize;
+}
+
+sal_Int32 SAL_CALL XBufferedThreadedStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead )
+{
+ return readBytes( aData, nMaxBytesToRead );
+}
+void SAL_CALL XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip )
+{
+ if( nBytesToSkip )
+ {
+ Sequence < sal_Int8 > aSequence( nBytesToSkip );
+ readBytes( aSequence, nBytesToSkip );
+ }
+}
+
+sal_Int32 SAL_CALL XBufferedThreadedStream::available()
+{
+ if( !hasBytes() )
+ return 0;
+
+ return remainingSize();
+}
+
+void SAL_CALL XBufferedThreadedStream::closeInput()
+{
+ setTerminateThread();
+ mxUnzippingThread->join();
+ mxSrcStream->closeInput();
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zipapi/XBufferedThreadedStream.hxx b/package/source/zipapi/XBufferedThreadedStream.hxx
new file mode 100644
index 000000000000..b047b25fdf85
--- /dev/null
+++ b/package/source/zipapi/XBufferedThreadedStream.hxx
@@ -0,0 +1,79 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX
+#define INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX
+
+#include <salhelper/thread.hxx>
+#include <XUnbufferedStream.hxx>
+#include <queue>
+#include <vector>
+#include <mutex>
+#include <condition_variable>
+
+typedef css::uno::Sequence< sal_Int8 > Buffer;
+
+class XBufferedThreadedStream : public cppu::WeakImplHelper< css::io::XInputStream >
+{
+private:
+ const css::uno::Reference<XInputStream> mxSrcStream;
+ size_t mnPos; /// position in stream
+ size_t mnStreamSize; /// available size of stream
+
+ Buffer maInUseBuffer; /// Buffer block in use
+ int mnOffset; /// position in maInUseBuffer
+ std::queue < Buffer > maPendingBuffers; /// Buffers that are available for use
+ std::queue < Buffer > maUsedBuffers;
+
+ rtl::Reference< salhelper::Thread > mxUnzippingThread;
+ std::mutex maBufferProtector; /// mutex protecting Buffer queues.
+ std::condition_variable maBufferConsumeResume;
+ std::condition_variable maBufferProduceResume;
+ bool mbTerminateThread; /// indicates the failure of one of the threads
+
+ css::uno::Exception *maSavedException; /// exception caught during unzipping is saved to be thrown during reading
+
+ static const size_t nBufferLowWater = 2;
+ static const size_t nBufferHighWater = 4;
+ static const size_t nBufferSize = 32 * 1024;
+
+ const Buffer& getNextBlock();
+ size_t remainingSize() const { return mnStreamSize - mnPos; }
+ bool hasBytes() const { return mnPos < mnStreamSize; }
+
+ bool canProduce() const
+ {
+ return( mbTerminateThread || maPendingBuffers.size() < nBufferHighWater );
+ }
+
+ bool canConsume() const
+ {
+ return( mbTerminateThread || !maPendingBuffers.empty() );
+ }
+
+public:
+ XBufferedThreadedStream(
+ const css::uno::Reference<XInputStream>& xSrcStream );
+
+ virtual ~XBufferedThreadedStream() override;
+
+ void produce();
+ void setTerminateThread();
+ void saveException( css::uno::Exception *e ) { maSavedException = e; }
+
+ // XInputStream
+ virtual sal_Int32 SAL_CALL readBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead ) override;
+ virtual sal_Int32 SAL_CALL readSomeBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead ) override;
+ virtual void SAL_CALL skipBytes( sal_Int32 nBytesToSkip ) override;
+ virtual sal_Int32 SAL_CALL available( ) override;
+ virtual void SAL_CALL closeInput( ) override;
+};
+#endif
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zipapi/ZipFile.cxx b/package/source/zipapi/ZipFile.cxx
index ba41d5f10d1d..ddea09b3d824 100644
--- a/package/source/zipapi/ZipFile.cxx
+++ b/package/source/zipapi/ZipFile.cxx
@@ -44,6 +44,7 @@
#include <ZipFile.hxx>
#include <ZipEnumeration.hxx>
#include <XUnbufferedStream.hxx>
+#include <XBufferedThreadedStream.hxx>
#include <PackageConstants.hxx>
#include <EncryptedDataHeader.hxx>
#include <EncryptionData.hxx>
@@ -625,7 +626,14 @@ uno::Reference< XInputStream > ZipFile::createStreamForZipEntry(
if (!mbUseBufferedStream)
return xSrcStream;
- uno::Reference<io::XInputStream> xBufStream(new XBufferedStream(xSrcStream));
+ uno::Reference<io::XInputStream> xBufStream;
+ static const sal_Int32 nThreadingThreshold = 10000;
+
+ if( xSrcStream->available() > nThreadingThreshold )
+ xBufStream = new XBufferedThreadedStream(xSrcStream);
+ else
+ xBufStream = new XBufferedStream(xSrcStream);
+
return xBufStream;
}