summaryrefslogtreecommitdiff
path: root/bin/convwatch.py
diff options
context:
space:
mode:
authorMichael Stahl <mstahl@redhat.com>2012-02-27 20:55:58 +0100
committerMichael Stahl <mstahl@redhat.com>2012-02-27 20:55:58 +0100
commitfa5e41067bc15c38dcc6c0c6fb327adad0444774 (patch)
tree5d95ee448ffc5e94145307924e1af1ca52b31da9 /bin/convwatch.py
parenta860cd108c7e3e1eb0e5dfef6020610da1c07ed3 (diff)
add convwatch.py
Diffstat (limited to 'bin/convwatch.py')
-rwxr-xr-xbin/convwatch.py424
1 files changed, 424 insertions, 0 deletions
diff --git a/bin/convwatch.py b/bin/convwatch.py
new file mode 100755
index 000000000000..78802343077b
--- /dev/null
+++ b/bin/convwatch.py
@@ -0,0 +1,424 @@
+# -*- tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4 -*-
+# Version: MPL 1.1 / GPLv3+ / LGPLv3+
+#
+# The contents of this file are subject to the Mozilla Public License Version
+# 1.1 (the "License"); you may not use this file except in compliance with
+# the License or as specified alternatively below. You may obtain a copy of
+# the License at http://www.mozilla.org/MPL/
+#
+# Software distributed under the License is distributed on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+#
+# Major Contributor(s):
+# Copyright (C) 2012 Red Hat, Inc., Michael Stahl <mstahl@redhat.com>
+# (initial developer)
+#
+# All Rights Reserved.
+#
+# For minor contributions see the git repository.
+#
+# Alternatively, the contents of this file may be used under the terms of
+# either the GNU General Public License Version 3 or later (the "GPLv3+"), or
+# the GNU Lesser General Public License Version 3 or later (the "LGPLv3+"),
+# in which case the provisions of the GPLv3+ or the LGPLv3+ are applicable
+# instead of those above.
+
+import getopt
+import os
+import subprocess
+import sys
+import time
+import uuid
+try:
+ from urllib.parse import quote
+except ImportError:
+ from urllib import quote
+
+try:
+ import pyuno
+ import uno
+ import unohelper
+except ImportError:
+ print("pyuno not found: try to set PYTHONPATH and URE_BOOTSTRAP variables")
+ print("PYTHONPATH=/installation/opt/program")
+ print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
+ raise
+
+try:
+ from com.sun.star.document import XDocumentEventListener
+except ImportError:
+ print("UNO API class not found: try to set URE_BOOTSTRAP variable")
+ print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
+ raise
+
+### utilities ###
+
+def partition(list, pred):
+ left = []
+ right = []
+ for e in list:
+ if pred(e):
+ left.append(e)
+ else:
+ right.append(e)
+ return (left, right)
+
+def filelist(dir, suffix):
+ if len(dir) == 0:
+ raise Exception("filelist: empty directory")
+ if not(dir[-1] == "/"):
+ dir += "/"
+ files = [dir + f for f in os.listdir(dir)]
+# print(files)
+ return [f for f in files
+ if os.path.isfile(f) and os.path.splitext(f)[1] == suffix]
+
+def getFiles(dirs, suffix):
+ files = []
+ for dir in dirs:
+ files += filelist(dir, suffix)
+ return files
+
+### UNO utilities ###
+
+class OfficeConnection:
+ def __init__(self, args):
+ self.args = args
+ self.soffice = None
+ self.socket = None
+ self.xContext = None
+ def setUp(self):
+ (method, sep, rest) = self.args["--soffice"].partition(":")
+ if sep != ":":
+ raise Exception("soffice parameter does not specify method")
+ if method == "path":
+ socket = "pipe,name=pytest" + str(uuid.uuid1())
+ try:
+ userdir = self.args["--userdir"]
+ except KeyError:
+ raise Exception("'path' method requires --userdir")
+ if not(userdir.startswith("file://")):
+ raise Exception("--userdir must be file URL")
+ self.soffice = self.bootstrap(rest, userdir, socket)
+ elif method == "connect":
+ socket = rest
+ else:
+ raise Exception("unsupported connection method: " + method)
+ self.xContext = self.connect(socket)
+
+ def bootstrap(self, soffice, userdir, socket):
+ argv = [ soffice, "--accept=" + socket + ";urp",
+ "-env:UserInstallation=" + userdir,
+ "--quickstart=no", "--nofirststartwizard",
+ "--norestore", "--nologo", "--headless" ]
+ if "--valgrind" in self.args:
+ argv.append("--valgrind")
+ return subprocess.Popen(argv)
+
+ def connect(self, socket):
+ xLocalContext = uno.getComponentContext()
+ xUnoResolver = xLocalContext.ServiceManager.createInstanceWithContext(
+ "com.sun.star.bridge.UnoUrlResolver", xLocalContext)
+ url = "uno:" + socket + ";urp;StarOffice.ComponentContext"
+ print("OfficeConnection: connecting to: " + url)
+ while True:
+ try:
+ xContext = xUnoResolver.resolve(url)
+ return xContext
+# except com.sun.star.connection.NoConnectException
+ except pyuno.getClass("com.sun.star.connection.NoConnectException"):
+ print("NoConnectException: sleeping...")
+ time.sleep(1)
+
+ def tearDown(self):
+ if self.soffice:
+ if self.xContext:
+ try:
+ print("tearDown: calling terminate()...")
+ xMgr = self.xContext.ServiceManager
+ xDesktop = xMgr.createInstanceWithContext(
+ "com.sun.star.frame.Desktop", self.xContext)
+ xDesktop.terminate()
+ print("...done")
+# except com.sun.star.lang.DisposedException:
+ except pyuno.getClass("com.sun.star.beans.UnknownPropertyException"):
+ print("caught UnknownPropertyException")
+ pass # ignore, also means disposed
+ except pyuno.getClass("com.sun.star.lang.DisposedException"):
+ print("caught DisposedException")
+ pass # ignore
+ else:
+ self.soffice.terminate()
+ ret = self.soffice.wait()
+ self.xContext = None
+ self.socket = None
+ self.soffice = None
+ if ret != 0:
+ raise Exception("Exit status indicates failure: " + str(ret))
+# return ret
+
+class PerTestConnection:
+ def __init__(self, args):
+ self.args = args
+ self.connection = None
+ def getContext(self):
+ return self.connection.xContext
+ def setUp(self):
+ assert(not(self.connection))
+ def preTest(self):
+ conn = OfficeConnection(self.args)
+ conn.setUp()
+ self.connection = conn
+ def postTest(self):
+ if self.connection:
+ try:
+ self.connection.tearDown()
+ finally:
+ self.connection = None
+ def tearDown(self):
+ assert(not(self.connection))
+
+class PersistentConnection:
+ def __init__(self, args):
+ self.args = args
+ self.connection = None
+ def getContext(self):
+ return self.connection.xContext
+ def setUp(self):
+ conn = OfficeConnection(self.args)
+ conn.setUp()
+ self.connection = conn
+ def preTest(self):
+ assert(self.connection)
+ def postTest(self):
+ assert(self.connection)
+ def tearDown(self):
+ if self.connection:
+ try:
+ self.connection.tearDown()
+ finally:
+ self.connection = None
+
+def simpleInvoke(connection, test):
+ try:
+ connection.preTest()
+ test.run(connection.getContext())
+ finally:
+ connection.postTest()
+
+def retryInvoke(connection, test):
+ tries = 5
+ while tries > 0:
+ try:
+ tries -= 1
+ try:
+ connection.preTest()
+ test.run(connection.getContext())
+ return
+ finally:
+ connection.postTest()
+ except KeyboardInterrupt:
+ raise # Ctrl+C should work
+ except:
+ print("retryInvoke: caught exception")
+ raise Exception("FAILED retryInvoke")
+
+def runConnectionTests(connection, invoker, tests):
+ try:
+ connection.setUp()
+ for test in tests:
+ invoker(connection, test)
+ finally:
+ connection.tearDown()
+
+class EventListener(XDocumentEventListener,unohelper.Base):
+ def __init__(self):
+ self.layoutFinished = False
+ def documentEventOccured(self, event):
+# print(str(event.EventName))
+ if event.EventName == "OnLayoutFinished":
+ self.layoutFinished = True
+ def disposing(event):
+ pass
+
+def mkPropertyValue(name, value):
+ return uno.createUnoStruct("com.sun.star.beans.PropertyValue",
+ name, 0, value, 0)
+
+### tests ###
+
+def loadFromURL(xContext, url):
+ xDesktop = xContext.ServiceManager.createInstanceWithContext(
+ "com.sun.star.frame.Desktop", xContext)
+ props = [("Hidden", True), ("ReadOnly", True)] # FilterName?
+ loadProps = tuple([mkPropertyValue(name, value) for (name, value) in props])
+ xListener = EventListener()
+ xGEB = xContext.ServiceManager.createInstanceWithContext(
+ "com.sun.star.frame.GlobalEventBroadcaster", xContext)
+ xGEB.addDocumentEventListener(xListener)
+ try:
+ xDoc = xDesktop.loadComponentFromURL(url, "_blank", 0, loadProps)
+ time_ = 0
+ while time_ < 30:
+ if xListener.layoutFinished:
+ return xDoc
+ print("delaying...")
+ time_ += 1
+ time.sleep(1)
+ print("timeout: no OnLayoutFinished received")
+ return xDoc
+ except:
+ if xDoc:
+ print("CLOSING")
+ xDoc.close(True)
+ raise
+ finally:
+ if xListener:
+ xGEB.removeDocumentEventListener(xListener)
+
+def printDoc(xContext, xDoc, url):
+ props = [ mkPropertyValue("FileName", url) ]
+# xDoc.print(props)
+ uno.invoke(xDoc, "print", (tuple(props),)) # damn, that's a keyword!
+ busy = True
+ while busy:
+ print("printing...")
+ time.sleep(1)
+ prt = xDoc.getPrinter()
+ for value in prt:
+ if value.Name == "IsBusy":
+ busy = value.Value
+ print("...done printing")
+
+class LoadPrintFileTest:
+ def __init__(self, file, prtsuffix):
+ self.file = file
+ self.prtsuffix = prtsuffix
+ def run(self, xContext):
+ print("Loading document: " + self.file)
+ try:
+ url = "file://" + quote(self.file)
+ xDoc = loadFromURL(xContext, url)
+ printDoc(xContext, xDoc, url + self.prtsuffix)
+ finally:
+ if xDoc:
+ xDoc.close(True)
+ print("...done with: " + self.file)
+
+def runLoadPrintFileTests(opts, dirs, suffix, reference):
+ if reference:
+ prtsuffix = ".pdf.reference"
+ else:
+ prtsuffix = ".pdf"
+ files = getFiles(dirs, suffix)
+ tests = (LoadPrintFileTest(file, prtsuffix) for file in files)
+ connection = PersistentConnection(opts)
+# connection = PerTestConnection(opts)
+ runConnectionTests(connection, simpleInvoke, tests)
+
+def mkImages(file, resolution):
+ argv = [ "gs", "-r" + resolution, "-sOutputFile=" + file + ".%04d.jpeg",
+ "-dNOPROMPT", "-dNOPAUSE", "-dBATCH", "-sDEVICE=jpeg", file ]
+ ret = subprocess.check_call(argv)
+
+def mkAllImages(dirs, suffix, resolution, reference):
+ if reference:
+ prtsuffix = ".pdf.reference"
+ else:
+ prtsuffix = ".pdf"
+ for dir in dirs:
+ files = filelist(dir, suffix)
+ print(files)
+ for f in files:
+ mkImages(f + prtsuffix, resolution)
+
+def identify(imagefile):
+ argv = ["identify", "-format", "%k", imagefile]
+ result = subprocess.check_output(argv)
+ if result.partition("\n")[0] != "1":
+ print("identify result: " + result)
+ print("DIFFERENCE in " + imagefile)
+
+def compose(refimagefile, imagefile, diffimagefile):
+ argv = [ "composite", "-compose", "difference",
+ refimagefile, imagefile, diffimagefile ]
+ subprocess.check_call(argv)
+
+def compareImages(file):
+ allimages = [f for f in filelist(os.path.dirname(file), ".jpeg")
+ if f.startswith(file)]
+# refimages = [f for f in filelist(os.path.dirname(file), ".jpeg")
+# if f.startswith(file + ".reference")]
+# print("compareImages: allimages:" + str(allimages))
+ (refimages, images) = partition(sorted(allimages),
+ lambda f: f.startswith(file + ".pdf.reference"))
+# print("compareImages: images" + str(images))
+ for (image, refimage) in zip(images, refimages):
+ compose(image, refimage, image + ".diff")
+ identify(image + ".diff")
+ if (len(images) != len(refimages)):
+ print("DIFFERENT NUMBER OF IMAGES FOR: " + file)
+
+def compareAllImages(dirs, suffix):
+ print "compareAllImages..."
+ for dir in dirs:
+ files = filelist(dir, suffix)
+# print("compareAllImages:" + str(files))
+ for f in files:
+ compareImages(f)
+ print "...compareAllImages done"
+
+
+def parseArgs(argv):
+ (optlist,args) = getopt.getopt(argv[1:], "hr",
+ ["help", "soffice=", "userdir=", "reference", "valgrind"])
+# print optlist
+ return (dict(optlist), args)
+
+def usage():
+ message = """usage: {program} [option]... [directory]..."
+ -h | --help: print usage information
+ -r | --reference: generate new reference files (otherwise: compare)
+ --soffice=method:location
+ specify soffice instance to connect to
+ supported methods: 'path', 'connect'
+ --userdir=URL specify user installation directory for 'path' method
+ --valgrind pass --valgrind to soffice for 'path' method"""
+ print(message.format(program = os.path.basename(sys.argv[0])))
+
+def checkTools():
+ try:
+ subprocess.check_output(["gs", "--version"])
+ except:
+ print("Cannot execute 'gs'. Please install ghostscript.")
+ sys.exit(1)
+ try:
+ subprocess.check_output(["composite", "-version"])
+ subprocess.check_output(["identify", "-version"])
+ except:
+ print("Cannot execute 'composite' or 'identify'.")
+ print("Please install ImageMagick.")
+ sys.exit(1)
+
+if __name__ == "__main__":
+# checkTools()
+ (opts,args) = parseArgs(sys.argv)
+ if len(args) == 0:
+ usage()
+ sys.exit(1)
+ if "-h" in opts or "--help" in opts:
+ usage()
+ sys.exit()
+ elif "--soffice" in opts:
+ reference = "-r" in opts or "--reference" in opts
+ runLoadPrintFileTests(opts, args, ".odt", reference)
+ mkAllImages(args, ".odt", "200", reference)
+ if not(reference):
+ compareAllImages(args, ".odt")
+ else:
+ usage()
+ sys.exit(1)
+
+# vim:set shiftwidth=4 softtabstop=4 expandtab: