summaryrefslogtreecommitdiff
path: root/sys/lib/python/bsddb/test/test_lock.py
diff options
context:
space:
mode:
authorcinap_lenrek <cinap_lenrek@localhost>2011-05-03 11:25:13 +0000
committercinap_lenrek <cinap_lenrek@localhost>2011-05-03 11:25:13 +0000
commit458120dd40db6b4df55a4e96b650e16798ef06a0 (patch)
tree8f82685be24fef97e715c6f5ca4c68d34d5074ee /sys/lib/python/bsddb/test/test_lock.py
parent3a742c699f6806c1145aea5149bf15de15a0afd7 (diff)
add hg and python
Diffstat (limited to 'sys/lib/python/bsddb/test/test_lock.py')
-rw-r--r--sys/lib/python/bsddb/test/test_lock.py143
1 files changed, 143 insertions, 0 deletions
diff --git a/sys/lib/python/bsddb/test/test_lock.py b/sys/lib/python/bsddb/test/test_lock.py
new file mode 100644
index 000000000..7d7779805
--- /dev/null
+++ b/sys/lib/python/bsddb/test/test_lock.py
@@ -0,0 +1,143 @@
+"""
+TestCases for testing the locking sub-system.
+"""
+
+import sys, os, string
+import tempfile
+import time
+from pprint import pprint
+
+try:
+ from threading import Thread, currentThread
+ have_threads = 1
+except ImportError:
+ have_threads = 0
+
+
+import unittest
+from test_all import verbose
+
+try:
+ # For Pythons w/distutils pybsddb
+ from bsddb3 import db
+except ImportError:
+ # For Python 2.3
+ from bsddb import db
+
+
+#----------------------------------------------------------------------
+
+class LockingTestCase(unittest.TestCase):
+
+ def setUp(self):
+ homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
+ self.homeDir = homeDir
+ try: os.mkdir(homeDir)
+ except os.error: pass
+ self.env = db.DBEnv()
+ self.env.open(homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
+ db.DB_INIT_LOCK | db.DB_CREATE)
+
+
+ def tearDown(self):
+ self.env.close()
+ import glob
+ files = glob.glob(os.path.join(self.homeDir, '*'))
+ for file in files:
+ os.remove(file)
+
+
+ def test01_simple(self):
+ if verbose:
+ print '\n', '-=' * 30
+ print "Running %s.test01_simple..." % self.__class__.__name__
+
+ anID = self.env.lock_id()
+ if verbose:
+ print "locker ID: %s" % anID
+ lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
+ if verbose:
+ print "Aquired lock: %s" % lock
+ time.sleep(1)
+ self.env.lock_put(lock)
+ if verbose:
+ print "Released lock: %s" % lock
+
+
+
+
+ def test02_threaded(self):
+ if verbose:
+ print '\n', '-=' * 30
+ print "Running %s.test02_threaded..." % self.__class__.__name__
+
+ threads = []
+ threads.append(Thread(target = self.theThread,
+ args=(5, db.DB_LOCK_WRITE)))
+ threads.append(Thread(target = self.theThread,
+ args=(1, db.DB_LOCK_READ)))
+ threads.append(Thread(target = self.theThread,
+ args=(1, db.DB_LOCK_READ)))
+ threads.append(Thread(target = self.theThread,
+ args=(1, db.DB_LOCK_WRITE)))
+ threads.append(Thread(target = self.theThread,
+ args=(1, db.DB_LOCK_READ)))
+ threads.append(Thread(target = self.theThread,
+ args=(1, db.DB_LOCK_READ)))
+ threads.append(Thread(target = self.theThread,
+ args=(1, db.DB_LOCK_WRITE)))
+ threads.append(Thread(target = self.theThread,
+ args=(1, db.DB_LOCK_WRITE)))
+ threads.append(Thread(target = self.theThread,
+ args=(1, db.DB_LOCK_WRITE)))
+
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+ def test03_set_timeout(self):
+ # test that the set_timeout call works
+ if hasattr(self.env, 'set_timeout'):
+ self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
+ self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
+ self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
+ self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
+
+ def theThread(self, sleepTime, lockType):
+ name = currentThread().getName()
+ if lockType == db.DB_LOCK_WRITE:
+ lt = "write"
+ else:
+ lt = "read"
+
+ anID = self.env.lock_id()
+ if verbose:
+ print "%s: locker ID: %s" % (name, anID)
+
+ lock = self.env.lock_get(anID, "some locked thing", lockType)
+ if verbose:
+ print "%s: Aquired %s lock: %s" % (name, lt, lock)
+
+ time.sleep(sleepTime)
+
+ self.env.lock_put(lock)
+ if verbose:
+ print "%s: Released %s lock: %s" % (name, lt, lock)
+
+
+#----------------------------------------------------------------------
+
+def test_suite():
+ suite = unittest.TestSuite()
+
+ if have_threads:
+ suite.addTest(unittest.makeSuite(LockingTestCase))
+ else:
+ suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
+
+ return suite
+
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')