summaryrefslogtreecommitdiff
path: root/sys/lib/python/test/test_socketserver.py
diff options
context:
space:
mode:
authorcinap_lenrek <cinap_lenrek@localhost>2011-05-04 05:41:33 +0000
committercinap_lenrek <cinap_lenrek@localhost>2011-05-04 05:41:33 +0000
commitb8436b026a90291ba26afa4f7a2700720b03339f (patch)
tree3098aede87640c80567ecb31022e0404a8b5ec75 /sys/lib/python/test/test_socketserver.py
parent6c1b42188259a6f1636cd15a9570b18af03e2dbb (diff)
remove python test cases
Diffstat (limited to 'sys/lib/python/test/test_socketserver.py')
-rw-r--r--sys/lib/python/test/test_socketserver.py218
1 files changed, 0 insertions, 218 deletions
diff --git a/sys/lib/python/test/test_socketserver.py b/sys/lib/python/test/test_socketserver.py
deleted file mode 100644
index 9a67a358b..000000000
--- a/sys/lib/python/test/test_socketserver.py
+++ /dev/null
@@ -1,218 +0,0 @@
-# Test suite for SocketServer.py
-
-from test import test_support
-from test.test_support import (verbose, verify, TESTFN, TestSkipped,
- reap_children)
-test_support.requires('network')
-
-from SocketServer import *
-import socket
-import errno
-import select
-import time
-import threading
-import os
-
-NREQ = 3
-DELAY = 0.5
-
-class MyMixinHandler:
- def handle(self):
- time.sleep(DELAY)
- line = self.rfile.readline()
- time.sleep(DELAY)
- self.wfile.write(line)
-
-class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
- pass
-
-class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
- pass
-
-class MyMixinServer:
- def serve_a_few(self):
- for i in range(NREQ):
- self.handle_request()
- def handle_error(self, request, client_address):
- self.close_request(request)
- self.server_close()
- raise
-
-teststring = "hello world\n"
-
-def receive(sock, n, timeout=20):
- r, w, x = select.select([sock], [], [], timeout)
- if sock in r:
- return sock.recv(n)
- else:
- raise RuntimeError, "timed out on %r" % (sock,)
-
-def testdgram(proto, addr):
- s = socket.socket(proto, socket.SOCK_DGRAM)
- s.sendto(teststring, addr)
- buf = data = receive(s, 100)
- while data and '\n' not in buf:
- data = receive(s, 100)
- buf += data
- verify(buf == teststring)
- s.close()
-
-def teststream(proto, addr):
- s = socket.socket(proto, socket.SOCK_STREAM)
- s.connect(addr)
- s.sendall(teststring)
- buf = data = receive(s, 100)
- while data and '\n' not in buf:
- data = receive(s, 100)
- buf += data
- verify(buf == teststring)
- s.close()
-
-class ServerThread(threading.Thread):
- def __init__(self, addr, svrcls, hdlrcls):
- threading.Thread.__init__(self)
- self.__addr = addr
- self.__svrcls = svrcls
- self.__hdlrcls = hdlrcls
- def run(self):
- class svrcls(MyMixinServer, self.__svrcls):
- pass
- if verbose: print "thread: creating server"
- svr = svrcls(self.__addr, self.__hdlrcls)
- # pull the address out of the server in case it changed
- # this can happen if another process is using the port
- addr = svr.server_address
- if addr:
- self.__addr = addr
- if self.__addr != svr.socket.getsockname():
- raise RuntimeError('server_address was %s, expected %s' %
- (self.__addr, svr.socket.getsockname()))
- if verbose: print "thread: serving three times"
- svr.serve_a_few()
- if verbose: print "thread: done"
-
-seed = 0
-def pickport():
- global seed
- seed += 1
- return 10000 + (os.getpid() % 1000)*10 + seed
-
-host = "localhost"
-testfiles = []
-def pickaddr(proto):
- if proto == socket.AF_INET:
- return (host, pickport())
- else:
- fn = TESTFN + str(pickport())
- if os.name == 'os2':
- # AF_UNIX socket names on OS/2 require a specific prefix
- # which can't include a drive letter and must also use
- # backslashes as directory separators
- if fn[1] == ':':
- fn = fn[2:]
- if fn[0] in (os.sep, os.altsep):
- fn = fn[1:]
- fn = os.path.join('\socket', fn)
- if os.sep == '/':
- fn = fn.replace(os.sep, os.altsep)
- else:
- fn = fn.replace(os.altsep, os.sep)
- testfiles.append(fn)
- return fn
-
-def cleanup():
- for fn in testfiles:
- try:
- os.remove(fn)
- except os.error:
- pass
- testfiles[:] = []
-
-def testloop(proto, servers, hdlrcls, testfunc):
- for svrcls in servers:
- addr = pickaddr(proto)
- if verbose:
- print "ADDR =", addr
- print "CLASS =", svrcls
- t = ServerThread(addr, svrcls, hdlrcls)
- if verbose: print "server created"
- t.start()
- if verbose: print "server running"
- for i in range(NREQ):
- time.sleep(DELAY)
- if verbose: print "test client", i
- testfunc(proto, addr)
- if verbose: print "waiting for server"
- t.join()
- if verbose: print "done"
-
-class ForgivingTCPServer(TCPServer):
- # prevent errors if another process is using the port we want
- def server_bind(self):
- host, default_port = self.server_address
- # this code shamelessly stolen from test.test_support
- # the ports were changed to protect the innocent
- import sys
- for port in [default_port, 3434, 8798, 23833]:
- try:
- self.server_address = host, port
- TCPServer.server_bind(self)
- break
- except socket.error, (err, msg):
- if err != errno.EADDRINUSE:
- raise
- print >>sys.__stderr__, \
- ' WARNING: failed to listen on port %d, trying another' % port
-
-tcpservers = [ForgivingTCPServer, ThreadingTCPServer]
-if hasattr(os, 'fork') and os.name not in ('os2',):
- tcpservers.append(ForkingTCPServer)
-udpservers = [UDPServer, ThreadingUDPServer]
-if hasattr(os, 'fork') and os.name not in ('os2',):
- udpservers.append(ForkingUDPServer)
-
-if not hasattr(socket, 'AF_UNIX'):
- streamservers = []
- dgramservers = []
-else:
- class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
- streamservers = [UnixStreamServer, ThreadingUnixStreamServer]
- if hasattr(os, 'fork') and os.name not in ('os2',):
- streamservers.append(ForkingUnixStreamServer)
- class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
- dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer]
- if hasattr(os, 'fork') and os.name not in ('os2',):
- dgramservers.append(ForkingUnixDatagramServer)
-
-def sloppy_cleanup():
- # See http://python.org/sf/1540386
- # We need to reap children here otherwise a child from one server
- # can be left running for the next server and cause a test failure.
- time.sleep(DELAY)
- reap_children()
-
-def testall():
- testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
- sloppy_cleanup()
- testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
- if hasattr(socket, 'AF_UNIX'):
- sloppy_cleanup()
- testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
- # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
- # client address so this cannot work:
- ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
-
-def test_main():
- import imp
- if imp.lock_held():
- # If the import lock is held, the threads will hang.
- raise TestSkipped("can't run when import lock is held")
-
- try:
- testall()
- finally:
- cleanup()
- reap_children()
-
-if __name__ == "__main__":
- test_main()