summaryrefslogtreecommitdiff
path: root/sys/lib/python/test/test_dummy_threading.py
diff options
context:
space:
mode:
authorcinap_lenrek <cinap_lenrek@localhost>2011-05-03 11:25:13 +0000
committercinap_lenrek <cinap_lenrek@localhost>2011-05-03 11:25:13 +0000
commit458120dd40db6b4df55a4e96b650e16798ef06a0 (patch)
tree8f82685be24fef97e715c6f5ca4c68d34d5074ee /sys/lib/python/test/test_dummy_threading.py
parent3a742c699f6806c1145aea5149bf15de15a0afd7 (diff)
add hg and python
Diffstat (limited to 'sys/lib/python/test/test_dummy_threading.py')
-rw-r--r--sys/lib/python/test/test_dummy_threading.py73
1 files changed, 73 insertions, 0 deletions
diff --git a/sys/lib/python/test/test_dummy_threading.py b/sys/lib/python/test/test_dummy_threading.py
new file mode 100644
index 000000000..3724f1e5a
--- /dev/null
+++ b/sys/lib/python/test/test_dummy_threading.py
@@ -0,0 +1,73 @@
+# Very rudimentary test of threading module
+
+# Create a bunch of threads, let each do some work, wait until all are done
+
+from test.test_support import verbose
+import random
+import dummy_threading as _threading
+import time
+
+
+class TestThread(_threading.Thread):
+
+ def run(self):
+ global running
+ # Uncomment if testing another module, such as the real 'threading'
+ # module.
+ #delay = random.random() * 2
+ delay = 0
+ if verbose:
+ print 'task', self.getName(), 'will run for', delay, 'sec'
+ sema.acquire()
+ mutex.acquire()
+ running = running + 1
+ if verbose:
+ print running, 'tasks are running'
+ mutex.release()
+ time.sleep(delay)
+ if verbose:
+ print 'task', self.getName(), 'done'
+ mutex.acquire()
+ running = running - 1
+ if verbose:
+ print self.getName(), 'is finished.', running, 'tasks are running'
+ mutex.release()
+ sema.release()
+
+def starttasks():
+ for i in range(numtasks):
+ t = TestThread(name="<thread %d>"%i)
+ threads.append(t)
+ t.start()
+
+
+def test_main():
+ # This takes about n/3 seconds to run (about n/3 clumps of tasks, times
+ # about 1 second per clump).
+ global numtasks
+ numtasks = 10
+
+ # no more than 3 of the 10 can run at once
+ global sema
+ sema = _threading.BoundedSemaphore(value=3)
+ global mutex
+ mutex = _threading.RLock()
+ global running
+ running = 0
+
+ global threads
+ threads = []
+
+ starttasks()
+
+ if verbose:
+ print 'waiting for all tasks to complete'
+ for t in threads:
+ t.join()
+ if verbose:
+ print 'all tasks done'
+
+
+
+if __name__ == '__main__':
+ test_main()