diff options
author | cinap_lenrek <cinap_lenrek@localhost> | 2011-05-03 15:16:20 +0000 |
---|---|---|
committer | cinap_lenrek <cinap_lenrek@localhost> | 2011-05-03 15:16:20 +0000 |
commit | 5976fdfe42ecdee07df0621d9323c2790b23eb5d (patch) | |
tree | e399aa4b8bb7c6d5d2eb8267cf9a2904370a046a /sys/lib/python/sqlite3 | |
parent | 1665b57e14f8637569e52f8752cc9dd1672a5cfb (diff) |
remove stuff
Diffstat (limited to 'sys/lib/python/sqlite3')
-rw-r--r-- | sys/lib/python/sqlite3/test/__init__.py | 0 | ||||
-rw-r--r-- | sys/lib/python/sqlite3/test/dbapi.py | 732 | ||||
-rw-r--r-- | sys/lib/python/sqlite3/test/factory.py | 164 | ||||
-rw-r--r-- | sys/lib/python/sqlite3/test/hooks.py | 117 | ||||
-rw-r--r-- | sys/lib/python/sqlite3/test/regression.py | 81 | ||||
-rw-r--r-- | sys/lib/python/sqlite3/test/transactions.py | 156 | ||||
-rw-r--r-- | sys/lib/python/sqlite3/test/types.py | 351 | ||||
-rw-r--r-- | sys/lib/python/sqlite3/test/userfunctions.py | 413 |
8 files changed, 0 insertions, 2014 deletions
diff --git a/sys/lib/python/sqlite3/test/__init__.py b/sys/lib/python/sqlite3/test/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/sys/lib/python/sqlite3/test/__init__.py +++ /dev/null diff --git a/sys/lib/python/sqlite3/test/dbapi.py b/sys/lib/python/sqlite3/test/dbapi.py deleted file mode 100644 index b08da9c5f..000000000 --- a/sys/lib/python/sqlite3/test/dbapi.py +++ /dev/null @@ -1,732 +0,0 @@ -#-*- coding: ISO-8859-1 -*- -# pysqlite2/test/dbapi.py: tests for DB-API compliance -# -# Copyright (C) 2004-2005 Gerhard Häring <gh@ghaering.de> -# -# This file is part of pysqlite. -# -# This software is provided 'as-is', without any express or implied -# warranty. In no event will the authors be held liable for any damages -# arising from the use of this software. -# -# Permission is granted to anyone to use this software for any purpose, -# including commercial applications, and to alter it and redistribute it -# freely, subject to the following restrictions: -# -# 1. The origin of this software must not be misrepresented; you must not -# claim that you wrote the original software. If you use this software -# in a product, an acknowledgment in the product documentation would be -# appreciated but is not required. -# 2. Altered source versions must be plainly marked as such, and must not be -# misrepresented as being the original software. -# 3. This notice may not be removed or altered from any source distribution. - -import unittest -import threading -import sqlite3 as sqlite - -class ModuleTests(unittest.TestCase): - def CheckAPILevel(self): - self.assertEqual(sqlite.apilevel, "2.0", - "apilevel is %s, should be 2.0" % sqlite.apilevel) - - def CheckThreadSafety(self): - self.assertEqual(sqlite.threadsafety, 1, - "threadsafety is %d, should be 1" % sqlite.threadsafety) - - def CheckParamStyle(self): - self.assertEqual(sqlite.paramstyle, "qmark", - "paramstyle is '%s', should be 'qmark'" % - sqlite.paramstyle) - - def CheckWarning(self): - self.assert_(issubclass(sqlite.Warning, StandardError), - "Warning is not a subclass of StandardError") - - def CheckError(self): - self.failUnless(issubclass(sqlite.Error, StandardError), - "Error is not a subclass of StandardError") - - def CheckInterfaceError(self): - self.failUnless(issubclass(sqlite.InterfaceError, sqlite.Error), - "InterfaceError is not a subclass of Error") - - def CheckDatabaseError(self): - self.failUnless(issubclass(sqlite.DatabaseError, sqlite.Error), - "DatabaseError is not a subclass of Error") - - def CheckDataError(self): - self.failUnless(issubclass(sqlite.DataError, sqlite.DatabaseError), - "DataError is not a subclass of DatabaseError") - - def CheckOperationalError(self): - self.failUnless(issubclass(sqlite.OperationalError, sqlite.DatabaseError), - "OperationalError is not a subclass of DatabaseError") - - def CheckIntegrityError(self): - self.failUnless(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), - "IntegrityError is not a subclass of DatabaseError") - - def CheckInternalError(self): - self.failUnless(issubclass(sqlite.InternalError, sqlite.DatabaseError), - "InternalError is not a subclass of DatabaseError") - - def CheckProgrammingError(self): - self.failUnless(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), - "ProgrammingError is not a subclass of DatabaseError") - - def CheckNotSupportedError(self): - self.failUnless(issubclass(sqlite.NotSupportedError, - sqlite.DatabaseError), - "NotSupportedError is not a subclass of DatabaseError") - -class ConnectionTests(unittest.TestCase): - def setUp(self): - self.cx = sqlite.connect(":memory:") - cu = self.cx.cursor() - cu.execute("create table test(id integer primary key, name text)") - cu.execute("insert into test(name) values (?)", ("foo",)) - - def tearDown(self): - self.cx.close() - - def CheckCommit(self): - self.cx.commit() - - def CheckCommitAfterNoChanges(self): - """ - A commit should also work when no changes were made to the database. - """ - self.cx.commit() - self.cx.commit() - - def CheckRollback(self): - self.cx.rollback() - - def CheckRollbackAfterNoChanges(self): - """ - A rollback should also work when no changes were made to the database. - """ - self.cx.rollback() - self.cx.rollback() - - def CheckCursor(self): - cu = self.cx.cursor() - - def CheckFailedOpen(self): - YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" - try: - con = sqlite.connect(YOU_CANNOT_OPEN_THIS) - except sqlite.OperationalError: - return - self.fail("should have raised an OperationalError") - - def CheckClose(self): - self.cx.close() - - def CheckExceptions(self): - # Optional DB-API extension. - self.failUnlessEqual(self.cx.Warning, sqlite.Warning) - self.failUnlessEqual(self.cx.Error, sqlite.Error) - self.failUnlessEqual(self.cx.InterfaceError, sqlite.InterfaceError) - self.failUnlessEqual(self.cx.DatabaseError, sqlite.DatabaseError) - self.failUnlessEqual(self.cx.DataError, sqlite.DataError) - self.failUnlessEqual(self.cx.OperationalError, sqlite.OperationalError) - self.failUnlessEqual(self.cx.IntegrityError, sqlite.IntegrityError) - self.failUnlessEqual(self.cx.InternalError, sqlite.InternalError) - self.failUnlessEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) - self.failUnlessEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) - -class CursorTests(unittest.TestCase): - def setUp(self): - self.cx = sqlite.connect(":memory:") - self.cu = self.cx.cursor() - self.cu.execute("create table test(id integer primary key, name text, income number)") - self.cu.execute("insert into test(name) values (?)", ("foo",)) - - def tearDown(self): - self.cu.close() - self.cx.close() - - def CheckExecuteNoArgs(self): - self.cu.execute("delete from test") - - def CheckExecuteIllegalSql(self): - try: - self.cu.execute("select asdf") - self.fail("should have raised an OperationalError") - except sqlite.OperationalError: - return - except: - self.fail("raised wrong exception") - - def CheckExecuteTooMuchSql(self): - try: - self.cu.execute("select 5+4; select 4+5") - self.fail("should have raised a Warning") - except sqlite.Warning: - return - except: - self.fail("raised wrong exception") - - def CheckExecuteTooMuchSql2(self): - self.cu.execute("select 5+4; -- foo bar") - - def CheckExecuteTooMuchSql3(self): - self.cu.execute(""" - select 5+4; - - /* - foo - */ - """) - - def CheckExecuteWrongSqlArg(self): - try: - self.cu.execute(42) - self.fail("should have raised a ValueError") - except ValueError: - return - except: - self.fail("raised wrong exception.") - - def CheckExecuteArgInt(self): - self.cu.execute("insert into test(id) values (?)", (42,)) - - def CheckExecuteArgFloat(self): - self.cu.execute("insert into test(income) values (?)", (2500.32,)) - - def CheckExecuteArgString(self): - self.cu.execute("insert into test(name) values (?)", ("Hugo",)) - - def CheckExecuteWrongNoOfArgs1(self): - # too many parameters - try: - self.cu.execute("insert into test(id) values (?)", (17, "Egon")) - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass - - def CheckExecuteWrongNoOfArgs2(self): - # too little parameters - try: - self.cu.execute("insert into test(id) values (?)") - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass - - def CheckExecuteWrongNoOfArgs3(self): - # no parameters, parameters are needed - try: - self.cu.execute("insert into test(id) values (?)") - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass - - def CheckExecuteDictMapping(self): - self.cu.execute("insert into test(name) values ('foo')") - self.cu.execute("select name from test where name=:name", {"name": "foo"}) - row = self.cu.fetchone() - self.failUnlessEqual(row[0], "foo") - - def CheckExecuteDictMappingTooLittleArgs(self): - self.cu.execute("insert into test(name) values ('foo')") - try: - self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass - - def CheckExecuteDictMappingNoArgs(self): - self.cu.execute("insert into test(name) values ('foo')") - try: - self.cu.execute("select name from test where name=:name") - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass - - def CheckExecuteDictMappingUnnamed(self): - self.cu.execute("insert into test(name) values ('foo')") - try: - self.cu.execute("select name from test where name=?", {"name": "foo"}) - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass - - def CheckClose(self): - self.cu.close() - - def CheckRowcountExecute(self): - self.cu.execute("delete from test") - self.cu.execute("insert into test(name) values ('foo')") - self.cu.execute("insert into test(name) values ('foo')") - self.cu.execute("update test set name='bar'") - self.failUnlessEqual(self.cu.rowcount, 2) - - def CheckRowcountExecutemany(self): - self.cu.execute("delete from test") - self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) - self.failUnlessEqual(self.cu.rowcount, 3) - - def CheckTotalChanges(self): - self.cu.execute("insert into test(name) values ('foo')") - self.cu.execute("insert into test(name) values ('foo')") - if self.cx.total_changes < 2: - self.fail("total changes reported wrong value") - - # Checks for executemany: - # Sequences are required by the DB-API, iterators - # enhancements in pysqlite. - - def CheckExecuteManySequence(self): - self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) - - def CheckExecuteManyIterator(self): - class MyIter: - def __init__(self): - self.value = 5 - - def next(self): - if self.value == 10: - raise StopIteration - else: - self.value += 1 - return (self.value,) - - self.cu.executemany("insert into test(income) values (?)", MyIter()) - - def CheckExecuteManyGenerator(self): - def mygen(): - for i in range(5): - yield (i,) - - self.cu.executemany("insert into test(income) values (?)", mygen()) - - def CheckExecuteManyWrongSqlArg(self): - try: - self.cu.executemany(42, [(3,)]) - self.fail("should have raised a ValueError") - except ValueError: - return - except: - self.fail("raised wrong exception.") - - def CheckExecuteManySelect(self): - try: - self.cu.executemany("select ?", [(3,)]) - self.fail("should have raised a ProgrammingError") - except sqlite.ProgrammingError: - return - except: - self.fail("raised wrong exception.") - - def CheckExecuteManyNotIterable(self): - try: - self.cu.executemany("insert into test(income) values (?)", 42) - self.fail("should have raised a TypeError") - except TypeError: - return - except Exception, e: - print "raised", e.__class__ - self.fail("raised wrong exception.") - - def CheckFetchIter(self): - # Optional DB-API extension. - self.cu.execute("delete from test") - self.cu.execute("insert into test(id) values (?)", (5,)) - self.cu.execute("insert into test(id) values (?)", (6,)) - self.cu.execute("select id from test order by id") - lst = [] - for row in self.cu: - lst.append(row[0]) - self.failUnlessEqual(lst[0], 5) - self.failUnlessEqual(lst[1], 6) - - def CheckFetchone(self): - self.cu.execute("select name from test") - row = self.cu.fetchone() - self.failUnlessEqual(row[0], "foo") - row = self.cu.fetchone() - self.failUnlessEqual(row, None) - - def CheckFetchoneNoStatement(self): - cur = self.cx.cursor() - row = cur.fetchone() - self.failUnlessEqual(row, None) - - def CheckArraySize(self): - # must default ot 1 - self.failUnlessEqual(self.cu.arraysize, 1) - - # now set to 2 - self.cu.arraysize = 2 - - # now make the query return 3 rows - self.cu.execute("delete from test") - self.cu.execute("insert into test(name) values ('A')") - self.cu.execute("insert into test(name) values ('B')") - self.cu.execute("insert into test(name) values ('C')") - self.cu.execute("select name from test") - res = self.cu.fetchmany() - - self.failUnlessEqual(len(res), 2) - - def CheckFetchmany(self): - self.cu.execute("select name from test") - res = self.cu.fetchmany(100) - self.failUnlessEqual(len(res), 1) - res = self.cu.fetchmany(100) - self.failUnlessEqual(res, []) - - def CheckFetchall(self): - self.cu.execute("select name from test") - res = self.cu.fetchall() - self.failUnlessEqual(len(res), 1) - res = self.cu.fetchall() - self.failUnlessEqual(res, []) - - def CheckSetinputsizes(self): - self.cu.setinputsizes([3, 4, 5]) - - def CheckSetoutputsize(self): - self.cu.setoutputsize(5, 0) - - def CheckSetoutputsizeNoColumn(self): - self.cu.setoutputsize(42) - - def CheckCursorConnection(self): - # Optional DB-API extension. - self.failUnlessEqual(self.cu.connection, self.cx) - - def CheckWrongCursorCallable(self): - try: - def f(): pass - cur = self.cx.cursor(f) - self.fail("should have raised a TypeError") - except TypeError: - return - self.fail("should have raised a ValueError") - - def CheckCursorWrongClass(self): - class Foo: pass - foo = Foo() - try: - cur = sqlite.Cursor(foo) - self.fail("should have raised a ValueError") - except TypeError: - pass - -class ThreadTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:") - self.cur = self.con.cursor() - self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)") - - def tearDown(self): - self.cur.close() - self.con.close() - - def CheckConCursor(self): - def run(con, errors): - try: - cur = con.cursor() - errors.append("did not raise ProgrammingError") - return - except sqlite.ProgrammingError: - return - except: - errors.append("raised wrong exception") - - errors = [] - t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) - t.start() - t.join() - if len(errors) > 0: - self.fail("\n".join(errors)) - - def CheckConCommit(self): - def run(con, errors): - try: - con.commit() - errors.append("did not raise ProgrammingError") - return - except sqlite.ProgrammingError: - return - except: - errors.append("raised wrong exception") - - errors = [] - t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) - t.start() - t.join() - if len(errors) > 0: - self.fail("\n".join(errors)) - - def CheckConRollback(self): - def run(con, errors): - try: - con.rollback() - errors.append("did not raise ProgrammingError") - return - except sqlite.ProgrammingError: - return - except: - errors.append("raised wrong exception") - - errors = [] - t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) - t.start() - t.join() - if len(errors) > 0: - self.fail("\n".join(errors)) - - def CheckConClose(self): - def run(con, errors): - try: - con.close() - errors.append("did not raise ProgrammingError") - return - except sqlite.ProgrammingError: - return - except: - errors.append("raised wrong exception") - - errors = [] - t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) - t.start() - t.join() - if len(errors) > 0: - self.fail("\n".join(errors)) - - def CheckCurImplicitBegin(self): - def run(cur, errors): - try: - cur.execute("insert into test(name) values ('a')") - errors.append("did not raise ProgrammingError") - return - except sqlite.ProgrammingError: - return - except: - errors.append("raised wrong exception") - - errors = [] - t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) - t.start() - t.join() - if len(errors) > 0: - self.fail("\n".join(errors)) - - def CheckCurClose(self): - def run(cur, errors): - try: - cur.close() - errors.append("did not raise ProgrammingError") - return - except sqlite.ProgrammingError: - return - except: - errors.append("raised wrong exception") - - errors = [] - t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) - t.start() - t.join() - if len(errors) > 0: - self.fail("\n".join(errors)) - - def CheckCurExecute(self): - def run(cur, errors): - try: - cur.execute("select name from test") - errors.append("did not raise ProgrammingError") - return - except sqlite.ProgrammingError: - return - except: - errors.append("raised wrong exception") - - errors = [] - self.cur.execute("insert into test(name) values ('a')") - t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) - t.start() - t.join() - if len(errors) > 0: - self.fail("\n".join(errors)) - - def CheckCurIterNext(self): - def run(cur, errors): - try: - row = cur.fetchone() - errors.append("did not raise ProgrammingError") - return - except sqlite.ProgrammingError: - return - except: - errors.append("raised wrong exception") - - errors = [] - self.cur.execute("insert into test(name) values ('a')") - self.cur.execute("select name from test") - t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) - t.start() - t.join() - if len(errors) > 0: - self.fail("\n".join(errors)) - -class ConstructorTests(unittest.TestCase): - def CheckDate(self): - d = sqlite.Date(2004, 10, 28) - - def CheckTime(self): - t = sqlite.Time(12, 39, 35) - - def CheckTimestamp(self): - ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) - - def CheckDateFromTicks(self): - d = sqlite.DateFromTicks(42) - - def CheckTimeFromTicks(self): - t = sqlite.TimeFromTicks(42) - - def CheckTimestampFromTicks(self): - ts = sqlite.TimestampFromTicks(42) - - def CheckBinary(self): - b = sqlite.Binary(chr(0) + "'") - -class ExtensionTests(unittest.TestCase): - def CheckScriptStringSql(self): - con = sqlite.connect(":memory:") - cur = con.cursor() - cur.executescript(""" - -- bla bla - /* a stupid comment */ - create table a(i); - insert into a(i) values (5); - """) - cur.execute("select i from a") - res = cur.fetchone()[0] - self.failUnlessEqual(res, 5) - - def CheckScriptStringUnicode(self): - con = sqlite.connect(":memory:") - cur = con.cursor() - cur.executescript(u""" - create table a(i); - insert into a(i) values (5); - select i from a; - delete from a; - insert into a(i) values (6); - """) - cur.execute("select i from a") - res = cur.fetchone()[0] - self.failUnlessEqual(res, 6) - - def CheckScriptErrorIncomplete(self): - con = sqlite.connect(":memory:") - cur = con.cursor() - raised = False - try: - cur.executescript("create table test(sadfsadfdsa") - except sqlite.ProgrammingError: - raised = True - self.failUnlessEqual(raised, True, "should have raised an exception") - - def CheckScriptErrorNormal(self): - con = sqlite.connect(":memory:") - cur = con.cursor() - raised = False - try: - cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") - except sqlite.OperationalError: - raised = True - self.failUnlessEqual(raised, True, "should have raised an exception") - - def CheckConnectionExecute(self): - con = sqlite.connect(":memory:") - result = con.execute("select 5").fetchone()[0] - self.failUnlessEqual(result, 5, "Basic test of Connection.execute") - - def CheckConnectionExecutemany(self): - con = sqlite.connect(":memory:") - con.execute("create table test(foo)") - con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) - result = con.execute("select foo from test order by foo").fetchall() - self.failUnlessEqual(result[0][0], 3, "Basic test of Connection.executemany") - self.failUnlessEqual(result[1][0], 4, "Basic test of Connection.executemany") - - def CheckConnectionExecutescript(self): - con = sqlite.connect(":memory:") - con.executescript("create table test(foo); insert into test(foo) values (5);") - result = con.execute("select foo from test").fetchone()[0] - self.failUnlessEqual(result, 5, "Basic test of Connection.executescript") - -class ClosedTests(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - - def CheckClosedConCursor(self): - con = sqlite.connect(":memory:") - con.close() - try: - cur = con.cursor() - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") - - def CheckClosedConCommit(self): - con = sqlite.connect(":memory:") - con.close() - try: - con.commit() - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") - - def CheckClosedConRollback(self): - con = sqlite.connect(":memory:") - con.close() - try: - con.rollback() - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") - - def CheckClosedCurExecute(self): - con = sqlite.connect(":memory:") - cur = con.cursor() - con.close() - try: - cur.execute("select 4") - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") - -def suite(): - module_suite = unittest.makeSuite(ModuleTests, "Check") - connection_suite = unittest.makeSuite(ConnectionTests, "Check") - cursor_suite = unittest.makeSuite(CursorTests, "Check") - thread_suite = unittest.makeSuite(ThreadTests, "Check") - constructor_suite = unittest.makeSuite(ConstructorTests, "Check") - ext_suite = unittest.makeSuite(ExtensionTests, "Check") - closed_suite = unittest.makeSuite(ClosedTests, "Check") - return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_suite)) - -def test(): - runner = unittest.TextTestRunner() - runner.run(suite()) - -if __name__ == "__main__": - test() diff --git a/sys/lib/python/sqlite3/test/factory.py b/sys/lib/python/sqlite3/test/factory.py deleted file mode 100644 index 8778056af..000000000 --- a/sys/lib/python/sqlite3/test/factory.py +++ /dev/null @@ -1,164 +0,0 @@ -#-*- coding: ISO-8859-1 -*- -# pysqlite2/test/factory.py: tests for the various factories in pysqlite -# -# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de> -# -# This file is part of pysqlite. -# -# This software is provided 'as-is', without any express or implied -# warranty. In no event will the authors be held liable for any damages -# arising from the use of this software. -# -# Permission is granted to anyone to use this software for any purpose, -# including commercial applications, and to alter it and redistribute it -# freely, subject to the following restrictions: -# -# 1. The origin of this software must not be misrepresented; you must not -# claim that you wrote the original software. If you use this software -# in a product, an acknowledgment in the product documentation would be -# appreciated but is not required. -# 2. Altered source versions must be plainly marked as such, and must not be -# misrepresented as being the original software. -# 3. This notice may not be removed or altered from any source distribution. - -import unittest -import sqlite3 as sqlite - -class MyConnection(sqlite.Connection): - def __init__(self, *args, **kwargs): - sqlite.Connection.__init__(self, *args, **kwargs) - -def dict_factory(cursor, row): - d = {} - for idx, col in enumerate(cursor.description): - d[col[0]] = row[idx] - return d - -class MyCursor(sqlite.Cursor): - def __init__(self, *args, **kwargs): - sqlite.Cursor.__init__(self, *args, **kwargs) - self.row_factory = dict_factory - -class ConnectionFactoryTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:", factory=MyConnection) - - def tearDown(self): - self.con.close() - - def CheckIsInstance(self): - self.failUnless(isinstance(self.con, - MyConnection), - "connection is not instance of MyConnection") - -class CursorFactoryTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:") - - def tearDown(self): - self.con.close() - - def CheckIsInstance(self): - cur = self.con.cursor(factory=MyCursor) - self.failUnless(isinstance(cur, - MyCursor), - "cursor is not instance of MyCursor") - -class RowFactoryTestsBackwardsCompat(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:") - - def CheckIsProducedByFactory(self): - cur = self.con.cursor(factory=MyCursor) - cur.execute("select 4+5 as foo") - row = cur.fetchone() - self.failUnless(isinstance(row, - dict), - "row is not instance of dict") - cur.close() - - def tearDown(self): - self.con.close() - -class RowFactoryTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:") - - def CheckCustomFactory(self): - self.con.row_factory = lambda cur, row: list(row) - row = self.con.execute("select 1, 2").fetchone() - self.failUnless(isinstance(row, - list), - "row is not instance of list") - - def CheckSqliteRow(self): - self.con.row_factory = sqlite.Row - row = self.con.execute("select 1 as a, 2 as b").fetchone() - self.failUnless(isinstance(row, - sqlite.Row), - "row is not instance of sqlite.Row") - - col1, col2 = row["a"], row["b"] - self.failUnless(col1 == 1, "by name: wrong result for column 'a'") - self.failUnless(col2 == 2, "by name: wrong result for column 'a'") - - col1, col2 = row["A"], row["B"] - self.failUnless(col1 == 1, "by name: wrong result for column 'A'") - self.failUnless(col2 == 2, "by name: wrong result for column 'B'") - - col1, col2 = row[0], row[1] - self.failUnless(col1 == 1, "by index: wrong result for column 0") - self.failUnless(col2 == 2, "by index: wrong result for column 1") - - def tearDown(self): - self.con.close() - -class TextFactoryTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:") - - def CheckUnicode(self): - austria = unicode("Österreich", "latin1") - row = self.con.execute("select ?", (austria,)).fetchone() - self.failUnless(type(row[0]) == unicode, "type of row[0] must be unicode") - - def CheckString(self): - self.con.text_factory = str - austria = unicode("Österreich", "latin1") - row = self.con.execute("select ?", (austria,)).fetchone() - self.failUnless(type(row[0]) == str, "type of row[0] must be str") - self.failUnless(row[0] == austria.encode("utf-8"), "column must equal original data in UTF-8") - - def CheckCustom(self): - self.con.text_factory = lambda x: unicode(x, "utf-8", "ignore") - austria = unicode("Österreich", "latin1") - row = self.con.execute("select ?", (austria.encode("latin1"),)).fetchone() - self.failUnless(type(row[0]) == unicode, "type of row[0] must be unicode") - self.failUnless(row[0].endswith(u"reich"), "column must contain original data") - - def CheckOptimizedUnicode(self): - self.con.text_factory = sqlite.OptimizedUnicode - austria = unicode("Österreich", "latin1") - germany = unicode("Deutchland") - a_row = self.con.execute("select ?", (austria,)).fetchone() - d_row = self.con.execute("select ?", (germany,)).fetchone() - self.failUnless(type(a_row[0]) == unicode, "type of non-ASCII row must be unicode") - self.failUnless(type(d_row[0]) == str, "type of ASCII-only row must be str") - - def tearDown(self): - self.con.close() - -def suite(): - connection_suite = unittest.makeSuite(ConnectionFactoryTests, "Check") - cursor_suite = unittest.makeSuite(CursorFactoryTests, "Check") - row_suite_compat = unittest.makeSuite(RowFactoryTestsBackwardsCompat, "Check") - row_suite = unittest.makeSuite(RowFactoryTests, "Check") - text_suite = unittest.makeSuite(TextFactoryTests, "Check") - return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite)) - -def test(): - runner = unittest.TextTestRunner() - runner.run(suite()) - -if __name__ == "__main__": - test() diff --git a/sys/lib/python/sqlite3/test/hooks.py b/sys/lib/python/sqlite3/test/hooks.py deleted file mode 100644 index 761bdaa6b..000000000 --- a/sys/lib/python/sqlite3/test/hooks.py +++ /dev/null @@ -1,117 +0,0 @@ -#-*- coding: ISO-8859-1 -*- -# pysqlite2/test/hooks.py: tests for various SQLite-specific hooks -# -# Copyright (C) 2006 Gerhard Häring <gh@ghaering.de> -# -# This file is part of pysqlite. -# -# This software is provided 'as-is', without any express or implied -# warranty. In no event will the authors be held liable for any damages -# arising from the use of this software. -# -# Permission is granted to anyone to use this software for any purpose, -# including commercial applications, and to alter it and redistribute it -# freely, subject to the following restrictions: -# -# 1. The origin of this software must not be misrepresented; you must not -# claim that you wrote the original software. If you use this software -# in a product, an acknowledgment in the product documentation would be -# appreciated but is not required. -# 2. Altered source versions must be plainly marked as such, and must not be -# misrepresented as being the original software. -# 3. This notice may not be removed or altered from any source distribution. - -import os, unittest -import sqlite3 as sqlite - -class CollationTests(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - - def CheckCreateCollationNotCallable(self): - con = sqlite.connect(":memory:") - try: - con.create_collation("X", 42) - self.fail("should have raised a TypeError") - except TypeError, e: - self.failUnlessEqual(e.args[0], "parameter must be callable") - - def CheckCreateCollationNotAscii(self): - con = sqlite.connect(":memory:") - try: - con.create_collation("collä", cmp) - self.fail("should have raised a ProgrammingError") - except sqlite.ProgrammingError, e: - pass - - def CheckCollationIsUsed(self): - if sqlite.version_info < (3, 2, 1): # old SQLite versions crash on this test - return - def mycoll(x, y): - # reverse order - return -cmp(x, y) - - con = sqlite.connect(":memory:") - con.create_collation("mycoll", mycoll) - sql = """ - select x from ( - select 'a' as x - union - select 'b' as x - union - select 'c' as x - ) order by x collate mycoll - """ - result = con.execute(sql).fetchall() - if result[0][0] != "c" or result[1][0] != "b" or result[2][0] != "a": - self.fail("the expected order was not returned") - - con.create_collation("mycoll", None) - try: - result = con.execute(sql).fetchall() - self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: - self.failUnlessEqual(e.args[0].lower(), "no such collation sequence: mycoll") - - def CheckCollationRegisterTwice(self): - """ - Register two different collation functions under the same name. - Verify that the last one is actually used. - """ - con = sqlite.connect(":memory:") - con.create_collation("mycoll", cmp) - con.create_collation("mycoll", lambda x, y: -cmp(x, y)) - result = con.execute(""" - select x from (select 'a' as x union select 'b' as x) order by x collate mycoll - """).fetchall() - if result[0][0] != 'b' or result[1][0] != 'a': - self.fail("wrong collation function is used") - - def CheckDeregisterCollation(self): - """ - Register a collation, then deregister it. Make sure an error is raised if we try - to use it. - """ - con = sqlite.connect(":memory:") - con.create_collation("mycoll", cmp) - con.create_collation("mycoll", None) - try: - con.execute("select 'a' as x union select 'b' as x order by x collate mycoll") - self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: - if not e.args[0].startswith("no such collation sequence"): - self.fail("wrong OperationalError raised") - -def suite(): - collation_suite = unittest.makeSuite(CollationTests, "Check") - return unittest.TestSuite((collation_suite,)) - -def test(): - runner = unittest.TextTestRunner() - runner.run(suite()) - -if __name__ == "__main__": - test() diff --git a/sys/lib/python/sqlite3/test/regression.py b/sys/lib/python/sqlite3/test/regression.py deleted file mode 100644 index c8733b963..000000000 --- a/sys/lib/python/sqlite3/test/regression.py +++ /dev/null @@ -1,81 +0,0 @@ -#-*- coding: ISO-8859-1 -*- -# pysqlite2/test/regression.py: pysqlite regression tests -# -# Copyright (C) 2006 Gerhard Häring <gh@ghaering.de> -# -# This file is part of pysqlite. -# -# This software is provided 'as-is', without any express or implied -# warranty. In no event will the authors be held liable for any damages -# arising from the use of this software. -# -# Permission is granted to anyone to use this software for any purpose, -# including commercial applications, and to alter it and redistribute it -# freely, subject to the following restrictions: -# -# 1. The origin of this software must not be misrepresented; you must not -# claim that you wrote the original software. If you use this software -# in a product, an acknowledgment in the product documentation would be -# appreciated but is not required. -# 2. Altered source versions must be plainly marked as such, and must not be -# misrepresented as being the original software. -# 3. This notice may not be removed or altered from any source distribution. - -import unittest -import sqlite3 as sqlite - -class RegressionTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:") - - def tearDown(self): - self.con.close() - - def CheckPragmaUserVersion(self): - # This used to crash pysqlite because this pragma command returns NULL for the column name - cur = self.con.cursor() - cur.execute("pragma user_version") - - def CheckPragmaSchemaVersion(self): - # This still crashed pysqlite <= 2.2.1 - con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) - try: - cur = self.con.cursor() - cur.execute("pragma schema_version") - finally: - cur.close() - con.close() - - def CheckStatementReset(self): - # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are - # reset before a rollback, but only those that are still in the - # statement cache. The others are not accessible from the connection object. - con = sqlite.connect(":memory:", cached_statements=5) - cursors = [con.cursor() for x in xrange(5)] - cursors[0].execute("create table test(x)") - for i in range(10): - cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)]) - - for i in range(5): - cursors[i].execute(" " * i + "select x from test") - - con.rollback() - - def CheckColumnNameWithSpaces(self): - cur = self.con.cursor() - cur.execute('select 1 as "foo bar [datetime]"') - self.failUnlessEqual(cur.description[0][0], "foo bar") - - cur.execute('select 1 as "foo baz"') - self.failUnlessEqual(cur.description[0][0], "foo baz") - -def suite(): - regression_suite = unittest.makeSuite(RegressionTests, "Check") - return unittest.TestSuite((regression_suite,)) - -def test(): - runner = unittest.TextTestRunner() - runner.run(suite()) - -if __name__ == "__main__": - test() diff --git a/sys/lib/python/sqlite3/test/transactions.py b/sys/lib/python/sqlite3/test/transactions.py deleted file mode 100644 index 1f0b19aa9..000000000 --- a/sys/lib/python/sqlite3/test/transactions.py +++ /dev/null @@ -1,156 +0,0 @@ -#-*- coding: ISO-8859-1 -*- -# pysqlite2/test/transactions.py: tests transactions -# -# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de> -# -# This file is part of pysqlite. -# -# This software is provided 'as-is', without any express or implied -# warranty. In no event will the authors be held liable for any damages -# arising from the use of this software. -# -# Permission is granted to anyone to use this software for any purpose, -# including commercial applications, and to alter it and redistribute it -# freely, subject to the following restrictions: -# -# 1. The origin of this software must not be misrepresented; you must not -# claim that you wrote the original software. If you use this software -# in a product, an acknowledgment in the product documentation would be -# appreciated but is not required. -# 2. Altered source versions must be plainly marked as such, and must not be -# misrepresented as being the original software. -# 3. This notice may not be removed or altered from any source distribution. - -import os, unittest -import sqlite3 as sqlite - -def get_db_path(): - return "sqlite_testdb" - -class TransactionTests(unittest.TestCase): - def setUp(self): - try: - os.remove(get_db_path()) - except: - pass - - self.con1 = sqlite.connect(get_db_path(), timeout=0.1) - self.cur1 = self.con1.cursor() - - self.con2 = sqlite.connect(get_db_path(), timeout=0.1) - self.cur2 = self.con2.cursor() - - def tearDown(self): - self.cur1.close() - self.con1.close() - - self.cur2.close() - self.con2.close() - - os.unlink(get_db_path()) - - def CheckDMLdoesAutoCommitBefore(self): - self.cur1.execute("create table test(i)") - self.cur1.execute("insert into test(i) values (5)") - self.cur1.execute("create table test2(j)") - self.cur2.execute("select i from test") - res = self.cur2.fetchall() - self.failUnlessEqual(len(res), 1) - - def CheckInsertStartsTransaction(self): - self.cur1.execute("create table test(i)") - self.cur1.execute("insert into test(i) values (5)") - self.cur2.execute("select i from test") - res = self.cur2.fetchall() - self.failUnlessEqual(len(res), 0) - - def CheckUpdateStartsTransaction(self): - self.cur1.execute("create table test(i)") - self.cur1.execute("insert into test(i) values (5)") - self.con1.commit() - self.cur1.execute("update test set i=6") - self.cur2.execute("select i from test") - res = self.cur2.fetchone()[0] - self.failUnlessEqual(res, 5) - - def CheckDeleteStartsTransaction(self): - self.cur1.execute("create table test(i)") - self.cur1.execute("insert into test(i) values (5)") - self.con1.commit() - self.cur1.execute("delete from test") - self.cur2.execute("select i from test") - res = self.cur2.fetchall() - self.failUnlessEqual(len(res), 1) - - def CheckReplaceStartsTransaction(self): - self.cur1.execute("create table test(i)") - self.cur1.execute("insert into test(i) values (5)") - self.con1.commit() - self.cur1.execute("replace into test(i) values (6)") - self.cur2.execute("select i from test") - res = self.cur2.fetchall() - self.failUnlessEqual(len(res), 1) - self.failUnlessEqual(res[0][0], 5) - - def CheckToggleAutoCommit(self): - self.cur1.execute("create table test(i)") - self.cur1.execute("insert into test(i) values (5)") - self.con1.isolation_level = None - self.failUnlessEqual(self.con1.isolation_level, None) - self.cur2.execute("select i from test") - res = self.cur2.fetchall() - self.failUnlessEqual(len(res), 1) - - self.con1.isolation_level = "DEFERRED" - self.failUnlessEqual(self.con1.isolation_level , "DEFERRED") - self.cur1.execute("insert into test(i) values (5)") - self.cur2.execute("select i from test") - res = self.cur2.fetchall() - self.failUnlessEqual(len(res), 1) - - def CheckRaiseTimeout(self): - self.cur1.execute("create table test(i)") - self.cur1.execute("insert into test(i) values (5)") - try: - self.cur2.execute("insert into test(i) values (5)") - self.fail("should have raised an OperationalError") - except sqlite.OperationalError: - pass - except: - self.fail("should have raised an OperationalError") - -class SpecialCommandTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:") - self.cur = self.con.cursor() - - def CheckVacuum(self): - self.cur.execute("create table test(i)") - self.cur.execute("insert into test(i) values (5)") - self.cur.execute("vacuum") - - def CheckDropTable(self): - self.cur.execute("create table test(i)") - self.cur.execute("insert into test(i) values (5)") - self.cur.execute("drop table test") - - def CheckPragma(self): - self.cur.execute("create table test(i)") - self.cur.execute("insert into test(i) values (5)") - self.cur.execute("pragma count_changes=1") - - def tearDown(self): - self.cur.close() - self.con.close() - -def suite(): - default_suite = unittest.makeSuite(TransactionTests, "Check") - special_command_suite = unittest.makeSuite(SpecialCommandTests, "Check") - return unittest.TestSuite((default_suite, special_command_suite)) - -def test(): - runner = unittest.TextTestRunner() - runner.run(suite()) - -if __name__ == "__main__": - test() diff --git a/sys/lib/python/sqlite3/test/types.py b/sys/lib/python/sqlite3/test/types.py deleted file mode 100644 index 8da5722d5..000000000 --- a/sys/lib/python/sqlite3/test/types.py +++ /dev/null @@ -1,351 +0,0 @@ -#-*- coding: ISO-8859-1 -*- -# pysqlite2/test/types.py: tests for type conversion and detection -# -# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de> -# -# This file is part of pysqlite. -# -# This software is provided 'as-is', without any express or implied -# warranty. In no event will the authors be held liable for any damages -# arising from the use of this software. -# -# Permission is granted to anyone to use this software for any purpose, -# including commercial applications, and to alter it and redistribute it -# freely, subject to the following restrictions: -# -# 1. The origin of this software must not be misrepresented; you must not -# claim that you wrote the original software. If you use this software -# in a product, an acknowledgment in the product documentation would be -# appreciated but is not required. -# 2. Altered source versions must be plainly marked as such, and must not be -# misrepresented as being the original software. -# 3. This notice may not be removed or altered from any source distribution. - -import bz2, datetime -import unittest -import sqlite3 as sqlite - -class SqliteTypeTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:") - self.cur = self.con.cursor() - self.cur.execute("create table test(i integer, s varchar, f number, b blob)") - - def tearDown(self): - self.cur.close() - self.con.close() - - def CheckString(self): - self.cur.execute("insert into test(s) values (?)", (u"Österreich",)) - self.cur.execute("select s from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], u"Österreich") - - def CheckSmallInt(self): - self.cur.execute("insert into test(i) values (?)", (42,)) - self.cur.execute("select i from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], 42) - - def CheckLargeInt(self): - num = 2**40 - self.cur.execute("insert into test(i) values (?)", (num,)) - self.cur.execute("select i from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], num) - - def CheckFloat(self): - val = 3.14 - self.cur.execute("insert into test(f) values (?)", (val,)) - self.cur.execute("select f from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], val) - - def CheckBlob(self): - val = buffer("Guglhupf") - self.cur.execute("insert into test(b) values (?)", (val,)) - self.cur.execute("select b from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], val) - - def CheckUnicodeExecute(self): - self.cur.execute(u"select 'Österreich'") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], u"Österreich") - -class DeclTypesTests(unittest.TestCase): - class Foo: - def __init__(self, _val): - self.val = _val - - def __cmp__(self, other): - if not isinstance(other, DeclTypesTests.Foo): - raise ValueError - if self.val == other.val: - return 0 - else: - return 1 - - def __conform__(self, protocol): - if protocol is sqlite.PrepareProtocol: - return self.val - else: - return None - - def __str__(self): - return "<%s>" % self.val - - def setUp(self): - self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) - self.cur = self.con.cursor() - self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob)") - - # override float, make them always return the same number - sqlite.converters["FLOAT"] = lambda x: 47.2 - - # and implement two custom ones - sqlite.converters["BOOL"] = lambda x: bool(int(x)) - sqlite.converters["FOO"] = DeclTypesTests.Foo - - def tearDown(self): - del sqlite.converters["FLOAT"] - del sqlite.converters["BOOL"] - del sqlite.converters["FOO"] - self.cur.close() - self.con.close() - - def CheckString(self): - # default - self.cur.execute("insert into test(s) values (?)", ("foo",)) - self.cur.execute("select s from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], "foo") - - def CheckSmallInt(self): - # default - self.cur.execute("insert into test(i) values (?)", (42,)) - self.cur.execute("select i from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], 42) - - def CheckLargeInt(self): - # default - num = 2**40 - self.cur.execute("insert into test(i) values (?)", (num,)) - self.cur.execute("select i from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], num) - - def CheckFloat(self): - # custom - val = 3.14 - self.cur.execute("insert into test(f) values (?)", (val,)) - self.cur.execute("select f from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], 47.2) - - def CheckBool(self): - # custom - self.cur.execute("insert into test(b) values (?)", (False,)) - self.cur.execute("select b from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], False) - - self.cur.execute("delete from test") - self.cur.execute("insert into test(b) values (?)", (True,)) - self.cur.execute("select b from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], True) - - def CheckUnicode(self): - # default - val = u"\xd6sterreich" - self.cur.execute("insert into test(u) values (?)", (val,)) - self.cur.execute("select u from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], val) - - def CheckFoo(self): - val = DeclTypesTests.Foo("bla") - self.cur.execute("insert into test(foo) values (?)", (val,)) - self.cur.execute("select foo from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], val) - - def CheckUnsupportedSeq(self): - class Bar: pass - val = Bar() - try: - self.cur.execute("insert into test(f) values (?)", (val,)) - self.fail("should have raised an InterfaceError") - except sqlite.InterfaceError: - pass - except: - self.fail("should have raised an InterfaceError") - - def CheckUnsupportedDict(self): - class Bar: pass - val = Bar() - try: - self.cur.execute("insert into test(f) values (:val)", {"val": val}) - self.fail("should have raised an InterfaceError") - except sqlite.InterfaceError: - pass - except: - self.fail("should have raised an InterfaceError") - - def CheckBlob(self): - # default - val = buffer("Guglhupf") - self.cur.execute("insert into test(bin) values (?)", (val,)) - self.cur.execute("select bin from test") - row = self.cur.fetchone() - self.failUnlessEqual(row[0], val) - -class ColNamesTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES|sqlite.PARSE_DECLTYPES) - self.cur = self.con.cursor() - self.cur.execute("create table test(x foo)") - - sqlite.converters["FOO"] = lambda x: "[%s]" % x - sqlite.converters["BAR"] = lambda x: "<%s>" % x - sqlite.converters["EXC"] = lambda x: 5/0 - - def tearDown(self): - del sqlite.converters["FOO"] - del sqlite.converters["BAR"] - del sqlite.converters["EXC"] - self.cur.close() - self.con.close() - - def CheckDeclType(self): - self.cur.execute("insert into test(x) values (?)", ("xxx",)) - self.cur.execute("select x from test") - val = self.cur.fetchone()[0] - self.failUnlessEqual(val, "[xxx]") - - def CheckNone(self): - self.cur.execute("insert into test(x) values (?)", (None,)) - self.cur.execute("select x from test") - val = self.cur.fetchone()[0] - self.failUnlessEqual(val, None) - - def CheckColName(self): - self.cur.execute("insert into test(x) values (?)", ("xxx",)) - self.cur.execute('select x as "x [bar]" from test') - val = self.cur.fetchone()[0] - self.failUnlessEqual(val, "<xxx>") - - # Check if the stripping of colnames works. Everything after the first - # whitespace should be stripped. - self.failUnlessEqual(self.cur.description[0][0], "x") - - def CheckCursorDescriptionNoRow(self): - """ - cursor.description should at least provide the column name(s), even if - no row returned. - """ - self.cur.execute("select * from test where 0 = 1") - self.assert_(self.cur.description[0][0] == "x") - -class ObjectAdaptationTests(unittest.TestCase): - def cast(obj): - return float(obj) - cast = staticmethod(cast) - - def setUp(self): - self.con = sqlite.connect(":memory:") - try: - del sqlite.adapters[int] - except: - pass - sqlite.register_adapter(int, ObjectAdaptationTests.cast) - self.cur = self.con.cursor() - - def tearDown(self): - del sqlite.adapters[(int, sqlite.PrepareProtocol)] - self.cur.close() - self.con.close() - - def CheckCasterIsUsed(self): - self.cur.execute("select ?", (4,)) - val = self.cur.fetchone()[0] - self.failUnlessEqual(type(val), float) - -class BinaryConverterTests(unittest.TestCase): - def convert(s): - return bz2.decompress(s) - convert = staticmethod(convert) - - def setUp(self): - self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) - sqlite.register_converter("bin", BinaryConverterTests.convert) - - def tearDown(self): - self.con.close() - - def CheckBinaryInputForConverter(self): - testdata = "abcdefg" * 10 - result = self.con.execute('select ? as "x [bin]"', (buffer(bz2.compress(testdata)),)).fetchone()[0] - self.failUnlessEqual(testdata, result) - -class DateTimeTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) - self.cur = self.con.cursor() - self.cur.execute("create table test(d date, ts timestamp)") - - def tearDown(self): - self.cur.close() - self.con.close() - - def CheckSqliteDate(self): - d = sqlite.Date(2004, 2, 14) - self.cur.execute("insert into test(d) values (?)", (d,)) - self.cur.execute("select d from test") - d2 = self.cur.fetchone()[0] - self.failUnlessEqual(d, d2) - - def CheckSqliteTimestamp(self): - ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0) - self.cur.execute("insert into test(ts) values (?)", (ts,)) - self.cur.execute("select ts from test") - ts2 = self.cur.fetchone()[0] - self.failUnlessEqual(ts, ts2) - - def CheckSqlTimestamp(self): - # The date functions are only available in SQLite version 3.1 or later - if sqlite.sqlite_version_info < (3, 1): - return - - # SQLite's current_timestamp uses UTC time, while datetime.datetime.now() uses local time. - now = datetime.datetime.now() - self.cur.execute("insert into test(ts) values (current_timestamp)") - self.cur.execute("select ts from test") - ts = self.cur.fetchone()[0] - self.failUnlessEqual(type(ts), datetime.datetime) - self.failUnlessEqual(ts.year, now.year) - - def CheckDateTimeSubSeconds(self): - ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000) - self.cur.execute("insert into test(ts) values (?)", (ts,)) - self.cur.execute("select ts from test") - ts2 = self.cur.fetchone()[0] - self.failUnlessEqual(ts, ts2) - -def suite(): - sqlite_type_suite = unittest.makeSuite(SqliteTypeTests, "Check") - decltypes_type_suite = unittest.makeSuite(DeclTypesTests, "Check") - colnames_type_suite = unittest.makeSuite(ColNamesTests, "Check") - adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check") - bin_suite = unittest.makeSuite(BinaryConverterTests, "Check") - date_suite = unittest.makeSuite(DateTimeTests, "Check") - return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite)) - -def test(): - runner = unittest.TextTestRunner() - runner.run(suite()) - -if __name__ == "__main__": - test() diff --git a/sys/lib/python/sqlite3/test/userfunctions.py b/sys/lib/python/sqlite3/test/userfunctions.py deleted file mode 100644 index 31bf28919..000000000 --- a/sys/lib/python/sqlite3/test/userfunctions.py +++ /dev/null @@ -1,413 +0,0 @@ -#-*- coding: ISO-8859-1 -*- -# pysqlite2/test/userfunctions.py: tests for user-defined functions and -# aggregates. -# -# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de> -# -# This file is part of pysqlite. -# -# This software is provided 'as-is', without any express or implied -# warranty. In no event will the authors be held liable for any damages -# arising from the use of this software. -# -# Permission is granted to anyone to use this software for any purpose, -# including commercial applications, and to alter it and redistribute it -# freely, subject to the following restrictions: -# -# 1. The origin of this software must not be misrepresented; you must not -# claim that you wrote the original software. If you use this software -# in a product, an acknowledgment in the product documentation would be -# appreciated but is not required. -# 2. Altered source versions must be plainly marked as such, and must not be -# misrepresented as being the original software. -# 3. This notice may not be removed or altered from any source distribution. - -import unittest -import sqlite3 as sqlite - -def func_returntext(): - return "foo" -def func_returnunicode(): - return u"bar" -def func_returnint(): - return 42 -def func_returnfloat(): - return 3.14 -def func_returnnull(): - return None -def func_returnblob(): - return buffer("blob") -def func_raiseexception(): - 5/0 - -def func_isstring(v): - return type(v) is unicode -def func_isint(v): - return type(v) is int -def func_isfloat(v): - return type(v) is float -def func_isnone(v): - return type(v) is type(None) -def func_isblob(v): - return type(v) is buffer - -class AggrNoStep: - def __init__(self): - pass - - def finalize(self): - return 1 - -class AggrNoFinalize: - def __init__(self): - pass - - def step(self, x): - pass - -class AggrExceptionInInit: - def __init__(self): - 5/0 - - def step(self, x): - pass - - def finalize(self): - pass - -class AggrExceptionInStep: - def __init__(self): - pass - - def step(self, x): - 5/0 - - def finalize(self): - return 42 - -class AggrExceptionInFinalize: - def __init__(self): - pass - - def step(self, x): - pass - - def finalize(self): - 5/0 - -class AggrCheckType: - def __init__(self): - self.val = None - - def step(self, whichType, val): - theType = {"str": unicode, "int": int, "float": float, "None": type(None), "blob": buffer} - self.val = int(theType[whichType] is type(val)) - - def finalize(self): - return self.val - -class AggrSum: - def __init__(self): - self.val = 0.0 - - def step(self, val): - self.val += val - - def finalize(self): - return self.val - -class FunctionTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:") - - self.con.create_function("returntext", 0, func_returntext) - self.con.create_function("returnunicode", 0, func_returnunicode) - self.con.create_function("returnint", 0, func_returnint) - self.con.create_function("returnfloat", 0, func_returnfloat) - self.con.create_function("returnnull", 0, func_returnnull) - self.con.create_function("returnblob", 0, func_returnblob) - self.con.create_function("raiseexception", 0, func_raiseexception) - - self.con.create_function("isstring", 1, func_isstring) - self.con.create_function("isint", 1, func_isint) - self.con.create_function("isfloat", 1, func_isfloat) - self.con.create_function("isnone", 1, func_isnone) - self.con.create_function("isblob", 1, func_isblob) - - def tearDown(self): - self.con.close() - - def CheckFuncErrorOnCreate(self): - try: - self.con.create_function("bla", -100, lambda x: 2*x) - self.fail("should have raised an OperationalError") - except sqlite.OperationalError: - pass - - def CheckFuncRefCount(self): - def getfunc(): - def f(): - return 1 - return f - f = getfunc() - globals()["foo"] = f - # self.con.create_function("reftest", 0, getfunc()) - self.con.create_function("reftest", 0, f) - cur = self.con.cursor() - cur.execute("select reftest()") - - def CheckFuncReturnText(self): - cur = self.con.cursor() - cur.execute("select returntext()") - val = cur.fetchone()[0] - self.failUnlessEqual(type(val), unicode) - self.failUnlessEqual(val, "foo") - - def CheckFuncReturnUnicode(self): - cur = self.con.cursor() - cur.execute("select returnunicode()") - val = cur.fetchone()[0] - self.failUnlessEqual(type(val), unicode) - self.failUnlessEqual(val, u"bar") - - def CheckFuncReturnInt(self): - cur = self.con.cursor() - cur.execute("select returnint()") - val = cur.fetchone()[0] - self.failUnlessEqual(type(val), int) - self.failUnlessEqual(val, 42) - - def CheckFuncReturnFloat(self): - cur = self.con.cursor() - cur.execute("select returnfloat()") - val = cur.fetchone()[0] - self.failUnlessEqual(type(val), float) - if val < 3.139 or val > 3.141: - self.fail("wrong value") - - def CheckFuncReturnNull(self): - cur = self.con.cursor() - cur.execute("select returnnull()") - val = cur.fetchone()[0] - self.failUnlessEqual(type(val), type(None)) - self.failUnlessEqual(val, None) - - def CheckFuncReturnBlob(self): - cur = self.con.cursor() - cur.execute("select returnblob()") - val = cur.fetchone()[0] - self.failUnlessEqual(type(val), buffer) - self.failUnlessEqual(val, buffer("blob")) - - def CheckFuncException(self): - cur = self.con.cursor() - try: - cur.execute("select raiseexception()") - cur.fetchone() - self.fail("should have raised OperationalError") - except sqlite.OperationalError, e: - self.failUnlessEqual(e.args[0], 'user-defined function raised exception') - - def CheckParamString(self): - cur = self.con.cursor() - cur.execute("select isstring(?)", ("foo",)) - val = cur.fetchone()[0] - self.failUnlessEqual(val, 1) - - def CheckParamInt(self): - cur = self.con.cursor() - cur.execute("select isint(?)", (42,)) - val = cur.fetchone()[0] - self.failUnlessEqual(val, 1) - - def CheckParamFloat(self): - cur = self.con.cursor() - cur.execute("select isfloat(?)", (3.14,)) - val = cur.fetchone()[0] - self.failUnlessEqual(val, 1) - - def CheckParamNone(self): - cur = self.con.cursor() - cur.execute("select isnone(?)", (None,)) - val = cur.fetchone()[0] - self.failUnlessEqual(val, 1) - - def CheckParamBlob(self): - cur = self.con.cursor() - cur.execute("select isblob(?)", (buffer("blob"),)) - val = cur.fetchone()[0] - self.failUnlessEqual(val, 1) - -class AggregateTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:") - cur = self.con.cursor() - cur.execute(""" - create table test( - t text, - i integer, - f float, - n, - b blob - ) - """) - cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)", - ("foo", 5, 3.14, None, buffer("blob"),)) - - self.con.create_aggregate("nostep", 1, AggrNoStep) - self.con.create_aggregate("nofinalize", 1, AggrNoFinalize) - self.con.create_aggregate("excInit", 1, AggrExceptionInInit) - self.con.create_aggregate("excStep", 1, AggrExceptionInStep) - self.con.create_aggregate("excFinalize", 1, AggrExceptionInFinalize) - self.con.create_aggregate("checkType", 2, AggrCheckType) - self.con.create_aggregate("mysum", 1, AggrSum) - - def tearDown(self): - #self.cur.close() - #self.con.close() - pass - - def CheckAggrErrorOnCreate(self): - try: - self.con.create_function("bla", -100, AggrSum) - self.fail("should have raised an OperationalError") - except sqlite.OperationalError: - pass - - def CheckAggrNoStep(self): - cur = self.con.cursor() - try: - cur.execute("select nostep(t) from test") - self.fail("should have raised an AttributeError") - except AttributeError, e: - self.failUnlessEqual(e.args[0], "AggrNoStep instance has no attribute 'step'") - - def CheckAggrNoFinalize(self): - cur = self.con.cursor() - try: - cur.execute("select nofinalize(t) from test") - val = cur.fetchone()[0] - self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: - self.failUnlessEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") - - def CheckAggrExceptionInInit(self): - cur = self.con.cursor() - try: - cur.execute("select excInit(t) from test") - val = cur.fetchone()[0] - self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: - self.failUnlessEqual(e.args[0], "user-defined aggregate's '__init__' method raised error") - - def CheckAggrExceptionInStep(self): - cur = self.con.cursor() - try: - cur.execute("select excStep(t) from test") - val = cur.fetchone()[0] - self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: - self.failUnlessEqual(e.args[0], "user-defined aggregate's 'step' method raised error") - - def CheckAggrExceptionInFinalize(self): - cur = self.con.cursor() - try: - cur.execute("select excFinalize(t) from test") - val = cur.fetchone()[0] - self.fail("should have raised an OperationalError") - except sqlite.OperationalError, e: - self.failUnlessEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") - - def CheckAggrCheckParamStr(self): - cur = self.con.cursor() - cur.execute("select checkType('str', ?)", ("foo",)) - val = cur.fetchone()[0] - self.failUnlessEqual(val, 1) - - def CheckAggrCheckParamInt(self): - cur = self.con.cursor() - cur.execute("select checkType('int', ?)", (42,)) - val = cur.fetchone()[0] - self.failUnlessEqual(val, 1) - - def CheckAggrCheckParamFloat(self): - cur = self.con.cursor() - cur.execute("select checkType('float', ?)", (3.14,)) - val = cur.fetchone()[0] - self.failUnlessEqual(val, 1) - - def CheckAggrCheckParamNone(self): - cur = self.con.cursor() - cur.execute("select checkType('None', ?)", (None,)) - val = cur.fetchone()[0] - self.failUnlessEqual(val, 1) - - def CheckAggrCheckParamBlob(self): - cur = self.con.cursor() - cur.execute("select checkType('blob', ?)", (buffer("blob"),)) - val = cur.fetchone()[0] - self.failUnlessEqual(val, 1) - - def CheckAggrCheckAggrSum(self): - cur = self.con.cursor() - cur.execute("delete from test") - cur.executemany("insert into test(i) values (?)", [(10,), (20,), (30,)]) - cur.execute("select mysum(i) from test") - val = cur.fetchone()[0] - self.failUnlessEqual(val, 60) - -def authorizer_cb(action, arg1, arg2, dbname, source): - if action != sqlite.SQLITE_SELECT: - return sqlite.SQLITE_DENY - if arg2 == 'c2' or arg1 == 't2': - return sqlite.SQLITE_DENY - return sqlite.SQLITE_OK - -class AuthorizerTests(unittest.TestCase): - def setUp(self): - self.con = sqlite.connect(":memory:") - self.con.executescript(""" - create table t1 (c1, c2); - create table t2 (c1, c2); - insert into t1 (c1, c2) values (1, 2); - insert into t2 (c1, c2) values (4, 5); - """) - - # For our security test: - self.con.execute("select c2 from t2") - - self.con.set_authorizer(authorizer_cb) - - def tearDown(self): - pass - - def CheckTableAccess(self): - try: - self.con.execute("select * from t2") - except sqlite.DatabaseError, e: - if not e.args[0].endswith("prohibited"): - self.fail("wrong exception text: %s" % e.args[0]) - return - self.fail("should have raised an exception due to missing privileges") - - def CheckColumnAccess(self): - try: - self.con.execute("select c2 from t1") - except sqlite.DatabaseError, e: - if not e.args[0].endswith("prohibited"): - self.fail("wrong exception text: %s" % e.args[0]) - return - self.fail("should have raised an exception due to missing privileges") - -def suite(): - function_suite = unittest.makeSuite(FunctionTests, "Check") - aggregate_suite = unittest.makeSuite(AggregateTests, "Check") - authorizer_suite = unittest.makeSuite(AuthorizerTests, "Check") - return unittest.TestSuite((function_suite, aggregate_suite, authorizer_suite)) - -def test(): - runner = unittest.TextTestRunner() - runner.run(suite()) - -if __name__ == "__main__": - test() |