From 0632208977a204195a4f5b9e727aed511ece075f Mon Sep 17 00:00:00 2001 From: Mohammed Abdul Azeem Date: Sat, 27 May 2017 13:17:04 +0530 Subject: First cut at moving unzipping into new thread: XBufferedThreadedStream class buffers data in a new thread, which will be available to be read from parent thread. Change-Id: I62d367fa1dec23da39aba24b5c765b57707956bb Reviewed-on: https://gerrit.libreoffice.org/38135 Tested-by: Jenkins Reviewed-by: Michael Meeks --- package/Library_package2.mk | 2 + package/qa/cppunit/test_package.cxx | 116 +++++++++---- package/source/zipapi/XBufferedThreadedStream.cxx | 200 ++++++++++++++++++++++ package/source/zipapi/XBufferedThreadedStream.hxx | 79 +++++++++ package/source/zipapi/ZipFile.cxx | 10 +- 5 files changed, 373 insertions(+), 34 deletions(-) create mode 100644 package/source/zipapi/XBufferedThreadedStream.cxx create mode 100644 package/source/zipapi/XBufferedThreadedStream.hxx diff --git a/package/Library_package2.mk b/package/Library_package2.mk index 3096a976601f..0ff715e031c1 100644 --- a/package/Library_package2.mk +++ b/package/Library_package2.mk @@ -31,6 +31,7 @@ $(eval $(call gb_Library_use_libraries,package2,\ sal \ sax \ ucbhelper \ + salhelper \ )) $(eval $(call gb_Library_use_externals,package2,\ @@ -51,6 +52,7 @@ $(eval $(call gb_Library_add_exception_objects,package2,\ package/source/zipapi/Deflater \ package/source/zipapi/Inflater \ package/source/zipapi/sha1context \ + package/source/zipapi/XBufferedThreadedStream \ package/source/zipapi/XUnbufferedStream \ package/source/zipapi/ZipEnumeration \ package/source/zipapi/ZipFile \ diff --git a/package/qa/cppunit/test_package.cxx b/package/qa/cppunit/test_package.cxx index 335f490ddaba..0e1f4778184d 100644 --- a/package/qa/cppunit/test_package.cxx +++ b/package/qa/cppunit/test_package.cxx @@ -27,19 +27,55 @@ namespace public: PackageTest() {} + virtual void setUp() override; + virtual bool load(const OUString &, const OUString &rURL, const OUString &, SfxFilterFlags, 
SotClipboardFormatId, unsigned int) override; void test(); void testThreadedStreams(); + void testBufferedThreadedStreams(); CPPUNIT_TEST_SUITE(PackageTest); CPPUNIT_TEST(test); CPPUNIT_TEST(testThreadedStreams); + CPPUNIT_TEST(testBufferedThreadedStreams); CPPUNIT_TEST_SUITE_END(); + + private: + uno::Reference mxNA; + void verifyStreams( std::vector> &aBuffers ); }; + void PackageTest::setUp() + { + BootstrapFixtureBase::setUp(); + OUString aURL = m_directories.getURLFromSrc("/package/qa/cppunit/data/a2z.zip"); + + uno::Sequence aNVs(2); + aNVs[0].Name = "URL"; + aNVs[0].Value <<= aURL; + aNVs[1].Name = "UseBufferedStream"; + aNVs[1].Value <<= true; + + uno::Sequence aArgs(1); + aArgs[0] <<= aNVs; + + uno::Reference xCxt = comphelper::getProcessComponentContext(); + uno::Reference xSvcMgr = xCxt->getServiceManager(); + + uno::Reference xZip( + xSvcMgr->createInstanceWithArgumentsAndContext( + "com.sun.star.packages.zip.ZipFileAccess", aArgs, xCxt), + uno::UNO_QUERY); + + CPPUNIT_ASSERT(xZip.is()); + + mxNA = uno::Reference(xZip, uno::UNO_QUERY); + CPPUNIT_ASSERT(mxNA.is()); + } + bool PackageTest::load(const OUString &, const OUString &rURL, const OUString &, SfxFilterFlags, SotClipboardFormatId, unsigned int) @@ -62,6 +98,20 @@ namespace m_directories.getURLFromSrc("/package/qa/cppunit/data/")); } + void PackageTest::verifyStreams( std::vector> &aBuffers ) + { + CPPUNIT_ASSERT_EQUAL(size_t(26), aBuffers.size()); + auto itBuf = aBuffers.begin(); + + for (char c = 'a'; c <= 'z'; ++c, ++itBuf) + { + const std::vector& rBuf = *itBuf; + CPPUNIT_ASSERT_EQUAL(size_t(1048576), rBuf.size()); // 1 MB each. + for (char check : rBuf) + CPPUNIT_ASSERT_EQUAL(c, check); + } + } + // TODO : This test currently doesn't fail even when you set // UseBufferedStream to false. Look into this and replace it with a better // test that actually fails when the aforementioned flag is set to false. 
@@ -95,30 +145,6 @@ namespace } }; - OUString aURL = m_directories.getURLFromSrc("/package/qa/cppunit/data/a2z.zip"); - - uno::Sequence aNVs(2); - aNVs[0].Name = "URL"; - aNVs[0].Value <<= aURL; - aNVs[1].Name = "UseBufferedStream"; - aNVs[1].Value <<= true; - - uno::Sequence aArgs(1); - aArgs[0] <<= aNVs; - - uno::Reference xCxt = comphelper::getProcessComponentContext(); - uno::Reference xSvcMgr = xCxt->getServiceManager(); - - uno::Reference xZip( - xSvcMgr->createInstanceWithArgumentsAndContext( - "com.sun.star.packages.zip.ZipFileAccess", aArgs, xCxt), - uno::UNO_QUERY); - - CPPUNIT_ASSERT(xZip.is()); - - uno::Reference xNA(xZip, uno::UNO_QUERY); - CPPUNIT_ASSERT(xNA.is()); - { comphelper::ThreadPool aPool(4); std::shared_ptr pTag = comphelper::ThreadPool::createThreadTaskTag(); @@ -132,26 +158,50 @@ namespace aName += ".txt"; uno::Reference xStrm; - xNA->getByName(aName) >>= xStrm; + mxNA->getByName(aName) >>= xStrm; CPPUNIT_ASSERT(xStrm.is()); aPool.pushTask(new Worker(pTag, xStrm, *itBuf)); } aPool.waitUntilDone(pTag); + verifyStreams( aTestBuffers ); + } + } - // Verify the streams. - CPPUNIT_ASSERT_EQUAL(size_t(26), aTestBuffers.size()); - itBuf = aTestBuffers.begin(); + void PackageTest::testBufferedThreadedStreams() + { + std::vector> aTestBuffers(26); + auto itBuf = aTestBuffers.begin(); + sal_Int32 nReadSize = 0; - for (char c = 'a'; c <= 'z'; ++c, ++itBuf) + for (char c = 'a'; c <= 'z'; ++c, ++itBuf) + { + OUString aName(c); + aName += ".txt"; + + uno::Reference xStrm; + //Size of each stream is 1mb (>10000) => XBufferedThreadedStream + mxNA->getByName(aName) >>= xStrm; + + CPPUNIT_ASSERT(xStrm.is()); + sal_Int32 nSize = xStrm->available(); + + uno::Sequence aBytes; + //Read chunks of increasing size + nReadSize += 1024; + + while (nSize > 0) { - const std::vector& rBuf = *itBuf; - CPPUNIT_ASSERT_EQUAL(size_t(1048576), rBuf.size()); // 1 MB each. 
- for (char check : rBuf) - CPPUNIT_ASSERT_EQUAL(c, check); + sal_Int32 nBytesRead = xStrm->readBytes(aBytes, nReadSize); + const sal_Int8* p = aBytes.getArray(); + const sal_Int8* pEnd = p + nBytesRead; + std::copy(p, pEnd, std::back_inserter(*itBuf)); + nSize -= nBytesRead; } } + + verifyStreams( aTestBuffers ); } CPPUNIT_TEST_SUITE_REGISTRATION(PackageTest); diff --git a/package/source/zipapi/XBufferedThreadedStream.cxx b/package/source/zipapi/XBufferedThreadedStream.cxx new file mode 100644 index 000000000000..59a89f9c64e1 --- /dev/null +++ b/package/source/zipapi/XBufferedThreadedStream.cxx @@ -0,0 +1,200 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include +#include + +using namespace css::uno; +using com::sun::star::packages::zip::ZipIOException; + +namespace { + +class UnzippingThread: public salhelper::Thread +{ + XBufferedThreadedStream &mxStream; +public: + explicit UnzippingThread(XBufferedThreadedStream &xStream): Thread("Unzipping"), mxStream(xStream) {} +private: + virtual void execute() override + { + try + { + mxStream.produce(); + } + catch( const RuntimeException &e ) + { + SAL_WARN("package", "RuntimeException from unbuffered Stream " << e.Message ); + mxStream.saveException( new RuntimeException( e ) ); + } + catch( const ZipIOException &e ) + { + SAL_WARN("package", "ZipIOException from unbuffered Stream " << e.Message ); + mxStream.saveException( new ZipIOException( e ) ); + } + catch( const Exception &e ) + { + SAL_WARN("package", "Unexpected exception " << e.Message ); + mxStream.saveException( new Exception( e ) ); + } + + mxStream.setTerminateThread(); + } +}; + +} + +XBufferedThreadedStream::XBufferedThreadedStream( + const 
Reference& xSrcStream ) +: mxSrcStream( xSrcStream ) +, mnPos(0) +, mnStreamSize( xSrcStream->available() ) +, mnOffset( 0 ) +, mxUnzippingThread( new UnzippingThread(*this) ) +, mbTerminateThread( false ) +, maSavedException( nullptr ) +{ + mxUnzippingThread->launch(); +} + +XBufferedThreadedStream::~XBufferedThreadedStream() +{ + setTerminateThread(); + mxUnzippingThread->join(); +} + +/** + * Reads from UnbufferedStream in a separate thread and stores the buffer blocks + * in maPendingBuffers queue for further use. + */ +void XBufferedThreadedStream::produce() +{ + Buffer pProducedBuffer; + std::unique_lock aGuard( maBufferProtector ); + do + { + if( !maUsedBuffers.empty() ) + { + pProducedBuffer = maUsedBuffers.front(); + maUsedBuffers.pop(); + } + + aGuard.unlock(); + mxSrcStream->readBytes( pProducedBuffer, nBufferSize ); + + aGuard.lock(); + maPendingBuffers.push( pProducedBuffer ); + maBufferConsumeResume.notify_one(); + maBufferProduceResume.wait( aGuard, [&]{return canProduce(); } ); + + if( mbTerminateThread ) + break; + + } while( hasBytes() ); +} + +/** + * Fetches next available block from maPendingBuffers for use in Reading thread. 
+ */ +const Buffer& XBufferedThreadedStream::getNextBlock() +{ + const sal_Int32 nBufSize = maInUseBuffer.getLength(); + if( nBufSize <= 0 || mnOffset >= nBufSize ) + { + std::unique_lock aGuard( maBufferProtector ); + if( mnOffset >= nBufSize ) + maUsedBuffers.push( maInUseBuffer ); + + maBufferConsumeResume.wait( aGuard, [&]{return canConsume(); } ); + + if( maPendingBuffers.empty() ) + { + maInUseBuffer = Buffer(); + if( maSavedException ) + throw *maSavedException; + } + else + { + maInUseBuffer = maPendingBuffers.front(); + maPendingBuffers.pop(); + mnOffset = 0; + + if( maPendingBuffers.size() <= nBufferLowWater ) + maBufferProduceResume.notify_one(); + } + } + + return maInUseBuffer; +} + +void XBufferedThreadedStream::setTerminateThread() +{ + mbTerminateThread = true; + maBufferProduceResume.notify_one(); + maBufferConsumeResume.notify_one(); +} + +sal_Int32 SAL_CALL XBufferedThreadedStream::readBytes( Sequence< sal_Int8 >& rData, sal_Int32 nBytesToRead ) +{ + if( !hasBytes() ) + return 0; + + const sal_Int32 nAvailableSize = std::min( nBytesToRead, remainingSize() ); + rData.realloc( nAvailableSize ); + sal_Int32 i = 0, nPendingBytes = nAvailableSize; + + while( nPendingBytes ) + { + const Buffer &pBuffer = getNextBlock(); + if( pBuffer.getLength() <= 0 ) + { + rData.realloc( nAvailableSize - nPendingBytes ); + return nAvailableSize - nPendingBytes; + } + const sal_Int32 limit = std::min( nPendingBytes, pBuffer.getLength() - mnOffset ); + + memcpy( &rData[i], &pBuffer[mnOffset], limit ); + + nPendingBytes -= limit; + mnOffset += limit; + mnPos += limit; + i += limit; + } + + return nAvailableSize; +} + +sal_Int32 SAL_CALL XBufferedThreadedStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead ) +{ + return readBytes( aData, nMaxBytesToRead ); +} +void SAL_CALL XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip ) +{ + if( nBytesToSkip ) + { + Sequence < sal_Int8 > aSequence( nBytesToSkip ); + readBytes( aSequence, 
nBytesToSkip ); + } +} + +sal_Int32 SAL_CALL XBufferedThreadedStream::available() +{ + if( !hasBytes() ) + return 0; + + return remainingSize(); +} + +void SAL_CALL XBufferedThreadedStream::closeInput() +{ + setTerminateThread(); + mxUnzippingThread->join(); + mxSrcStream->closeInput(); +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/package/source/zipapi/XBufferedThreadedStream.hxx b/package/source/zipapi/XBufferedThreadedStream.hxx new file mode 100644 index 000000000000..b047b25fdf85 --- /dev/null +++ b/package/source/zipapi/XBufferedThreadedStream.hxx @@ -0,0 +1,79 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#ifndef INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX +#define INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX + +#include +#include +#include +#include +#include +#include + +typedef css::uno::Sequence< sal_Int8 > Buffer; + +class XBufferedThreadedStream : public cppu::WeakImplHelper< css::io::XInputStream > +{ +private: + const css::uno::Reference mxSrcStream; + size_t mnPos; /// position in stream + size_t mnStreamSize; /// available size of stream + + Buffer maInUseBuffer; /// Buffer block in use + int mnOffset; /// position in maInUseBuffer + std::queue < Buffer > maPendingBuffers; /// Buffers that are available for use + std::queue < Buffer > maUsedBuffers; + + rtl::Reference< salhelper::Thread > mxUnzippingThread; + std::mutex maBufferProtector; /// mutex protecting Buffer queues. 
+ std::condition_variable maBufferConsumeResume; + std::condition_variable maBufferProduceResume; + bool mbTerminateThread; /// indicates the failure of one of the threads + + css::uno::Exception *maSavedException; /// exception caught during unzipping is saved to be thrown during reading + + static const size_t nBufferLowWater = 2; + static const size_t nBufferHighWater = 4; + static const size_t nBufferSize = 32 * 1024; + + const Buffer& getNextBlock(); + size_t remainingSize() const { return mnStreamSize - mnPos; } + bool hasBytes() const { return mnPos < mnStreamSize; } + + bool canProduce() const + { + return( mbTerminateThread || maPendingBuffers.size() < nBufferHighWater ); + } + + bool canConsume() const + { + return( mbTerminateThread || !maPendingBuffers.empty() ); + } + +public: + XBufferedThreadedStream( + const css::uno::Reference& xSrcStream ); + + virtual ~XBufferedThreadedStream() override; + + void produce(); + void setTerminateThread(); + void saveException( css::uno::Exception *e ) { maSavedException = e; } + + // XInputStream + virtual sal_Int32 SAL_CALL readBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead ) override; + virtual sal_Int32 SAL_CALL readSomeBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead ) override; + virtual void SAL_CALL skipBytes( sal_Int32 nBytesToSkip ) override; + virtual sal_Int32 SAL_CALL available( ) override; + virtual void SAL_CALL closeInput( ) override; +}; +#endif + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/package/source/zipapi/ZipFile.cxx b/package/source/zipapi/ZipFile.cxx index ba41d5f10d1d..ddea09b3d824 100644 --- a/package/source/zipapi/ZipFile.cxx +++ b/package/source/zipapi/ZipFile.cxx @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -625,7 +626,14 @@ uno::Reference< XInputStream > ZipFile::createStreamForZipEntry( if (!mbUseBufferedStream) return xSrcStream; - uno::Reference xBufStream(new 
XBufferedStream(xSrcStream)); + uno::Reference xBufStream; + static const sal_Int32 nThreadingThreshold = 10000; + + if( xSrcStream->available() > nThreadingThreshold ) + xBufStream = new XBufferedThreadedStream(xSrcStream); + else + xBufStream = new XBufferedStream(xSrcStream); + return xBufStream; } -- cgit