diff options
author | cinap_lenrek <cinap_lenrek@localhost> | 2011-05-04 05:41:33 +0000 |
---|---|---|
committer | cinap_lenrek <cinap_lenrek@localhost> | 2011-05-04 05:41:33 +0000 |
commit | b8436b026a90291ba26afa4f7a2700720b03339f (patch) | |
tree | 3098aede87640c80567ecb31022e0404a8b5ec75 /sys/lib/python/test/test_dummy_threading.py | |
parent | 6c1b42188259a6f1636cd15a9570b18af03e2dbb (diff) |
remove python test cases
Diffstat (limited to 'sys/lib/python/test/test_dummy_threading.py')
-rw-r--r-- | sys/lib/python/test/test_dummy_threading.py | 73 |
1 files changed, 0 insertions, 73 deletions
diff --git a/sys/lib/python/test/test_dummy_threading.py b/sys/lib/python/test/test_dummy_threading.py deleted file mode 100644 index 3724f1e5a..000000000 --- a/sys/lib/python/test/test_dummy_threading.py +++ /dev/null @@ -1,73 +0,0 @@ -# Very rudimentary test of threading module - -# Create a bunch of threads, let each do some work, wait until all are done - -from test.test_support import verbose -import random -import dummy_threading as _threading -import time - - -class TestThread(_threading.Thread): - - def run(self): - global running - # Uncomment if testing another module, such as the real 'threading' - # module. - #delay = random.random() * 2 - delay = 0 - if verbose: - print 'task', self.getName(), 'will run for', delay, 'sec' - sema.acquire() - mutex.acquire() - running = running + 1 - if verbose: - print running, 'tasks are running' - mutex.release() - time.sleep(delay) - if verbose: - print 'task', self.getName(), 'done' - mutex.acquire() - running = running - 1 - if verbose: - print self.getName(), 'is finished.', running, 'tasks are running' - mutex.release() - sema.release() - -def starttasks(): - for i in range(numtasks): - t = TestThread(name="<thread %d>"%i) - threads.append(t) - t.start() - - -def test_main(): - # This takes about n/3 seconds to run (about n/3 clumps of tasks, times - # about 1 second per clump). - global numtasks - numtasks = 10 - - # no more than 3 of the 10 can run at once - global sema - sema = _threading.BoundedSemaphore(value=3) - global mutex - mutex = _threading.RLock() - global running - running = 0 - - global threads - threads = [] - - starttasks() - - if verbose: - print 'waiting for all tasks to complete' - for t in threads: - t.join() - if verbose: - print 'all tasks done' - - - -if __name__ == '__main__': - test_main() |