diff options
author | cinap_lenrek <cinap_lenrek@localhost> | 2011-05-04 05:41:33 +0000 |
---|---|---|
committer | cinap_lenrek <cinap_lenrek@localhost> | 2011-05-04 05:41:33 +0000 |
commit | b8436b026a90291ba26afa4f7a2700720b03339f (patch) | |
tree | 3098aede87640c80567ecb31022e0404a8b5ec75 /sys/lib/python/test/test_socket.py | |
parent | 6c1b42188259a6f1636cd15a9570b18af03e2dbb (diff) |
remove python test cases
Diffstat (limited to 'sys/lib/python/test/test_socket.py')
-rw-r--r-- | sys/lib/python/test/test_socket.py | 995 |
1 files changed, 0 insertions, 995 deletions
diff --git a/sys/lib/python/test/test_socket.py b/sys/lib/python/test/test_socket.py deleted file mode 100644 index b5215214d..000000000 --- a/sys/lib/python/test/test_socket.py +++ /dev/null @@ -1,995 +0,0 @@ -#!/usr/bin/env python - -import unittest -from test import test_support - -import socket -import select -import time -import thread, threading -import Queue -import sys -import array -from weakref import proxy -import signal - -PORT = 50007 -HOST = 'localhost' -MSG = 'Michael Gilfix was here\n' - -class SocketTCPTest(unittest.TestCase): - - def setUp(self): - self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - global PORT - PORT = test_support.bind_port(self.serv, HOST, PORT) - self.serv.listen(1) - - def tearDown(self): - self.serv.close() - self.serv = None - -class SocketUDPTest(unittest.TestCase): - - def setUp(self): - self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - global PORT - PORT = test_support.bind_port(self.serv, HOST, PORT) - - def tearDown(self): - self.serv.close() - self.serv = None - -class ThreadableTest: - """Threadable Test class - - The ThreadableTest class makes it easy to create a threaded - client/server pair from an existing unit test. To create a - new threaded class from an existing unit test, use multiple - inheritance: - - class NewClass (OldClass, ThreadableTest): - pass - - This class defines two new fixture functions with obvious - purposes for overriding: - - clientSetUp () - clientTearDown () - - Any new test functions within the class must then define - tests in pairs, where the test name is preceeded with a - '_' to indicate the client portion of the test. Ex: - - def testFoo(self): - # Server portion - - def _testFoo(self): - # Client portion - - Any exceptions raised by the clients during their tests - are caught and transferred to the main thread to alert - the testing framework. - - Note, the server setup function cannot call any blocking - functions that rely on the client thread during setup, - unless serverExplicityReady() is called just before - the blocking call (such as in setting up a client/server - connection and performing the accept() in setUp(). - """ - - def __init__(self): - # Swap the true setup function - self.__setUp = self.setUp - self.__tearDown = self.tearDown - self.setUp = self._setUp - self.tearDown = self._tearDown - - def serverExplicitReady(self): - """This method allows the server to explicitly indicate that - it wants the client thread to proceed. This is useful if the - server is about to execute a blocking routine that is - dependent upon the client thread during its setup routine.""" - self.server_ready.set() - - def _setUp(self): - self.server_ready = threading.Event() - self.client_ready = threading.Event() - self.done = threading.Event() - self.queue = Queue.Queue(1) - - # Do some munging to start the client test. - methodname = self.id() - i = methodname.rfind('.') - methodname = methodname[i+1:] - test_method = getattr(self, '_' + methodname) - self.client_thread = thread.start_new_thread( - self.clientRun, (test_method,)) - - self.__setUp() - if not self.server_ready.isSet(): - self.server_ready.set() - self.client_ready.wait() - - def _tearDown(self): - self.__tearDown() - self.done.wait() - - if not self.queue.empty(): - msg = self.queue.get() - self.fail(msg) - - def clientRun(self, test_func): - self.server_ready.wait() - self.client_ready.set() - self.clientSetUp() - if not callable(test_func): - raise TypeError, "test_func must be a callable function" - try: - test_func() - except Exception, strerror: - self.queue.put(strerror) - self.clientTearDown() - - def clientSetUp(self): - raise NotImplementedError, "clientSetUp must be implemented." - - def clientTearDown(self): - self.done.set() - thread.exit() - -class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): - - def __init__(self, methodName='runTest'): - SocketTCPTest.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def clientSetUp(self): - self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - def clientTearDown(self): - self.cli.close() - self.cli = None - ThreadableTest.clientTearDown(self) - -class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): - - def __init__(self, methodName='runTest'): - SocketUDPTest.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def clientSetUp(self): - self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - -class SocketConnectedTest(ThreadedTCPSocketTest): - - def __init__(self, methodName='runTest'): - ThreadedTCPSocketTest.__init__(self, methodName=methodName) - - def setUp(self): - ThreadedTCPSocketTest.setUp(self) - # Indicate explicitly we're ready for the client thread to - # proceed and then perform the blocking call to accept - self.serverExplicitReady() - conn, addr = self.serv.accept() - self.cli_conn = conn - - def tearDown(self): - self.cli_conn.close() - self.cli_conn = None - ThreadedTCPSocketTest.tearDown(self) - - def clientSetUp(self): - ThreadedTCPSocketTest.clientSetUp(self) - self.cli.connect((HOST, PORT)) - self.serv_conn = self.cli - - def clientTearDown(self): - self.serv_conn.close() - self.serv_conn = None - ThreadedTCPSocketTest.clientTearDown(self) - -class SocketPairTest(unittest.TestCase, ThreadableTest): - - def __init__(self, methodName='runTest'): - unittest.TestCase.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def setUp(self): - self.serv, self.cli = socket.socketpair() - - def tearDown(self): - self.serv.close() - self.serv = None - - def clientSetUp(self): - pass - - def clientTearDown(self): - self.cli.close() - self.cli = None - ThreadableTest.clientTearDown(self) - - -####################################################################### -## Begin Tests - -class GeneralModuleTests(unittest.TestCase): - - def test_weakref(self): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - p = proxy(s) - self.assertEqual(p.fileno(), s.fileno()) - s.close() - s = None - try: - p.fileno() - except ReferenceError: - pass - else: - self.fail('Socket proxy still exists') - - def testSocketError(self): - # Testing socket module exceptions - def raise_error(*args, **kwargs): - raise socket.error - def raise_herror(*args, **kwargs): - raise socket.herror - def raise_gaierror(*args, **kwargs): - raise socket.gaierror - self.failUnlessRaises(socket.error, raise_error, - "Error raising socket exception.") - self.failUnlessRaises(socket.error, raise_herror, - "Error raising socket exception.") - self.failUnlessRaises(socket.error, raise_gaierror, - "Error raising socket exception.") - - def testCrucialConstants(self): - # Testing for mission critical constants - socket.AF_INET - socket.SOCK_STREAM - socket.SOCK_DGRAM - socket.SOCK_RAW - socket.SOCK_RDM - socket.SOCK_SEQPACKET - socket.SOL_SOCKET - socket.SO_REUSEADDR - - def testHostnameRes(self): - # Testing hostname resolution mechanisms - hostname = socket.gethostname() - try: - ip = socket.gethostbyname(hostname) - except socket.error: - # Probably name lookup wasn't set up right; skip this test - return - self.assert_(ip.find('.') >= 0, "Error resolving host to ip.") - try: - hname, aliases, ipaddrs = socket.gethostbyaddr(ip) - except socket.error: - # Probably a similar problem as above; skip this test - return - all_host_names = [hostname, hname] + aliases - fqhn = socket.getfqdn(ip) - if not fqhn in all_host_names: - self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) - - def testRefCountGetNameInfo(self): - # Testing reference count for getnameinfo - import sys - if hasattr(sys, "getrefcount"): - try: - # On some versions, this loses a reference - orig = sys.getrefcount(__name__) - socket.getnameinfo(__name__,0) - except SystemError: - if sys.getrefcount(__name__) <> orig: - self.fail("socket.getnameinfo loses a reference") - - def testInterpreterCrash(self): - # Making sure getnameinfo doesn't crash the interpreter - try: - # On some versions, this crashes the interpreter. - socket.getnameinfo(('x', 0, 0, 0), 0) - except socket.error: - pass - - def testNtoH(self): - # This just checks that htons etc. are their own inverse, - # when looking at the lower 16 or 32 bits. - sizes = {socket.htonl: 32, socket.ntohl: 32, - socket.htons: 16, socket.ntohs: 16} - for func, size in sizes.items(): - mask = (1L<<size) - 1 - for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): - self.assertEqual(i & mask, func(func(i&mask)) & mask) - - swapped = func(mask) - self.assertEqual(swapped & mask, mask) - self.assertRaises(OverflowError, func, 1L<<34) - - def testGetServBy(self): - eq = self.assertEqual - # Find one service that exists, then check all the related interfaces. - # I've ordered this by protocols that have both a tcp and udp - # protocol, at least for modern Linuxes. - if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6', - 'freebsd7', 'darwin'): - # avoid the 'echo' service on this platform, as there is an - # assumption breaking non-standard port/protocol entry - services = ('daytime', 'qotd', 'domain') - else: - services = ('echo', 'daytime', 'domain') - for service in services: - try: - port = socket.getservbyname(service, 'tcp') - break - except socket.error: - pass - else: - raise socket.error - # Try same call with optional protocol omitted - port2 = socket.getservbyname(service) - eq(port, port2) - # Try udp, but don't barf it it doesn't exist - try: - udpport = socket.getservbyname(service, 'udp') - except socket.error: - udpport = None - else: - eq(udpport, port) - # Now make sure the lookup by port returns the same service name - eq(socket.getservbyport(port2), service) - eq(socket.getservbyport(port, 'tcp'), service) - if udpport is not None: - eq(socket.getservbyport(udpport, 'udp'), service) - - def testDefaultTimeout(self): - # Testing default timeout - # The default timeout should initially be None - self.assertEqual(socket.getdefaulttimeout(), None) - s = socket.socket() - self.assertEqual(s.gettimeout(), None) - s.close() - - # Set the default timeout to 10, and see if it propagates - socket.setdefaulttimeout(10) - self.assertEqual(socket.getdefaulttimeout(), 10) - s = socket.socket() - self.assertEqual(s.gettimeout(), 10) - s.close() - - # Reset the default timeout to None, and see if it propagates - socket.setdefaulttimeout(None) - self.assertEqual(socket.getdefaulttimeout(), None) - s = socket.socket() - self.assertEqual(s.gettimeout(), None) - s.close() - - # Check that setting it to an invalid value raises ValueError - self.assertRaises(ValueError, socket.setdefaulttimeout, -1) - - # Check that setting it to an invalid type raises TypeError - self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") - - def testIPv4toString(self): - if not hasattr(socket, 'inet_pton'): - return # No inet_pton() on this platform - from socket import inet_aton as f, inet_pton, AF_INET - g = lambda a: inet_pton(AF_INET, a) - - self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0')) - self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0')) - self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170')) - self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4')) - self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255')) - - self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0')) - self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0')) - self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170')) - self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255')) - - def testIPv6toString(self): - if not hasattr(socket, 'inet_pton'): - return # No inet_pton() on this platform - try: - from socket import inet_pton, AF_INET6, has_ipv6 - if not has_ipv6: - return - except ImportError: - return - f = lambda a: inet_pton(AF_INET6, a) - - self.assertEquals('\x00' * 16, f('::')) - self.assertEquals('\x00' * 16, f('0::0')) - self.assertEquals('\x00\x01' + '\x00' * 14, f('1::')) - self.assertEquals( - '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', - f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') - ) - - def testStringToIPv4(self): - if not hasattr(socket, 'inet_ntop'): - return # No inet_ntop() on this platform - from socket import inet_ntoa as f, inet_ntop, AF_INET - g = lambda a: inet_ntop(AF_INET, a) - - self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00')) - self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55')) - self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff')) - self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04')) - - self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00')) - self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55')) - self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff')) - - def testStringToIPv6(self): - if not hasattr(socket, 'inet_ntop'): - return # No inet_ntop() on this platform - try: - from socket import inet_ntop, AF_INET6, has_ipv6 - if not has_ipv6: - return - except ImportError: - return - f = lambda a: inet_ntop(AF_INET6, a) - - self.assertEquals('::', f('\x00' * 16)) - self.assertEquals('::1', f('\x00' * 15 + '\x01')) - self.assertEquals( - 'aef:b01:506:1001:ffff:9997:55:170', - f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') - ) - - # XXX The following don't test module-level functionality... - - def testSockName(self): - # Testing getsockname() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(("0.0.0.0", PORT+1)) - name = sock.getsockname() - # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate - # it reasonable to get the host's addr in addition to 0.0.0.0. - # At least for eCos. This is required for the S/390 to pass. - my_ip_addr = socket.gethostbyname(socket.gethostname()) - self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) - self.assertEqual(name[1], PORT+1) - - def testGetSockOpt(self): - # Testing getsockopt() - # We know a socket should start without reuse==0 - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) - self.failIf(reuse != 0, "initial mode is reuse") - - def testSetSockOpt(self): - # Testing setsockopt() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) - self.failIf(reuse == 0, "failed to set reuse mode") - - def testSendAfterClose(self): - # testing send() after close() with timeout - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(1) - sock.close() - self.assertRaises(socket.error, sock.send, "spam") - - def testNewAttributes(self): - # testing .family, .type and .protocol - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.assertEqual(sock.family, socket.AF_INET) - self.assertEqual(sock.type, socket.SOCK_STREAM) - self.assertEqual(sock.proto, 0) - sock.close() - -class BasicTCPTest(SocketConnectedTest): - - def __init__(self, methodName='runTest'): - SocketConnectedTest.__init__(self, methodName=methodName) - - def testRecv(self): - # Testing large receive over TCP - msg = self.cli_conn.recv(1024) - self.assertEqual(msg, MSG) - - def _testRecv(self): - self.serv_conn.send(MSG) - - def testOverFlowRecv(self): - # Testing receive in chunks over TCP - seg1 = self.cli_conn.recv(len(MSG) - 3) - seg2 = self.cli_conn.recv(1024) - msg = seg1 + seg2 - self.assertEqual(msg, MSG) - - def _testOverFlowRecv(self): - self.serv_conn.send(MSG) - - def testRecvFrom(self): - # Testing large recvfrom() over TCP - msg, addr = self.cli_conn.recvfrom(1024) - self.assertEqual(msg, MSG) - - def _testRecvFrom(self): - self.serv_conn.send(MSG) - - def testOverFlowRecvFrom(self): - # Testing recvfrom() in chunks over TCP - seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) - seg2, addr = self.cli_conn.recvfrom(1024) - msg = seg1 + seg2 - self.assertEqual(msg, MSG) - - def _testOverFlowRecvFrom(self): - self.serv_conn.send(MSG) - - def testSendAll(self): - # Testing sendall() with a 2048 byte string over TCP - msg = '' - while 1: - read = self.cli_conn.recv(1024) - if not read: - break - msg += read - self.assertEqual(msg, 'f' * 2048) - - def _testSendAll(self): - big_chunk = 'f' * 2048 - self.serv_conn.sendall(big_chunk) - - def testFromFd(self): - # Testing fromfd() - if not hasattr(socket, "fromfd"): - return # On Windows, this doesn't exist - fd = self.cli_conn.fileno() - sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) - msg = sock.recv(1024) - self.assertEqual(msg, MSG) - - def _testFromFd(self): - self.serv_conn.send(MSG) - - def testShutdown(self): - # Testing shutdown() - msg = self.cli_conn.recv(1024) - self.assertEqual(msg, MSG) - - def _testShutdown(self): - self.serv_conn.send(MSG) - self.serv_conn.shutdown(2) - -class BasicUDPTest(ThreadedUDPSocketTest): - - def __init__(self, methodName='runTest'): - ThreadedUDPSocketTest.__init__(self, methodName=methodName) - - def testSendtoAndRecv(self): - # Testing sendto() and Recv() over UDP - msg = self.serv.recv(len(MSG)) - self.assertEqual(msg, MSG) - - def _testSendtoAndRecv(self): - self.cli.sendto(MSG, 0, (HOST, PORT)) - - def testRecvFrom(self): - # Testing recvfrom() over UDP - msg, addr = self.serv.recvfrom(len(MSG)) - self.assertEqual(msg, MSG) - - def _testRecvFrom(self): - self.cli.sendto(MSG, 0, (HOST, PORT)) - - def testRecvFromNegative(self): - # Negative lengths passed to recvfrom should give ValueError. - self.assertRaises(ValueError, self.serv.recvfrom, -1) - - def _testRecvFromNegative(self): - self.cli.sendto(MSG, 0, (HOST, PORT)) - -class TCPCloserTest(ThreadedTCPSocketTest): - - def testClose(self): - conn, addr = self.serv.accept() - conn.close() - - sd = self.cli - read, write, err = select.select([sd], [], [], 1.0) - self.assertEqual(read, [sd]) - self.assertEqual(sd.recv(1), '') - - def _testClose(self): - self.cli.connect((HOST, PORT)) - time.sleep(1.0) - -class BasicSocketPairTest(SocketPairTest): - - def __init__(self, methodName='runTest'): - SocketPairTest.__init__(self, methodName=methodName) - - def testRecv(self): - msg = self.serv.recv(1024) - self.assertEqual(msg, MSG) - - def _testRecv(self): - self.cli.send(MSG) - - def testSend(self): - self.serv.send(MSG) - - def _testSend(self): - msg = self.cli.recv(1024) - self.assertEqual(msg, MSG) - -class NonBlockingTCPTests(ThreadedTCPSocketTest): - - def __init__(self, methodName='runTest'): - ThreadedTCPSocketTest.__init__(self, methodName=methodName) - - def testSetBlocking(self): - # Testing whether set blocking works - self.serv.setblocking(0) - start = time.time() - try: - self.serv.accept() - except socket.error: - pass - end = time.time() - self.assert_((end - start) < 1.0, "Error setting non-blocking mode.") - - def _testSetBlocking(self): - pass - - def testAccept(self): - # Testing non-blocking accept - self.serv.setblocking(0) - try: - conn, addr = self.serv.accept() - except socket.error: - pass - else: - self.fail("Error trying to do non-blocking accept.") - read, write, err = select.select([self.serv], [], []) - if self.serv in read: - conn, addr = self.serv.accept() - else: - self.fail("Error trying to do accept after select.") - - def _testAccept(self): - time.sleep(0.1) - self.cli.connect((HOST, PORT)) - - def testConnect(self): - # Testing non-blocking connect - conn, addr = self.serv.accept() - - def _testConnect(self): - self.cli.settimeout(10) - self.cli.connect((HOST, PORT)) - - def testRecv(self): - # Testing non-blocking recv - conn, addr = self.serv.accept() - conn.setblocking(0) - try: - msg = conn.recv(len(MSG)) - except socket.error: - pass - else: - self.fail("Error trying to do non-blocking recv.") - read, write, err = select.select([conn], [], []) - if conn in read: - msg = conn.recv(len(MSG)) - self.assertEqual(msg, MSG) - else: - self.fail("Error during select call to non-blocking socket.") - - def _testRecv(self): - self.cli.connect((HOST, PORT)) - time.sleep(0.1) - self.cli.send(MSG) - -class FileObjectClassTestCase(SocketConnectedTest): - - bufsize = -1 # Use default buffer size - - def __init__(self, methodName='runTest'): - SocketConnectedTest.__init__(self, methodName=methodName) - - def setUp(self): - SocketConnectedTest.setUp(self) - self.serv_file = self.cli_conn.makefile('rb', self.bufsize) - - def tearDown(self): - self.serv_file.close() - self.assert_(self.serv_file.closed) - self.serv_file = None - SocketConnectedTest.tearDown(self) - - def clientSetUp(self): - SocketConnectedTest.clientSetUp(self) - self.cli_file = self.serv_conn.makefile('wb') - - def clientTearDown(self): - self.cli_file.close() - self.assert_(self.cli_file.closed) - self.cli_file = None - SocketConnectedTest.clientTearDown(self) - - def testSmallRead(self): - # Performing small file read test - first_seg = self.serv_file.read(len(MSG)-3) - second_seg = self.serv_file.read(3) - msg = first_seg + second_seg - self.assertEqual(msg, MSG) - - def _testSmallRead(self): - self.cli_file.write(MSG) - self.cli_file.flush() - - def testFullRead(self): - # read until EOF - msg = self.serv_file.read() - self.assertEqual(msg, MSG) - - def _testFullRead(self): - self.cli_file.write(MSG) - self.cli_file.close() - - def testUnbufferedRead(self): - # Performing unbuffered file read test - buf = '' - while 1: - char = self.serv_file.read(1) - if not char: - break - buf += char - self.assertEqual(buf, MSG) - - def _testUnbufferedRead(self): - self.cli_file.write(MSG) - self.cli_file.flush() - - def testReadline(self): - # Performing file readline test - line = self.serv_file.readline() - self.assertEqual(line, MSG) - - def _testReadline(self): - self.cli_file.write(MSG) - self.cli_file.flush() - - def testClosedAttr(self): - self.assert_(not self.serv_file.closed) - - def _testClosedAttr(self): - self.assert_(not self.cli_file.closed) - -class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): - - """Repeat the tests from FileObjectClassTestCase with bufsize==0. - - In this case (and in this case only), it should be possible to - create a file object, read a line from it, create another file - object, read another line from it, without loss of data in the - first file object's buffer. Note that httplib relies on this - when reading multiple requests from the same socket.""" - - bufsize = 0 # Use unbuffered mode - - def testUnbufferedReadline(self): - # Read a line, create a new file object, read another line with it - line = self.serv_file.readline() # first line - self.assertEqual(line, "A. " + MSG) # first line - self.serv_file = self.cli_conn.makefile('rb', 0) - line = self.serv_file.readline() # second line - self.assertEqual(line, "B. " + MSG) # second line - - def _testUnbufferedReadline(self): - self.cli_file.write("A. " + MSG) - self.cli_file.write("B. " + MSG) - self.cli_file.flush() - -class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): - - bufsize = 1 # Default-buffered for reading; line-buffered for writing - - -class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): - - bufsize = 2 # Exercise the buffering code - - -class Urllib2FileobjectTest(unittest.TestCase): - - # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that - # it close the socket if the close c'tor argument is true - - def testClose(self): - class MockSocket: - closed = False - def flush(self): pass - def close(self): self.closed = True - - # must not close unless we request it: the original use of _fileobject - # by module socket requires that the underlying socket not be closed until - # the _socketobject that created the _fileobject is closed - s = MockSocket() - f = socket._fileobject(s) - f.close() - self.assert_(not s.closed) - - s = MockSocket() - f = socket._fileobject(s, close=True) - f.close() - self.assert_(s.closed) - -class TCPTimeoutTest(SocketTCPTest): - - def testTCPTimeout(self): - def raise_timeout(*args, **kwargs): - self.serv.settimeout(1.0) - self.serv.accept() - self.failUnlessRaises(socket.timeout, raise_timeout, - "Error generating a timeout exception (TCP)") - - def testTimeoutZero(self): - ok = False - try: - self.serv.settimeout(0.0) - foo = self.serv.accept() - except socket.timeout: - self.fail("caught timeout instead of error (TCP)") - except socket.error: - ok = True - except: - self.fail("caught unexpected exception (TCP)") - if not ok: - self.fail("accept() returned success when we did not expect it") - - def testInterruptedTimeout(self): - # XXX I don't know how to do this test on MSWindows or any other - # plaform that doesn't support signal.alarm() or os.kill(), though - # the bug should have existed on all platforms. - if not hasattr(signal, "alarm"): - return # can only test on *nix - self.serv.settimeout(5.0) # must be longer than alarm - class Alarm(Exception): - pass - def alarm_handler(signal, frame): - raise Alarm - old_alarm = signal.signal(signal.SIGALRM, alarm_handler) - try: - signal.alarm(2) # POSIX allows alarm to be up to 1 second early - try: - foo = self.serv.accept() - except socket.timeout: - self.fail("caught timeout instead of Alarm") - except Alarm: - pass - except: - self.fail("caught other exception instead of Alarm") - else: - self.fail("nothing caught") - signal.alarm(0) # shut off alarm - except Alarm: - self.fail("got Alarm in wrong place") - finally: - # no alarm can be pending. Safe to restore old handler. - signal.signal(signal.SIGALRM, old_alarm) - -class UDPTimeoutTest(SocketTCPTest): - - def testUDPTimeout(self): - def raise_timeout(*args, **kwargs): - self.serv.settimeout(1.0) - self.serv.recv(1024) - self.failUnlessRaises(socket.timeout, raise_timeout, - "Error generating a timeout exception (UDP)") - - def testTimeoutZero(self): - ok = False - try: - self.serv.settimeout(0.0) - foo = self.serv.recv(1024) - except socket.timeout: - self.fail("caught timeout instead of error (UDP)") - except socket.error: - ok = True - except: - self.fail("caught unexpected exception (UDP)") - if not ok: - self.fail("recv() returned success when we did not expect it") - -class TestExceptions(unittest.TestCase): - - def testExceptionTree(self): - self.assert_(issubclass(socket.error, Exception)) - self.assert_(issubclass(socket.herror, socket.error)) - self.assert_(issubclass(socket.gaierror, socket.error)) - self.assert_(issubclass(socket.timeout, socket.error)) - -class TestLinuxAbstractNamespace(unittest.TestCase): - - UNIX_PATH_MAX = 108 - - def testLinuxAbstractNamespace(self): - address = "\x00python-test-hello\x00\xff" - s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s1.bind(address) - s1.listen(1) - s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s2.connect(s1.getsockname()) - s1.accept() - self.assertEqual(s1.getsockname(), address) - self.assertEqual(s2.getpeername(), address) - - def testMaxName(self): - address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1) - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.bind(address) - self.assertEqual(s.getsockname(), address) - - def testNameOverflow(self): - address = "\x00" + "h" * self.UNIX_PATH_MAX - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.assertRaises(socket.error, s.bind, address) - - -class BufferIOTest(SocketConnectedTest): - """ - Test the buffer versions of socket.recv() and socket.send(). - """ - def __init__(self, methodName='runTest'): - SocketConnectedTest.__init__(self, methodName=methodName) - - def testRecvInto(self): - buf = array.array('c', ' '*1024) - nbytes = self.cli_conn.recv_into(buf) - self.assertEqual(nbytes, len(MSG)) - msg = buf.tostring()[:len(MSG)] - self.assertEqual(msg, MSG) - - def _testRecvInto(self): - buf = buffer(MSG) - self.serv_conn.send(buf) - - def testRecvFromInto(self): - buf = array.array('c', ' '*1024) - nbytes, addr = self.cli_conn.recvfrom_into(buf) - self.assertEqual(nbytes, len(MSG)) - msg = buf.tostring()[:len(MSG)] - self.assertEqual(msg, MSG) - - def _testRecvFromInto(self): - buf = buffer(MSG) - self.serv_conn.send(buf) - -def test_main(): - tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest, - TestExceptions, BufferIOTest] - if sys.platform != 'mac': - tests.extend([ BasicUDPTest, UDPTimeoutTest ]) - - tests.extend([ - NonBlockingTCPTests, - FileObjectClassTestCase, - UnbufferedFileObjectClassTestCase, - LineBufferedFileObjectClassTestCase, - SmallBufferedFileObjectClassTestCase, - Urllib2FileobjectTest, - ]) - if hasattr(socket, "socketpair"): - tests.append(BasicSocketPairTest) - if sys.platform == 'linux2': - tests.append(TestLinuxAbstractNamespace) - - thread_info = test_support.threading_setup() - test_support.run_unittest(*tests) - test_support.threading_cleanup(*thread_info) - -if __name__ == "__main__": - test_main() |