From 34592e25791a3ebbe24805bde3a2ac0a447b2089 Mon Sep 17 00:00:00 2001 From: Lionel Elie Mamane Date: Tue, 2 Aug 2011 16:12:50 +0200 Subject: Apply connectivity-workben-postgresql.diff --- connectivity/workben/postgresql/ddl.py | 185 +++++++++++++ connectivity/workben/postgresql/main.py | 90 ++++++ connectivity/workben/postgresql/makefile.mk | 103 +++++++ connectivity/workben/postgresql/metadata.py | 151 ++++++++++ .../workben/postgresql/preparedstatement.py | 228 +++++++++++++++ connectivity/workben/postgresql/sdbcx.py | 306 +++++++++++++++++++++ connectivity/workben/postgresql/statement.py | 277 +++++++++++++++++++ 7 files changed, 1340 insertions(+) create mode 100644 connectivity/workben/postgresql/ddl.py create mode 100644 connectivity/workben/postgresql/main.py create mode 100644 connectivity/workben/postgresql/makefile.mk create mode 100644 connectivity/workben/postgresql/metadata.py create mode 100644 connectivity/workben/postgresql/preparedstatement.py create mode 100644 connectivity/workben/postgresql/sdbcx.py create mode 100644 connectivity/workben/postgresql/statement.py (limited to 'connectivity/workben') diff --git a/connectivity/workben/postgresql/ddl.py b/connectivity/workben/postgresql/ddl.py new file mode 100644 index 000000000000..4c1c5e2c5275 --- /dev/null +++ b/connectivity/workben/postgresql/ddl.py @@ -0,0 +1,185 @@ +#************************************************************************* +# +# $RCSfile: ddl.py,v $ +# +# $Revision: 1.1.2.5 $ +# +# last change: $Author: jbu $ $Date: 2007/01/07 13:50:38 $ +# +# The Contents of this file are made available subject to the terms of +# either of the following licenses +# +# - GNU Lesser General Public License Version 2.1 +# - Sun Industry Standards Source License Version 1.1 +# +# Sun Microsystems Inc., October, 2000 +# +# GNU Lesser General Public License Version 2.1 +# ============================================= +# Copyright 2000 by Sun Microsystems, Inc. +# 901 San Antonio Road, Palo Alto, CA 94303, USA +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +# +# Sun Industry Standards Source License Version 1.1 +# ================================================= +# The contents of this file are subject to the Sun Industry Standards +# Source License Version 1.1 (the "License"); You may not use this file +# except in compliance with the License. You may obtain a copy of the +# License at http://www.openoffice.org/license.html. +# +# Software provided under this License is provided on an "AS IS" basis, +# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, +# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS, +# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING. +# See the License for the specific provisions governing your rights and +# obligations concerning the Software. +# +# The Initial Developer of the Original Code is: Ralph Thomas +# +# Copyright: 2000 by Sun Microsystems, Inc. +# +# All Rights Reserved. +# +# Contributor(s): Ralph Thomas, Joerg Budischewski +# +#************************************************************************* +from com.sun.star.sdbc import SQLException +import sys + +def dumpResultSet( rs ): + meta = rs.getMetaData() + for i in range(1, meta.getColumnCount()+1): + sys.stdout.write(meta.getColumnName( i ) + "\t") + sys.stdout.write( "\n" ) + while rs.next(): + for i in range( 1, meta.getColumnCount()+1): + sys.stdout.write( rs.getString( i ) + "\t" ) + sys.stdout.write( "\n" ) + rs.beforeFirst() + + + +def executeIgnoringException( stmt, sql ): + try: + stmt.executeUpdate(sql) + except SQLException: + pass + +def cleanGroupsAndUsers( stmt ): + rs = stmt.executeQuery("SELECT groname FROM pg_group WHERE groname LIKE 'pqsdbc_%'" ) + stmt2 = stmt.getConnection().createStatement() + while rs.next(): + stmt2.executeUpdate("DROP GROUP " + rs.getString(1) ) + + rs.close() + rs = stmt.executeQuery( "SELECT usename FROM pg_user WHERE usename LIKE 'pqsdbc_%'" ) + while rs.next(): + stmt2.executeUpdate( "DROP USER " + rs.getString(1) ) + + + + +def executeDDLs( connection ): + + stmt = connection.createStatement() + + + executeIgnoringException( stmt, "DROP VIEW customer2" ) + executeIgnoringException( stmt, "DROP TABLE orderpos" ) + executeIgnoringException( stmt, "DROP TABLE ordertab" ) + executeIgnoringException( stmt, "DROP TABLE product" ) + executeIgnoringException( stmt, "DROP TABLE customer" ) + executeIgnoringException( stmt, "DROP TABLE blub" ) + executeIgnoringException( stmt, "DROP TABLE foo" ) + executeIgnoringException( stmt, "DROP TABLE nooid" ) + executeIgnoringException( stmt, "DROP TABLE nooid2" ) + cleanGroupsAndUsers( stmt ) + executeIgnoringException( stmt, "DROP DOMAIN pqsdbc_short" ) + executeIgnoringException( stmt, "DROP DOMAIN pqsdbc_amount" ) + executeIgnoringException( stmt, "DROP SCHEMA pqsdbc_test" ) + + ddls = ( + "BEGIN", + "CREATE DOMAIN pqsdbc_short AS int2", + "CREATE DOMAIN pqsdbc_amount AS integer", + "CREATE USER pqsdbc_joe", + "CREATE USER pqsdbc_susy", + "CREATE USER pqsdbc_boss", + "CREATE USER pqsdbc_customer", # technical user (e.g. a webfrontend) + "CREATE GROUP pqsdbc_employees WITH USER pqsdbc_joe,pqsdbc_susy", + "CREATE GROUP pqsdbc_admin WITH USER pqsdbc_susy,pqsdbc_boss", + "CREATE SCHEMA pqsdbc_test", + "CREATE TABLE customer ( "+ + "id char(8) UNIQUE PRIMARY KEY, "+ + "name text, " + + "dummySerial serial UNIQUE) WITH OIDS", + "COMMENT ON TABLE customer IS 'contains customer attributes'", + "COMMENT ON COLUMN customer.id IS 'unique id'", + "CREATE TABLE product ("+ + "id char(8) UNIQUE PRIMARY KEY,"+ + "name text,"+ + "price numeric(10,2),"+ + "image bytea) WITH OIDS", + + "CREATE TABLE ordertab ( "+ + "id char(8) UNIQUE PRIMARY KEY,"+ + "customerid char(8) CONSTRAINT cust REFERENCES customer(id) ON DELETE CASCADE ON UPDATE RESTRICT,"+ + "orderdate char(8),"+ + "delivered boolean ) WITH OIDS", + "CREATE TABLE orderpos ( "+ + "orderid char(8) REFERENCES ordertab(id),"+ + "id char(3),"+ + "productid char(8) REFERENCES product(id),"+ + "amount pqsdbc_amount,"+ + "shortamount pqsdbc_short,"+ + "PRIMARY KEY (orderid,id)) WITH OIDS", + "CREATE TABLE nooid ("+ + "id char(8) UNIQUE PRIMARY KEY,"+ + "name text) "+ + "WITHOUT OIDS", + "CREATE TABLE nooid2 ("+ + "id serial UNIQUE PRIMARY KEY,"+ + "name text) "+ + "WITHOUT OIDS", + "CREATE VIEW customer2 AS SELECT id,name FROM customer", + "GRANT SELECT ON TABLE customer,product,orderpos,ordertab TO pqsdbc_customer", + "GRANT SELECT ON TABLE product TO GROUP pqsdbc_employees", + "GRANT SELECT,UPDATE, INSERT ON TABLE customer TO GROUP pqsdbc_employees", + "GRANT ALL ON TABLE orderpos,ordertab TO GROUP pqsdbc_employees, GROUP pqsdbc_admin", + "GRANT ALL ON TABLE customer TO GROUP pqsdbc_admin", # the admin is allowed to delete customers + "GRANT ALL ON TABLE product TO pqsdbc_boss", # only the boss may change the product table + "INSERT INTO public.customer VALUES ('C1','John Doe')", + "INSERT INTO \"public\" . \"customer\" VALUES ('C2','Bruce Springsteen')", + + "INSERT INTO \"public\".product VALUES ('PZZ2','Pizza Mista',6.95,'\\003foo\\005')", + "INSERT INTO product VALUES ('PZZ5','Pizza Funghi',5.95,'\\001foo\\005')", + "INSERT INTO product VALUES ('PAS1','Lasagne',5.49,NULL)", + + "INSERT INTO ordertab VALUES ( '1', 'C2', '20030403','true')", + "INSERT INTO ordertab VALUES ( '2', 'C1', '20030402','false')", + + "INSERT INTO orderpos VALUES ( '1','001', 'PZZ2',2,0)", + "INSERT INTO orderpos VALUES ( '1','002', 'PZZ5',3,-1)", + "INSERT INTO orderpos VALUES ( '2','001', 'PAS1',5,1)", + "INSERT INTO orderpos VALUES ( '2','002', 'PZZ2',3,2)", + "COMMIT" ) + for i in ddls: + stmt.executeUpdate(i) + + connection.getTables() # force refresh of metadata + + stmt.close() diff --git a/connectivity/workben/postgresql/main.py b/connectivity/workben/postgresql/main.py new file mode 100644 index 000000000000..800dabc9d465 --- /dev/null +++ b/connectivity/workben/postgresql/main.py @@ -0,0 +1,90 @@ +#************************************************************************* +# +# $RCSfile: main.py,v $ +# +# $Revision: 1.1.2.2 $ +# +# last change: $Author: jbu $ $Date: 2004/05/09 20:04:59 $ +# +# The Contents of this file are made available subject to the terms of +# either of the following licenses +# +# - GNU Lesser General Public License Version 2.1 +# - Sun Industry Standards Source License Version 1.1 +# +# Sun Microsystems Inc., October, 2000 +# +# GNU Lesser General Public License Version 2.1 +# ============================================= +# Copyright 2000 by Sun Microsystems, Inc. +# 901 San Antonio Road, Palo Alto, CA 94303, USA +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +# +# Sun Industry Standards Source License Version 1.1 +# ================================================= +# The contents of this file are subject to the Sun Industry Standards +# Source License Version 1.1 (the "License"); You may not use this file +# except in compliance with the License. You may obtain a copy of the +# License at http://www.openoffice.org/license.html. +# +# Software provided under this License is provided on an "AS IS" basis, +# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, +# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS, +# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING. +# See the License for the specific provisions governing your rights and +# obligations concerning the Software. +# +# The Initial Developer of the Original Code is: Joerg Budischewski +# +# Copyright: 2000 by Sun Microsystems, Inc. +# +# All Rights Reserved. +# +# Contributor(s): Joerg Budischewski +# +# +# +#************************************************************************* +import uno +import unohelper +import unittest +import statement +import preparedstatement +import metadata +import sdbcx +import sys +import os + +ctx = uno.getComponentContext() + +# needed for the tests +unohelper.addComponentsToContext( + ctx,ctx, + ("postgresql-sdbc.uno","postgresql-sdbc-impl.uno","typeconverter.uno"), + "com.sun.star.loader.SharedLibrary") + +runner = unittest.TextTestRunner(sys.stderr,1,2) +dburl = sys.argv[1] # os.environ['USER'] + "_pqtest" +print "dburl=" + dburl + +suite = unittest.TestSuite() +suite.addTest(statement.suite(ctx,dburl)) +suite.addTest(preparedstatement.suite(ctx,dburl)) +suite.addTest(metadata.suite(ctx,dburl)) +suite.addTest(sdbcx.suite(ctx,dburl)) + +runner.run(suite) diff --git a/connectivity/workben/postgresql/makefile.mk b/connectivity/workben/postgresql/makefile.mk new file mode 100644 index 000000000000..82ec2b65f7bb --- /dev/null +++ b/connectivity/workben/postgresql/makefile.mk @@ -0,0 +1,103 @@ +#************************************************************************* +# +# $RCSfile: makefile.mk,v $ +# +# $Revision: 1.1.2.2 $ +# +# last change: $Author: jbu $ $Date: 2004/05/09 20:04:59 $ +# +# The Contents of this file are made available subject to the terms of +# either of the following licenses +# +# - GNU Lesser General Public License Version 2.1 +# - Sun Industry Standards Source License Version 1.1 +# +# Sun Microsystems Inc., October, 2000 +# +# GNU Lesser General Public License Version 2.1 +# ============================================= +# Copyright 2000 by Sun Microsystems, Inc. +# 901 San Antonio Road, Palo Alto, CA 94303, USA +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +# +# Sun Industry Standards Source License Version 1.1 +# ================================================= +# The contents of this file are subject to the Sun Industry Standards +# Source License Version 1.1 (the "License"); You may not use this file +# except in compliance with the License. You may obtain a copy of the +# License at http://www.openoffice.org/license.html. +# +# Software provided under this License is provided on an "AS IS" basis, +# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, +# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS, +# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING. +# See the License for the specific provisions governing your rights and +# obligations concerning the Software. +# +# The Initial Developer of the Original Code is: Sun Microsystems, Inc. +# +# Copyright: 2000 by Sun Microsystems, Inc. +# +# All Rights Reserved. +# +# Contributor(s): _______________________________________ +# +# +# +#************************************************************************* + +PRJ=..$/.. + +PRJNAME=connectivity +TARGET=postgresql-test +LIBTARGET=NO +TARGETTYPE=CUI +ENABLE_EXCEPTIONS=TRUE + +# --- Settings ----------------------------------------------------- + +.INCLUDE : settings.mk +.INCLUDE : sv.mk +# --- Files -------------------------------------------------------- + + +PYFILES = \ + $(DLLDEST)$/statement.py \ + $(DLLDEST)$/preparedstatement.py \ + $(DLLDEST)$/main.py \ + $(DLLDEST)$/ddl.py \ + $(DLLDEST)$/sdbcx.py \ + $(DLLDEST)$/metadata.py + +ALL : \ + $(PYFILES) \ + doc + +.INCLUDE : target.mk + +$(DLLDEST)$/%.py: %.py + +cp $? $@ + + +.PHONY doc: + @echo "start test with dmake runtest dburl=your-url" + @echo " e.g. dmake runtest dburl=sdbc:postgresql:dbname=pqtest" + @echo " MUST: Create a separate datbases before (here pqtest)," + @echo " (SOME TABLES GET DROPPED)" + +runtest : ALL + +cd $(DLLDEST) && python main.py "$(dburl)" diff --git a/connectivity/workben/postgresql/metadata.py b/connectivity/workben/postgresql/metadata.py new file mode 100644 index 000000000000..948567588485 --- /dev/null +++ b/connectivity/workben/postgresql/metadata.py @@ -0,0 +1,151 @@ +#************************************************************************* +# +# $RCSfile: metadata.py,v $ +# +# $Revision: 1.1.2.4 $ +# +# last change: $Author: jbu $ $Date: 2006/05/27 11:33:11 $ +# +# The Contents of this file are made available subject to the terms of +# either of the following licenses +# +# - GNU Lesser General Public License Version 2.1 +# - Sun Industry Standards Source License Version 1.1 +# +# Sun Microsystems Inc., October, 2000 +# +# GNU Lesser General Public License Version 2.1 +# ============================================= +# Copyright 2000 by Sun Microsystems, Inc. +# 901 San Antonio Road, Palo Alto, CA 94303, USA +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +# +# Sun Industry Standards Source License Version 1.1 +# ================================================= +# The contents of this file are subject to the Sun Industry Standards +# Source License Version 1.1 (the "License"); You may not use this file +# except in compliance with the License. You may obtain a copy of the +# License at http://www.openoffice.org/license.html. +# +# Software provided under this License is provided on an "AS IS" basis, +# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, +# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS, +# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING. +# See the License for the specific provisions governing your rights and +# obligations concerning the Software. +# +# The Initial Developer of the Original Code is: Ralph Thomas +# +# Copyright: 2000 by Sun Microsystems, Inc. +# +# All Rights Reserved. +# +# Contributor(s): Ralph Thomas, Joerg Budischewski +# +#************************************************************************* +import unittest +import sys +import ddl + +from com.sun.star.sdbc.DataType import SMALLINT, INTEGER, BIGINT , DATE, TIME, TIMESTAMP, NUMERIC + +def dumpResultSet( rs , count ): +# for i in range(1, count): +# sys.stdout.write(meta.getColumnName( i ) + "\t") + sys.stdout.write( "\n" ) + while rs.next(): + for i in range( 1, count+1): + sys.stdout.write( rs.getString( i ) + "\t" ) + sys.stdout.write( "\n" ) + rs.beforeFirst() + + + + +def suite(ctx,dburl): + suite = unittest.TestSuite() + suite.addTest(TestCase("testDatabaseMetaData",ctx,dburl)) + suite.addTest(TestCase("testTypeGuess",ctx,dburl)) + return suite + +class TestCase(unittest.TestCase): + def __init__(self,method,ctx,dburl): + unittest.TestCase.__init__(self,method) + self.ctx = ctx + self.dburl = dburl + + + def setUp( self ): + self.driver = self.ctx.ServiceManager.createInstanceWithContext( + 'org.openoffice.comp.connectivity.pq.Driver' , self.ctx ) + self.connection = self.driver.connect( self.dburl, () ) + ddl.executeDDLs( self.connection ) + + def tearDown( self ): + self.connection.close() + + def testDatabaseMetaData( self ): + meta = self.connection.getMetaData() + + rs = meta.getTables( None, "public", "%", () ) +# dumpResultSet( rs, 5) + + rs = meta.getColumns( None, "%", "customer", "%" ) +# dumpResultSet( rs, 18 ) + + rs = meta.getPrimaryKeys( None, "public" , "%" ) +# dumpResultSet( rs , 6 ) + rs = meta.getTablePrivileges( None, "public" , "%" ) +# dumpResultSet( rs , 7 ) + rs = meta.getColumns( None, "public" , "customer", "%" ) +# dumpResultSet( rs , 18 ) + rs = meta.getTypeInfo() +# dumpResultSet(rs, 18) + while rs.next(): + if rs.getString(1) == "pqsdbc_short": + self.failUnless( rs.getInt(2) == SMALLINT ) + break + self.failUnless( not rs.isAfterLast() ) # domain type cannot be found + + + rs = meta.getIndexInfo( None, "public" , "customer", False, False ) +# dumpResultSet( rs, 13 ) + + def testTypeGuess( self ): + stmt = self.connection.createStatement() + rs = stmt.executeQuery( "SELECT sum(amount) FROM orderpos" ) + meta = rs.getMetaData() + self.failUnless( meta.getColumnType(1) == BIGINT ) + + stmt = self.connection.createStatement() + rs = stmt.executeQuery( "SELECT sum(price) FROM product" ) + meta = rs.getMetaData() + self.failUnless( meta.getColumnType(1) == NUMERIC ) + + rs = stmt.executeQuery( "SELECT max(ttime) FROM firsttable" ) + meta = rs.getMetaData() + self.failUnless( meta.getColumnType(1) == TIME ) + + rs = stmt.executeQuery( "SELECT max(tdate) FROM firsttable" ) + meta = rs.getMetaData() + self.failUnless( meta.getColumnType(1) == DATE ) + + rs = stmt.executeQuery( "SELECT max(ttimestamp) FROM firsttable" ) + meta = rs.getMetaData() + self.failUnless( meta.getColumnType(1) == TIMESTAMP ) +# rs.next() +# print rs.getString( 1 ) diff --git a/connectivity/workben/postgresql/preparedstatement.py b/connectivity/workben/postgresql/preparedstatement.py new file mode 100644 index 000000000000..d049c9f6f5c3 --- /dev/null +++ b/connectivity/workben/postgresql/preparedstatement.py @@ -0,0 +1,228 @@ +#************************************************************************* +# +# $RCSfile: preparedstatement.py,v $ +# +# $Revision: 1.1.2.9 $ +# +# last change: $Author: jbu $ $Date: 2008/07/07 21:37:11 $ +# +# The Contents of this file are made available subject to the terms of +# either of the following licenses +# +# - GNU Lesser General Public License Version 2.1 +# - Sun Industry Standards Source License Version 1.1 +# +# Sun Microsystems Inc., October, 2000 +# +# GNU Lesser General Public License Version 2.1 +# ============================================= +# Copyright 2000 by Sun Microsystems, Inc. +# 901 San Antonio Road, Palo Alto, CA 94303, USA +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +# +# Sun Industry Standards Source License Version 1.1 +# ================================================= +# The contents of this file are subject to the Sun Industry Standards +# Source License Version 1.1 (the "License"); You may not use this file +# except in compliance with the License. You may obtain a copy of the +# License at http://www.openoffice.org/license.html. +# +# Software provided under this License is provided on an "AS IS" basis, +# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, +# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS, +# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING. +# See the License for the specific provisions governing your rights and +# obligations concerning the Software. +# +# The Initial Developer of the Original Code is: Ralph Thomas +# +# Copyright: 2000 by Sun Microsystems, Inc. +# +# All Rights Reserved. +# +# Contributor(s): Ralph Thomas, Joerg Budischewski +# +#************************************************************************* +import unittest +import sys +import ddl +from uno import ByteSequence +from com.sun.star.sdbc import SQLException +from com.sun.star.sdbc.ResultSetConcurrency import UPDATABLE +from com.sun.star.sdbc.DataType import NUMERIC,VARCHAR + +def suite(ctx,dburl): + suite = unittest.TestSuite() + suite.addTest(TestCase("testQuery",ctx,dburl)) + suite.addTest(TestCase("testGeneratedResultSet",ctx,dburl)) + suite.addTest(TestCase("testUpdateableResultSet",ctx,dburl)) + suite.addTest(TestCase("testQuoteQuote",ctx,dburl)) + return suite + +def realEquals( a,b,eps ): + val = a - b + if val < 0: + val = -1. * val + return val < eps + +class TestCase(unittest.TestCase): + def __init__(self,method,ctx,dburl): + unittest.TestCase.__init__(self,method) + self.ctx = ctx + self.dburl = dburl + + def setUp(self): + self.driver = self.ctx.ServiceManager.createInstanceWithContext( + 'org.openoffice.comp.connectivity.pq.Driver', self.ctx ) + self.connection = self.driver.connect( self.dburl, () ) + ddl.executeDDLs( self.connection ) + + def testDown( self ): + self.connection.close() + + def testQuery( self ): + + stmts = "SELECT product.id FROM product WHERE product.price > :lowprice AND product.price < :upprice", \ + "SELECT product.id FROM product WHERE product.price > ? AND product.price < ?" , \ + "SELECT \"product\".\"id\" FROM product WHERE \"product\".\"price\" > :lowprice AND \"product\".\"price\" < :upprice" + + + for stmt in stmts: + prepstmt = self.connection.prepareStatement( stmt ) + prepstmt.setDouble( 1, 5.80 ) + prepstmt.setObjectWithInfo( 2, 7. , NUMERIC, 2) + prepstmt.setObjectWithInfo( 2, "7.0000", NUMERIC, 2 ) + rs = prepstmt.executeQuery( ) + self.failUnless( rs.getMetaData().getColumnCount() == 1 ) + self.failUnless( rs.getMetaData().getColumnName(1) == "id") + self.failUnless( prepstmt.getMetaData().getColumnCount() == 1 ) + self.failUnless( prepstmt.getMetaData().getColumnName(1) == "id" ) + self.failUnless( rs.next() ) + self.failUnless( rs.getString( 1 ).strip() == "PZZ2" ) + self.failUnless( rs.next() ) + self.failUnless( rs.getString( 1 ).strip() == "PZZ5" ) + self.failUnless( rs.isLast() ) + + prepstmt = self.connection.prepareStatement( + "SELECT name FROM product WHERE id = ?" ) + prepstmt.setString( 1, 'PZZ2' ) + rs = prepstmt.executeQuery() + self.failUnless( rs.next() ) + self.failUnless( rs.getString( 1 ) == "Pizza Mista" ) + self.failUnless( rs.isLast() ) + + prepstmt = self.connection.prepareStatement( + "SELECT name FROM product WHERE image = ?" ) + prepstmt.setBytes( 1, ByteSequence( "\001foo\005" ) ) + rs = prepstmt.executeQuery() + self.failUnless( rs.next() ) + self.failUnless( rs.getString( 1 ) == "Pizza Funghi" ) + self.failUnless( rs.isLast() ) + + prepstmt = self.connection.prepareStatement( + "SELECT * FROM ordertab WHERE delivered = ?" ) + prepstmt.setBoolean( 1 , False ) + rs = prepstmt.executeQuery() + self.failUnless( rs.next() ) + self.failUnless( rs.getString( 1 ).strip() == "2" ) + self.failUnless( rs.isLast() ) + + stmt = self.connection.createStatement() + rs = stmt.executeQuery( "SELECT * FROM \"public\".\"customer\"" ) + + stmt.executeUpdate( "DELETE FROM product where id='PAS5'" ) + prepstmt =self.connection.prepareStatement( + "INSERT INTO product VALUES(?,'Ravioli',?,NULL)" ); + prepstmt.setObjectWithInfo( 1, "PAS5" ,VARCHAR,0) + prepstmt.setObjectWithInfo( 2, "9.223" ,NUMERIC,2) + prepstmt.executeUpdate() + rs= stmt.executeQuery( "SELECT price FROM product WHERE id = 'PAS5'" ) + self.failUnless( rs.next() ) + self.failUnless( rs.getString( 1 ).strip() == "9.22" ) + + stmt.executeUpdate( "DELETE FROM product where id='PAS5'" ) + prepstmt =self.connection.prepareStatement( + "INSERT INTO product VALUES('PAS5','Ravioli',?,NULL)" ); + prepstmt.setObjectWithInfo( 1, 9.223,NUMERIC,2 ) + prepstmt.executeUpdate() + rs= stmt.executeQuery( "SELECT price FROM product WHERE id = 'PAS5'" ) + self.failUnless( rs.next() ) + self.failUnless( rs.getString( 1 ).strip() == "9.22" ) + + def testGeneratedResultSet( self ): + prepstmt = self.connection.prepareStatement( + "INSERT INTO customer VALUES( ?, ? )" ) + prepstmt.setString( 1, "C3" ) + prepstmt.setString( 2, "Norah Jones" ) + prepstmt.executeUpdate() + rs = prepstmt.getGeneratedValues() + self.failUnless( rs.next() ) + self.failUnless( rs.getInt( 3 ) == 3 ) + + prepstmt = self.connection.prepareStatement( + "INSERT INTO public.nooid (id,name) VALUES( ?, ? )" ) + prepstmt.setString( 1, "C3" ) + prepstmt.setString( 2, "Norah Jones" ) + prepstmt.executeUpdate() + rs = prepstmt.getGeneratedValues() + self.failUnless( rs.next() ) + self.failUnless( rs.getString(1).rstrip() == "C3" ) + + prepstmt = self.connection.prepareStatement( + "INSERT INTO public.nooid2 (name) VALUES( ? )" ) + prepstmt.setString( 1, "Norah Jones" ) + prepstmt.executeUpdate() + rs = prepstmt.getGeneratedValues() + self.failUnless( rs ) + self.failUnless( rs.next() ) + self.failUnless( rs.getString(2) == "Norah Jones" ) + self.failUnless( rs.getString(1) == "1" ) + + def testUpdateableResultSet( self ): + stmt = self.connection.createStatement() + stmt.ResultSetConcurrency = UPDATABLE + rs = stmt.executeQuery( "SELECT * FROM orderpos" ) +# ddl.dumpResultSet( rs ) + rs.next() + rs.deleteRow() + rs.next() + rs.updateInt( 4 , 32 ) + rs.updateRow() + + rs.moveToInsertRow() + rs.updateString( 1 , '2' ) + rs.updateString( 2, '003' ) + rs.updateString( 3, 'PZZ5' ) + rs.updateInt( 4, 22 ) + rs.insertRow() + + rs = stmt.executeQuery( "SELECT * FROM orderpos" ) + rs = stmt.executeQuery( "SELECT * FROM \"public\".\"orderpos\"" ) +# ddl.dumpResultSet( rs ) + + def testQuoteQuote( self ): + stmt = self.connection.prepareStatement( "select 'foo''l'" ) + rs = stmt.executeQuery() + self.failUnless( rs ) + self.failUnless( rs.next() ) + self.failUnless( rs.getString(1) == "foo'l" ) + + stmt = self.connection.prepareStatement( "select 'foo''''l'" ) + rs = stmt.executeQuery() + self.failUnless( rs ) + self.failUnless( rs.next() ) + self.failUnless( rs.getString(1) == "foo''l" ) diff --git a/connectivity/workben/postgresql/sdbcx.py b/connectivity/workben/postgresql/sdbcx.py new file mode 100644 index 000000000000..b864efc63ebb --- /dev/null +++ b/connectivity/workben/postgresql/sdbcx.py @@ -0,0 +1,306 @@ +#************************************************************************* +# +# $RCSfile: sdbcx.py,v $ +# +# $Revision: 1.1.2.6 $ +# +# last change: $Author: jbu $ $Date: 2007/01/07 13:50:38 $ +# +# The Contents of this file are made available subject to the terms of +# either of the following licenses +# +# - GNU Lesser General Public License Version 2.1 +# - Sun Industry Standards Source License Version 1.1 +# +# Sun Microsystems Inc., October, 2000 +# +# GNU Lesser General Public License Version 2.1 +# ============================================= +# Copyright 2000 by Sun Microsystems, Inc. +# 901 San Antonio Road, Palo Alto, CA 94303, USA +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +# +# Sun Industry Standards Source License Version 1.1 +# ================================================= +# The contents of this file are subject to the Sun Industry Standards +# Source License Version 1.1 (the "License"); You may not use this file +# except in compliance with the License. You may obtain a copy of the +# License at http://www.openoffice.org/license.html. +# +# Software provided under this License is provided on an "AS IS" basis, +# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, +# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS, +# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING. +# See the License for the specific provisions governing your rights and +# obligations concerning the Software. +# +# The Initial Developer of the Original Code is: Joerg Budischewski +# +# Copyright: 2000 by Sun Microsystems, Inc. +# +# All Rights Reserved. +# +# Contributor(s): Joerg Budischewski +# +# +# +#************************************************************************* +import unittest +import ddl +import unohelper +import sys +from com.sun.star.sdbc import SQLException +from com.sun.star.sdbc.DataType import VARCHAR, CHAR, DECIMAL, DOUBLE, BIGINT, NUMERIC +from com.sun.star.sdbc.ColumnValue import NO_NULLS, NULLABLE +from com.sun.star.sdbcx.KeyType import PRIMARY, FOREIGN, UNIQUE +from com.sun.star.sdbc.KeyRule import RESTRICT, CASCADE, NO_ACTION + +def suite(ctx,dburl): + suite = unittest.TestSuite() + suite.addTest(TestCase("testTables",ctx,dburl)) + suite.addTest(TestCase("testViews",ctx,dburl)) + suite.addTest(TestCase("testKeys",ctx,dburl)) + suite.addTest(TestCase("testUsers",ctx,dburl)) + suite.addTest(TestCase("testIndexes",ctx,dburl)) + return suite + +def nullable2Str( v ): + if v == NO_NULLS: + return "NOT NULL" + return "" + +def autoIncremtent2Str( v ): + if v: + return "auto increment" + return "" + +def dumpColumns( columns ): + n = columns.getCount() + print "Name\t type\t prec\t scale\t" + for i in range( 0, n ): + col = columns.getByIndex( i ) + print col.Name + "\t "+col.TypeName + "\t " + str(col.Precision) + "\t " + str(col.Scale) + "\t "+\ + str( col.DefaultValue ) + "\t " + str( col.Description ) + "\t " +\ + autoIncremtent2Str( col.IsAutoIncrement ) + "\t " + \ + nullable2Str( col.IsNullable ) + + +class TestCase(unittest.TestCase): + def __init__(self,method,ctx,dburl): + unittest.TestCase.__init__(self,method) + self.ctx = ctx + self.dburl = dburl + + def setUp( self ): + self.driver = self.ctx.ServiceManager.createInstanceWithContext( + 'org.openoffice.comp.connectivity.pq.Driver' , self.ctx ) + self.connection = self.driver.connect( self.dburl, () ) + ddl.executeDDLs( self.connection ) + + def tearDown( self ): + self.connection.close() + + def checkDescriptor( self, descriptor, name, typeName, type, prec, scale, defaultValue, desc ): + self.failUnless( descriptor.Name == name ) + self.failUnless( descriptor.TypeName == typeName ) + self.failUnless( descriptor.Type == type ) + self.failUnless( descriptor.Precision == prec ) + self.failUnless( descriptor.Scale == scale ) +# print descriptor.DefaultValue + " == " + defaultValue +# self.failUnless( descriptor.DefaultValue == defaultValue ) + self.failUnless( descriptor.Description == desc ) + + + def testKeys( self ): + dd = self.driver.getDataDefinitionByConnection( self.connection ) + tables = dd.getTables() + t = tables.getByName( "public.ordertab" ) + keys = t.getKeys() + key = keys.getByName( "cust" ) + self.failUnless( key.Name == "cust" ) + self.failUnless( key.Type == FOREIGN ) + self.failUnless( key.ReferencedTable == "public.customer" ) + self.failUnless( key.UpdateRule == RESTRICT ) + self.failUnless( key.DeleteRule == CASCADE ) + + keycolumns = keys.getByName( "ordertab_pkey" ).getColumns() + self.failUnless( keycolumns.getElementNames() == (u"id",) ) + + key = keys.getByName( "ordertab_pkey" ) + self.failUnless( key.Name == "ordertab_pkey" ) + self.failUnless( key.Type == PRIMARY ) + self.failUnless( key.UpdateRule == NO_ACTION ) + self.failUnless( key.DeleteRule == NO_ACTION ) + + keys = tables.getByName( "public.customer" ).getKeys() + key = keys.getByName( "customer_dummyserial_key" ) + self.failUnless( key.Name == "customer_dummyserial_key" ) + self.failUnless( key.Type == UNIQUE ) + self.failUnless( key.UpdateRule == NO_ACTION ) + self.failUnless( key.DeleteRule == NO_ACTION ) + + keys = tables.getByName( "public.orderpos" ).getKeys() + keyEnum = keys.createEnumeration() + while keyEnum.hasMoreElements(): + key = keyEnum.nextElement() + cols = key.getColumns() + colEnum = cols.createEnumeration() + while colEnum.hasMoreElements(): + col = colEnum.nextElement() + + def testViews( self ): + dd = self.driver.getDataDefinitionByConnection( self.connection ) + views = dd.getViews() + + v = views.getByName( "public.customer2" ) + self.failUnless( v.Name == "customer2" ) + self.failUnless( v.SchemaName == "public" ) + self.failUnless( v.Command != "" ) + + def testIndexes( self ): + dd = self.driver.getDataDefinitionByConnection( self.connection ) + tables = dd.getTables() + t = tables.getByName( "public.ordertab" ) + indexes = t.getIndexes() + index = indexes.getByName( "ordertab_pkey" ) + + self.failUnless( index.Name == "ordertab_pkey" ) + self.failUnless( index.IsPrimaryKeyIndex ) + self.failUnless( index.IsUnique ) + self.failUnless( not index.IsClustered ) + + columns = index.getColumns() + self.failUnless( columns.hasByName( "id" ) ) + + self.failUnless( columns.getByIndex(0).Name == "id" ) + + def checkRenameTable( self, t , tables): + t.rename( "foo" ) + self.failUnless( tables.hasByName( "public.foo" ) ) + + t.rename( "public.foo2" ) + self.failUnless( tables.hasByName( "public.foo2" ) ) + + try: + t.rename( "pqsdbc_test.foo2" ) + self.failUnless( tables.hasByName( "pqsdbc_test.foo2" ) ) + print "looks like a server 8.1 or later (changing a schema succeeded)" + t.rename( "pqsdbc_test.foo" ) + self.failUnless( tables.hasByName( "pqsdbc_test.foo" ) ) + t.rename( "public.foo2" ) + self.failUnless( tables.hasByName( "public.foo2" ) ) + except SQLException,e: + if e.Message.find( "support changing" ) >= 0: + print "looks like a server prior to 8.1 (changing schema failed with Message [" + e.Message.replace("\n", " ") + "])" + else: + raise e + tables.dropByName( "public.foo2" ) + + def testTables( self ): + dd = self.driver.getDataDefinitionByConnection( self.connection ) + tables = dd.getTables() + t = tables.getByName( "public.customer" ) + self.failUnless( t.Name == "customer" ) + self.failUnless( t.SchemaName == "public" ) + self.failUnless( t.Type == "TABLE" ) + + cols = t.getColumns() + self.failUnless( cols.hasByName( 'name' ) ) + self.failUnless( cols.hasByName( 'id' ) ) + col = cols.getByName( "dummyserial" ) +# dumpColumns( cols ) + self.checkDescriptor( cols.getByName( "id" ), "id", "bpchar", CHAR, 8, 0, "", "unique id" ) + self.checkDescriptor( cols.getByName( "name" ), "name", "text", VARCHAR, 0, 0, "", "" ) + + dd = cols.createDataDescriptor() + dd.Name = "foo" + dd.TypeName = "CHAR" + dd.Type = CHAR + dd.Precision = 25 + dd.IsNullable = NULLABLE + cols.appendByDescriptor( dd ) + + dd.Name = "foo2" + dd.TypeName = "DECIMAL" + dd.Type = DECIMAL + dd.Precision = 12 + dd.Scale = 5 + dd.DefaultValue = "2.3423" + dd.Description = "foo2 description" + cols.appendByDescriptor( dd ) + + dd.Name = "cash" + dd.TypeName = "MONEY" + dd.Type = DOUBLE +# dd.IsNullable = NO_NULLS + dd.DefaultValue = "'2.42'" + cols.appendByDescriptor( dd ) + + cols.refresh() + + self.checkDescriptor( cols.getByName( "foo"), "foo", "bpchar", CHAR, 25,0,"","") + self.checkDescriptor( + cols.getByName( "foo2"), "foo2", "numeric", NUMERIC, 12,5,"2.3423","foo2 description") +# dumpColumns( cols ) + + datadesc = tables.createDataDescriptor() + datadesc.SchemaName = "public" + datadesc.Name = "blub" + datadesc.Description = "This describes blub" + + tables.appendByDescriptor( datadesc ) + + # make the appended descriptors known + tables.refresh() + + t = tables.getByName( "public.blub" ) + self.failUnless( t.Name == "blub" ) + self.failUnless( t.SchemaName == "public" ) + self.failUnless( t.Description == "This describes blub" ) + + cols = t.getColumns() + dd = cols.createDataDescriptor() + dd.Name = "mytext" + dd.TypeName = "text" + dd.Type = VARCHAR + dd.IsNullable = NO_NULLS + cols.appendByDescriptor( dd ) + + cols.refresh() + + dd.DefaultValue = "'myDefault'" + dd.Name = "mytext2" + dd.IsNullable = NULLABLE + dd.Description = "mytext-Description" + t.alterColumnByName( "mytext" , dd ) + + cols.refresh() + + self.checkDescriptor( cols.getByName( "mytext2" ), "mytext2", "text", VARCHAR, 0,0,"'myDefault'","mytext-Description" ) + + t = tables.getByName( "public.customer2" ) + self.checkRenameTable( t,tables ) + + t = tables.getByName( "public.blub" ) + self.checkRenameTable( t,tables ) + + + + def testUsers( self ): + dd = self.driver.getDataDefinitionByConnection( self.connection ) + users = dd.getUsers() + self.failUnless( "pqsdbc_joe" in users.getElementNames() ) diff --git a/connectivity/workben/postgresql/statement.py b/connectivity/workben/postgresql/statement.py new file mode 100644 index 000000000000..b4718386e3df --- /dev/null +++ b/connectivity/workben/postgresql/statement.py @@ -0,0 +1,277 @@ +#************************************************************************* +# +# $RCSfile: statement.py,v $ +# +# $Revision: 1.1.2.5 $ +# +# last change: $Author: jbu $ $Date: 2006/05/27 11:33:11 $ +# +# The Contents of this file are made available subject to the terms of +# either of the following licenses +# +# - GNU Lesser General Public License Version 2.1 +# - Sun Industry Standards Source License Version 1.1 +# +# Sun Microsystems Inc., October, 2000 +# +# GNU Lesser General Public License Version 2.1 +# ============================================= +# Copyright 2000 by Sun Microsystems, Inc. +# 901 San Antonio Road, Palo Alto, CA 94303, USA +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +# +# Sun Industry Standards Source License Version 1.1 +# ================================================= +# The contents of this file are subject to the Sun Industry Standards +# Source License Version 1.1 (the "License"); You may not use this file +# except in compliance with the License. You may obtain a copy of the +# License at http://www.openoffice.org/license.html. +# +# Software provided under this License is provided on an "AS IS" basis, +# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, +# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS, +# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING. +# See the License for the specific provisions governing your rights and +# obligations concerning the Software. +# +# The Initial Developer of the Original Code is: Joerg Budischewski +# +# Copyright: 2000 by Sun Microsystems, Inc. +# +# All Rights Reserved. +# +# Contributor(s): Joerg Budischewski +# +# +# +#************************************************************************* +import unohelper +import unittest +import ddl +from com.sun.star.sdbc import SQLException, XArray +from com.sun.star.sdbc.DataType import VARCHAR +from com.sun.star.util import Date +from com.sun.star.util import Time +from com.sun.star.util import DateTime + +# todo +class MyArray( unohelper.Base, XArray ): + def __init__( self, data ): + self.data = data + def getBaseType( self ): + return VARCHAR + def getBaseTypeName( self ): + return "varchar" + def getArray( self, foo ): + return self.data + def getArrayAtIndex( self, startIndex, count, foo ): + return self.data[startIndex:startIndex+count-1] + def getResultSet( self, foo): + return None + def getResultSetAtIndex( self, startIndex, count, foo ): + return None + +def suite(ctx,dburl): + suite = unittest.TestSuite() + suite.addTest(TestCase("testRobustness",ctx,dburl)) + suite.addTest(TestCase("testRow",ctx,dburl)) + suite.addTest(TestCase("testNavigation",ctx,dburl)) + suite.addTest(TestCase("testDatabaseMetaData",ctx,dburl)) + suite.addTest(TestCase("testGeneratedResultSet",ctx,dburl)) + suite.addTest(TestCase("testResultSetMetaData",ctx,dburl)) + suite.addTest(TestCase("testArray",ctx,dburl)) + return suite + +def realEquals( a,b,eps ): + val = a - b + if val < 0: + val = -1. * val + return val < eps + +class TestCase(unittest.TestCase): + def __init__(self,method,ctx,dburl): + unittest.TestCase.__init__(self,method) + self.ctx = ctx + self.dburl = dburl + + def setUp(self): + self.driver = self.ctx.ServiceManager.createInstanceWithContext( + 'org.openoffice.comp.connectivity.pq.Driver' , self.ctx ) + self.connection = self.driver.connect( self.dburl, () ) + self.stmt = self.connection.createStatement() + try: + self.stmt.executeUpdate( "DROP TABLE firsttable" ) + except SQLException,e: + pass + + ddls = ( + "BEGIN", + "CREATE TABLE firsttable (tString text,tInteger integer,tShort smallint,tLong bigint,tFloat real,"+ + "tDouble double precision,tByteSeq bytea,tBool boolean, tDate date, tTime time, tTimestamp timestamp, tIntArray integer[], tStringArray text[], tSerial serial ) WITH OIDS", + "INSERT INTO firsttable VALUES ( 'foo', 70000, 12000, 70001, 2.4, 2.45, 'huhu', 'true', '1999-01-08','04:05:06','1999-01-08 04:05:06', '{2,3,4}', '{\"huhu\",\"hi\"}')", + "INSERT INTO firsttable VALUES ( 'foo2', 69999, 12001, 70002, -2.4, 2.55, 'huhu', 'false', '1999-01-08','04:05:06','1999-01-08 04:05:06', NULL , '{\"bla\"}' )", + "INSERT INTO firsttable VALUES ( 'foo2', 69999, 12001, 70002, -2.4, 2.55, 'huhu', null, '1999-01-08', '04:05:06','1999-01-08 04:05:06', '{}' , '{\"bl ubs\",\"bl\\\\\\\\a}}b\\\\\"a\",\"blub\"}' )", + "COMMIT" ) + for i in ddls: + self.stmt.executeUpdate(i) + + def tearDown(self): + self.stmt.close() + self.connection.close() + + def testRow(self): + row = ("foo",70000,12000,70001,2.4,2.45, "huhu", True , + Date(8,1,1999), Time(0,6,5,4),DateTime(0,6,5,4,8,1,1999) ) + row2 = ("foo2",69999,12001,70002,-2.4,2.55, "huhu", False ) + + rs = self.stmt.executeQuery( "SELECT * from firsttable" ) + self.failUnless( rs.next() ) + + self.failUnless( rs.getString(1) == row[0] ) + self.failUnless( rs.getInt(2) == row[1] ) + self.failUnless( rs.getShort(3) == row[2] ) + self.failUnless( rs.getLong(4) == row[3] ) + self.failUnless( realEquals(rs.getFloat(5), row[4], 0.001)) + self.failUnless( realEquals(rs.getDouble(6), row[5], 0.00001)) + self.failUnless( rs.getBytes(7) == row[6] ) + self.failUnless( rs.getBoolean(8) == row[7] ) + self.failUnless( rs.getDate(9) == row[8] ) + self.failUnless( rs.getTime(10) == row[9] ) + self.failUnless( rs.getTimestamp(11) == row[10] ) + + a = rs.getArray(12) + data = a.getArray( None ) + self.failUnless( len( data ) == 3 ) + self.failUnless( int(data[0] ) == 2 ) + self.failUnless( int(data[1] ) == 3 ) + self.failUnless( int(data[2] ) == 4 ) + + self.failUnless( rs.next() ) + + self.failUnless( rs.next() ) + data = rs.getArray(13).getArray(None) + self.failUnless( data[0] == "bl ubs" ) + self.failUnless( data[1] == "bl\\a}}b\"a" ) # check special keys + self.failUnless( data[2] == "blub" ) + + rs.getString(8) + self.failUnless( rs.wasNull() ) + rs.getString(7) + self.failUnless( not rs.wasNull() ) + + self.failUnless( rs.findColumn( "tShort" ) == 3 ) + rs.close() + + def testNavigation( self ): + rs = self.stmt.executeQuery( "SELECT * from firsttable" ) + self.failUnless( rs.isBeforeFirst() ) + self.failUnless( not rs.isAfterLast() ) + self.failUnless( rs.isBeforeFirst() ) + + self.failUnless( rs.next() ) + self.failUnless( rs.isFirst() ) + self.failUnless( not rs.isLast() ) + self.failUnless( not rs.isBeforeFirst() ) + + self.failUnless( rs.next() ) + self.failUnless( rs.next() ) + self.failUnless( not rs.next() ) + self.failUnless( rs.isAfterLast() ) + + rs.absolute( 1 ) + self.failUnless( rs.isFirst() ) + + rs.absolute( 3 ) + self.failUnless( rs.isLast() ) + + rs.relative( -1 ) + self.failUnless( rs.getRow() == 2 ) + + rs.relative( 1 ) + self.failUnless( rs.getRow() == 3 ) + + rs.close() + + def testRobustness( self ): + rs = self.stmt.executeQuery( "SELECT * from firsttable" ) + + self.failUnlessRaises( SQLException, rs.getString , 1 ) + + rs.next() + self.failUnlessRaises( SQLException, rs.getString , 24 ) + self.failUnlessRaises( SQLException, rs.getString , 0 ) + + self.connection.close() + self.failUnlessRaises( SQLException, rs.getString , 1 ) + self.failUnlessRaises( SQLException, self.stmt.executeQuery, "SELECT * from firsttable" ) + rs.close() + + + def testDatabaseMetaData( self ): + meta = self.connection.getMetaData() + + self.failUnless( not meta.isReadOnly() ) + + def testGeneratedResultSet( self ): + self.stmt.executeUpdate( + "INSERT INTO firsttable VALUES ( 'foo3', 69998, 12001, 70002, -2.4, 2.55, 'huhu2')" ) + #ddl.dumpResultSet( self.stmt.getGeneratedValues() ) + rs = self.stmt.getGeneratedValues() + self.failUnless( rs.next() ) + self.failUnless( rs.getInt( 14 ) == 4 ) + + def testResultSetMetaData( self ): + rs = self.stmt.executeQuery( "SELECT * from firsttable" ) + + # just check, if we get results ! + meta = rs.getMetaData() + + count = meta.getColumnCount() + for i in range( 1, count+1): + meta.isNullable( i ) + meta.isCurrency( i ) + meta.isCaseSensitive( i ) + meta.isSearchable( i ) + meta.isSigned( i ) + meta.getColumnDisplaySize( i ) + meta.getColumnName( i ) + meta.getColumnLabel( i ) + meta.getSchemaName( i ) + meta.getPrecision( i ) + meta.getScale( i ) + meta.getTableName( i ) + meta.getColumnTypeName( i ) + meta.getColumnType( i ) + meta.isReadOnly( i ) + meta.isWritable( i ) + meta.isDefinitelyWritable( i ) + meta.getColumnServiceName( i ) + + def testArray( self ): + stmt = self.connection.prepareStatement( + "INSERT INTO firsttable VALUES ( 'insertedArray', 70000, 12000, 70001, 2.4, 2.45, 'huhu', 'true', '1999-01-08','04:05:06','1999-01-08 04:05:06', '{2,3,4}', ? )" ) + myarray = ( "a", "\"c", "}d{" ) + stmt.setArray( 1, MyArray( myarray ) ) + stmt.executeUpdate() + + stmt = self.connection.createStatement() + rs = stmt.executeQuery( "SELECT tStringArray FROM firsttable WHERE tString = 'insertedArray'" ) + rs.next() + data = rs.getArray(1).getArray(None) + self.failUnless( data[0] == myarray[0] ) + self.failUnless( data[1] == myarray[1] ) + self.failUnless( data[2] == myarray[2] ) -- cgit