diff options
author | cinap_lenrek <cinap_lenrek@localhost> | 2011-05-04 05:41:33 +0000 |
---|---|---|
committer | cinap_lenrek <cinap_lenrek@localhost> | 2011-05-04 05:41:33 +0000 |
commit | b8436b026a90291ba26afa4f7a2700720b03339f (patch) | |
tree | 3098aede87640c80567ecb31022e0404a8b5ec75 /sys/lib/python/test/test_asynchat.py | |
parent | 6c1b42188259a6f1636cd15a9570b18af03e2dbb (diff) |
remove python test cases
Diffstat (limited to 'sys/lib/python/test/test_asynchat.py')
-rw-r--r-- | sys/lib/python/test/test_asynchat.py | 93 |
1 files changed, 0 insertions, 93 deletions
diff --git a/sys/lib/python/test/test_asynchat.py b/sys/lib/python/test/test_asynchat.py deleted file mode 100644 index 992616796..000000000 --- a/sys/lib/python/test/test_asynchat.py +++ /dev/null @@ -1,93 +0,0 @@ -# test asynchat -- requires threading - -import thread # If this fails, we can't test this module -import asyncore, asynchat, socket, threading, time -import unittest -from test import test_support - -HOST = "127.0.0.1" -PORT = 54322 - -class echo_server(threading.Thread): - - def run(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - global PORT - PORT = test_support.bind_port(sock, HOST, PORT) - sock.listen(1) - conn, client = sock.accept() - buffer = "" - while "\n" not in buffer: - data = conn.recv(1) - if not data: - break - buffer = buffer + data - while buffer: - n = conn.send(buffer) - buffer = buffer[n:] - conn.close() - sock.close() - -class echo_client(asynchat.async_chat): - - def __init__(self, terminator): - asynchat.async_chat.__init__(self) - self.contents = None - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect((HOST, PORT)) - self.set_terminator(terminator) - self.buffer = "" - - def handle_connect(self): - pass - ##print "Connected" - - def collect_incoming_data(self, data): - self.buffer = self.buffer + data - - def found_terminator(self): - #print "Received:", repr(self.buffer) - self.contents = self.buffer - self.buffer = "" - self.close() - - -class TestAsynchat(unittest.TestCase): - def setUp (self): - pass - - def tearDown (self): - pass - - def test_line_terminator(self): - s = echo_server() - s.start() - time.sleep(1) # Give server time to initialize - c = echo_client('\n') - c.push("hello ") - c.push("world\n") - asyncore.loop() - s.join() - - self.assertEqual(c.contents, 'hello world') - - def test_numeric_terminator(self): - # Try reading a fixed number of bytes - s = echo_server() - s.start() - time.sleep(1) # Give server time to initialize - c = echo_client(6L) - c.push("hello ") - c.push("world\n") - asyncore.loop() - s.join() - - self.assertEqual(c.contents, 'hello ') - - -def test_main(verbose=None): - test_support.run_unittest(TestAsynchat) - -if __name__ == "__main__": - test_main(verbose=True) |