From b8436b026a90291ba26afa4f7a2700720b03339f Mon Sep 17 00:00:00 2001 From: cinap_lenrek Date: Wed, 4 May 2011 05:41:33 +0000 Subject: remove python test cases --- sys/lib/python/test/test_logging.py | 685 ------------------------------------ 1 file changed, 685 deletions(-) delete mode 100644 sys/lib/python/test/test_logging.py (limited to 'sys/lib/python/test/test_logging.py') diff --git a/sys/lib/python/test/test_logging.py b/sys/lib/python/test/test_logging.py deleted file mode 100644 index d96d703e1..000000000 --- a/sys/lib/python/test/test_logging.py +++ /dev/null @@ -1,685 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2001-2004 by Vinay Sajip. All Rights Reserved. -# -# Permission to use, copy, modify, and distribute this software and its -# documentation for any purpose and without fee is hereby granted, -# provided that the above copyright notice appear in all copies and that -# both that copyright notice and this permission notice appear in -# supporting documentation, and that the name of Vinay Sajip -# not be used in advertising or publicity pertaining to distribution -# of the software without specific, written prior permission. -# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING -# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL -# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR -# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER -# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT -# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -# -# This file is part of the Python logging distribution. See -# http://www.red-dove.com/python_logging.html -# -"""Test harness for the logging module. Run all tests. - -Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved. -""" - -import select -import os, sys, string, struct, types, cPickle, cStringIO -import socket, tempfile, threading, time -import logging, logging.handlers, logging.config -from test.test_support import run_with_locale - -BANNER = "-- %-10s %-6s ---------------------------------------------------\n" - -FINISH_UP = "Finish up, it's closing time. Messages should bear numbers 0 through 24." - -#---------------------------------------------------------------------------- -# Log receiver -#---------------------------------------------------------------------------- - -TIMEOUT = 10 - -from SocketServer import ThreadingTCPServer, StreamRequestHandler - -class LogRecordStreamHandler(StreamRequestHandler): - """ - Handler for a streaming logging request. It basically logs the record - using whatever logging policy is configured locally. - """ - - def handle(self): - """ - Handle multiple requests - each expected to be a 4-byte length, - followed by the LogRecord in pickle format. Logs the record - according to whatever policy is configured locally. - """ - while 1: - try: - chunk = self.connection.recv(4) - if len(chunk) < 4: - break - slen = struct.unpack(">L", chunk)[0] - chunk = self.connection.recv(slen) - while len(chunk) < slen: - chunk = chunk + self.connection.recv(slen - len(chunk)) - obj = self.unPickle(chunk) - record = logging.makeLogRecord(obj) - self.handleLogRecord(record) - except: - raise - - def unPickle(self, data): - return cPickle.loads(data) - - def handleLogRecord(self, record): - logname = "logrecv.tcp." + record.name - #If the end-of-messages sentinel is seen, tell the server to terminate - if record.msg == FINISH_UP: - self.server.abort = 1 - record.msg = record.msg + " (via " + logname + ")" - logger = logging.getLogger(logname) - logger.handle(record) - -# The server sets socketDataProcessed when it's done. -socketDataProcessed = threading.Event() - -class LogRecordSocketReceiver(ThreadingTCPServer): - """ - A simple-minded TCP socket-based logging receiver suitable for test - purposes. - """ - - allow_reuse_address = 1 - - def __init__(self, host='localhost', - port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, - handler=LogRecordStreamHandler): - ThreadingTCPServer.__init__(self, (host, port), handler) - self.abort = 0 - self.timeout = 1 - - def serve_until_stopped(self): - while not self.abort: - rd, wr, ex = select.select([self.socket.fileno()], [], [], - self.timeout) - if rd: - self.handle_request() - #notify the main thread that we're about to exit - socketDataProcessed.set() - # close the listen socket - self.server_close() - - def process_request(self, request, client_address): - #import threading - t = threading.Thread(target = self.finish_request, - args = (request, client_address)) - t.start() - -def runTCP(tcpserver): - tcpserver.serve_until_stopped() - -#---------------------------------------------------------------------------- -# Test 0 -#---------------------------------------------------------------------------- - -msgcount = 0 - -def nextmessage(): - global msgcount - rv = "Message %d" % msgcount - msgcount = msgcount + 1 - return rv - -def test0(): - ERR = logging.getLogger("ERR") - ERR.setLevel(logging.ERROR) - INF = logging.getLogger("INF") - INF.setLevel(logging.INFO) - INF_ERR = logging.getLogger("INF.ERR") - INF_ERR.setLevel(logging.ERROR) - DEB = logging.getLogger("DEB") - DEB.setLevel(logging.DEBUG) - - INF_UNDEF = logging.getLogger("INF.UNDEF") - INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") - UNDEF = logging.getLogger("UNDEF") - - GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") - CHILD = logging.getLogger("INF.BADPARENT") - - #These should log - ERR.log(logging.FATAL, nextmessage()) - ERR.error(nextmessage()) - - INF.log(logging.FATAL, nextmessage()) - INF.error(nextmessage()) - INF.warn(nextmessage()) - INF.info(nextmessage()) - - INF_UNDEF.log(logging.FATAL, nextmessage()) - INF_UNDEF.error(nextmessage()) - INF_UNDEF.warn (nextmessage()) - INF_UNDEF.info (nextmessage()) - - INF_ERR.log(logging.FATAL, nextmessage()) - INF_ERR.error(nextmessage()) - - INF_ERR_UNDEF.log(logging.FATAL, nextmessage()) - INF_ERR_UNDEF.error(nextmessage()) - - DEB.log(logging.FATAL, nextmessage()) - DEB.error(nextmessage()) - DEB.warn (nextmessage()) - DEB.info (nextmessage()) - DEB.debug(nextmessage()) - - UNDEF.log(logging.FATAL, nextmessage()) - UNDEF.error(nextmessage()) - UNDEF.warn (nextmessage()) - UNDEF.info (nextmessage()) - - GRANDCHILD.log(logging.FATAL, nextmessage()) - CHILD.log(logging.FATAL, nextmessage()) - - #These should not log - ERR.warn(nextmessage()) - ERR.info(nextmessage()) - ERR.debug(nextmessage()) - - INF.debug(nextmessage()) - INF_UNDEF.debug(nextmessage()) - - INF_ERR.warn(nextmessage()) - INF_ERR.info(nextmessage()) - INF_ERR.debug(nextmessage()) - INF_ERR_UNDEF.warn(nextmessage()) - INF_ERR_UNDEF.info(nextmessage()) - INF_ERR_UNDEF.debug(nextmessage()) - - INF.info(FINISH_UP) - -#---------------------------------------------------------------------------- -# Test 1 -#---------------------------------------------------------------------------- - -# -# First, we define our levels. There can be as many as you want - the only -# limitations are that they should be integers, the lowest should be > 0 and -# larger values mean less information being logged. If you need specific -# level values which do not fit into these limitations, you can use a -# mapping dictionary to convert between your application levels and the -# logging system. -# -SILENT = 10 -TACITURN = 9 -TERSE = 8 -EFFUSIVE = 7 -SOCIABLE = 6 -VERBOSE = 5 -TALKATIVE = 4 -GARRULOUS = 3 -CHATTERBOX = 2 -BORING = 1 - -LEVEL_RANGE = range(BORING, SILENT + 1) - -# -# Next, we define names for our levels. You don't need to do this - in which -# case the system will use "Level n" to denote the text for the level. -# -my_logging_levels = { - SILENT : 'Silent', - TACITURN : 'Taciturn', - TERSE : 'Terse', - EFFUSIVE : 'Effusive', - SOCIABLE : 'Sociable', - VERBOSE : 'Verbose', - TALKATIVE : 'Talkative', - GARRULOUS : 'Garrulous', - CHATTERBOX : 'Chatterbox', - BORING : 'Boring', -} - -# -# Now, to demonstrate filtering: suppose for some perverse reason we only -# want to print out all except GARRULOUS messages. Let's create a filter for -# this purpose... -# -class SpecificLevelFilter(logging.Filter): - def __init__(self, lvl): - self.level = lvl - - def filter(self, record): - return self.level != record.levelno - -class GarrulousFilter(SpecificLevelFilter): - def __init__(self): - SpecificLevelFilter.__init__(self, GARRULOUS) - -# -# Now, let's demonstrate filtering at the logger. This time, use a filter -# which excludes SOCIABLE and TACITURN messages. Note that GARRULOUS events -# are still excluded. -# -class VerySpecificFilter(logging.Filter): - def filter(self, record): - return record.levelno not in [SOCIABLE, TACITURN] - -def message(s): - sys.stdout.write("%s\n" % s) - -SHOULD1 = "This should only be seen at the '%s' logging level (or lower)" - -def test1(): -# -# Now, tell the logging system to associate names with our levels. -# - for lvl in my_logging_levels.keys(): - logging.addLevelName(lvl, my_logging_levels[lvl]) - -# -# Now, define a test function which logs an event at each of our levels. -# - - def doLog(log): - for lvl in LEVEL_RANGE: - log.log(lvl, SHOULD1, logging.getLevelName(lvl)) - - log = logging.getLogger("") - hdlr = log.handlers[0] -# -# Set the logging level to each different value and call the utility -# function to log events. -# In the output, you should see that each time round the loop, the number of -# logging events which are actually output decreases. -# - for lvl in LEVEL_RANGE: - message("-- setting logging level to '%s' -----" % - logging.getLevelName(lvl)) - log.setLevel(lvl) - doLog(log) - # - # Now, we demonstrate level filtering at the handler level. Tell the - # handler defined above to filter at level 'SOCIABLE', and repeat the - # above loop. Compare the output from the two runs. - # - hdlr.setLevel(SOCIABLE) - message("-- Filtering at handler level to SOCIABLE --") - for lvl in LEVEL_RANGE: - message("-- setting logging level to '%s' -----" % - logging.getLevelName(lvl)) - log.setLevel(lvl) - doLog(log) - - hdlr.setLevel(0) #turn off level filtering at the handler - - garr = GarrulousFilter() - hdlr.addFilter(garr) - message("-- Filtering using GARRULOUS filter --") - for lvl in LEVEL_RANGE: - message("-- setting logging level to '%s' -----" % - logging.getLevelName(lvl)) - log.setLevel(lvl) - doLog(log) - spec = VerySpecificFilter() - log.addFilter(spec) - message("-- Filtering using specific filter for SOCIABLE, TACITURN --") - for lvl in LEVEL_RANGE: - message("-- setting logging level to '%s' -----" % - logging.getLevelName(lvl)) - log.setLevel(lvl) - doLog(log) - - log.removeFilter(spec) - hdlr.removeFilter(garr) - #Undo the one level which clashes...for regression tests - logging.addLevelName(logging.DEBUG, "DEBUG") - -#---------------------------------------------------------------------------- -# Test 2 -#---------------------------------------------------------------------------- - -MSG = "-- logging %d at INFO, messages should be seen every 10 events --" -def test2(): - logger = logging.getLogger("") - sh = logger.handlers[0] - sh.close() - logger.removeHandler(sh) - mh = logging.handlers.MemoryHandler(10,logging.WARNING, sh) - logger.setLevel(logging.DEBUG) - logger.addHandler(mh) - message("-- logging at DEBUG, nothing should be seen yet --") - logger.debug("Debug message") - message("-- logging at INFO, nothing should be seen yet --") - logger.info("Info message") - message("-- logging at WARNING, 3 messages should be seen --") - logger.warn("Warn message") - for i in xrange(102): - message(MSG % i) - logger.info("Info index = %d", i) - mh.close() - logger.removeHandler(mh) - logger.addHandler(sh) - -#---------------------------------------------------------------------------- -# Test 3 -#---------------------------------------------------------------------------- - -FILTER = "a.b" - -def doLog3(): - logging.getLogger("a").info("Info 1") - logging.getLogger("a.b").info("Info 2") - logging.getLogger("a.c").info("Info 3") - logging.getLogger("a.b.c").info("Info 4") - logging.getLogger("a.b.c.d").info("Info 5") - logging.getLogger("a.bb.c").info("Info 6") - logging.getLogger("b").info("Info 7") - logging.getLogger("b.a").info("Info 8") - logging.getLogger("c.a.b").info("Info 9") - logging.getLogger("a.bb").info("Info 10") - -def test3(): - root = logging.getLogger() - root.setLevel(logging.DEBUG) - hand = root.handlers[0] - message("Unfiltered...") - doLog3() - message("Filtered with '%s'..." % FILTER) - filt = logging.Filter(FILTER) - hand.addFilter(filt) - doLog3() - hand.removeFilter(filt) - -#---------------------------------------------------------------------------- -# Test 4 -#---------------------------------------------------------------------------- - -# config0 is a standard configuration. -config0 = """ -[loggers] -keys=root - -[handlers] -keys=hand1 - -[formatters] -keys=form1 - -[logger_root] -level=NOTSET -handlers=hand1 - -[handler_hand1] -class=StreamHandler -level=NOTSET -formatter=form1 -args=(sys.stdout,) - -[formatter_form1] -format=%(levelname)s:%(name)s:%(message)s -datefmt= -""" - -# config1 adds a little to the standard configuration. -config1 = """ -[loggers] -keys=root,parser - -[handlers] -keys=hand1 - -[formatters] -keys=form1 - -[logger_root] -level=NOTSET -handlers=hand1 - -[logger_parser] -level=DEBUG -handlers=hand1 -propagate=1 -qualname=compiler.parser - -[handler_hand1] -class=StreamHandler -level=NOTSET -formatter=form1 -args=(sys.stdout,) - -[formatter_form1] -format=%(levelname)s:%(name)s:%(message)s -datefmt= -""" - -# config2 has a subtle configuration error that should be reported -config2 = string.replace(config1, "sys.stdout", "sys.stbout") - -# config3 has a less subtle configuration error -config3 = string.replace( - config1, "formatter=form1", "formatter=misspelled_name") - -def test4(): - for i in range(4): - conf = globals()['config%d' % i] - sys.stdout.write('config%d: ' % i) - loggerDict = logging.getLogger().manager.loggerDict - logging._acquireLock() - try: - saved_handlers = logging._handlers.copy() - saved_handler_list = logging._handlerList[:] - saved_loggers = loggerDict.copy() - finally: - logging._releaseLock() - try: - fn = tempfile.mktemp(".ini") - f = open(fn, "w") - f.write(conf) - f.close() - try: - logging.config.fileConfig(fn) - #call again to make sure cleanup is correct - logging.config.fileConfig(fn) - except: - t = sys.exc_info()[0] - message(str(t)) - else: - message('ok.') - os.remove(fn) - finally: - logging._acquireLock() - try: - logging._handlers.clear() - logging._handlers.update(saved_handlers) - logging._handlerList[:] = saved_handler_list - loggerDict = logging.getLogger().manager.loggerDict - loggerDict.clear() - loggerDict.update(saved_loggers) - finally: - logging._releaseLock() - -#---------------------------------------------------------------------------- -# Test 5 -#---------------------------------------------------------------------------- - -test5_config = """ -[loggers] -keys=root - -[handlers] -keys=hand1 - -[formatters] -keys=form1 - -[logger_root] -level=NOTSET -handlers=hand1 - -[handler_hand1] -class=StreamHandler -level=NOTSET -formatter=form1 -args=(sys.stdout,) - -[formatter_form1] -class=test.test_logging.FriendlyFormatter -format=%(levelname)s:%(name)s:%(message)s -datefmt= -""" - -class FriendlyFormatter (logging.Formatter): - def formatException(self, ei): - return "%s... Don't panic!" % str(ei[0]) - - -def test5(): - loggerDict = logging.getLogger().manager.loggerDict - logging._acquireLock() - try: - saved_handlers = logging._handlers.copy() - saved_handler_list = logging._handlerList[:] - saved_loggers = loggerDict.copy() - finally: - logging._releaseLock() - try: - fn = tempfile.mktemp(".ini") - f = open(fn, "w") - f.write(test5_config) - f.close() - logging.config.fileConfig(fn) - try: - raise KeyError - except KeyError: - logging.exception("just testing") - os.remove(fn) - hdlr = logging.getLogger().handlers[0] - logging.getLogger().handlers.remove(hdlr) - finally: - logging._acquireLock() - try: - logging._handlers.clear() - logging._handlers.update(saved_handlers) - logging._handlerList[:] = saved_handler_list - loggerDict = logging.getLogger().manager.loggerDict - loggerDict.clear() - loggerDict.update(saved_loggers) - finally: - logging._releaseLock() - - -#---------------------------------------------------------------------------- -# Test Harness -#---------------------------------------------------------------------------- -def banner(nm, typ): - sep = BANNER % (nm, typ) - sys.stdout.write(sep) - sys.stdout.flush() - -def test_main_inner(): - rootLogger = logging.getLogger("") - rootLogger.setLevel(logging.DEBUG) - hdlr = logging.StreamHandler(sys.stdout) - fmt = logging.Formatter(logging.BASIC_FORMAT) - hdlr.setFormatter(fmt) - rootLogger.addHandler(hdlr) - - # Find an unused port number - port = logging.handlers.DEFAULT_TCP_LOGGING_PORT - while port < logging.handlers.DEFAULT_TCP_LOGGING_PORT+100: - try: - tcpserver = LogRecordSocketReceiver(port=port) - except socket.error: - port += 1 - else: - break - else: - raise ImportError, "Could not find unused port" - - - #Set up a handler such that all events are sent via a socket to the log - #receiver (logrecv). - #The handler will only be added to the rootLogger for some of the tests - shdlr = logging.handlers.SocketHandler('localhost', port) - - #Configure the logger for logrecv so events do not propagate beyond it. - #The sockLogger output is buffered in memory until the end of the test, - #and printed at the end. - sockOut = cStringIO.StringIO() - sockLogger = logging.getLogger("logrecv") - sockLogger.setLevel(logging.DEBUG) - sockhdlr = logging.StreamHandler(sockOut) - sockhdlr.setFormatter(logging.Formatter( - "%(name)s -> %(levelname)s: %(message)s")) - sockLogger.addHandler(sockhdlr) - sockLogger.propagate = 0 - - #Set up servers - threads = [] - #sys.stdout.write("About to start TCP server...\n") - threads.append(threading.Thread(target=runTCP, args=(tcpserver,))) - - for thread in threads: - thread.start() - try: - banner("log_test0", "begin") - - rootLogger.addHandler(shdlr) - test0() - # XXX(nnorwitz): Try to fix timing related test failures. - # This sleep gives us some extra time to read messages. - # The test generally only fails on Solaris without this sleep. - time.sleep(2.0) - shdlr.close() - rootLogger.removeHandler(shdlr) - - banner("log_test0", "end") - - for t in range(1,6): - banner("log_test%d" % t, "begin") - globals()['test%d' % t]() - banner("log_test%d" % t, "end") - - finally: - #wait for TCP receiver to terminate - socketDataProcessed.wait() - # ensure the server dies - tcpserver.abort = 1 - for thread in threads: - thread.join(2.0) - banner("logrecv output", "begin") - sys.stdout.write(sockOut.getvalue()) - sockOut.close() - sockLogger.removeHandler(sockhdlr) - sockhdlr.close() - banner("logrecv output", "end") - sys.stdout.flush() - try: - hdlr.close() - except: - pass - rootLogger.removeHandler(hdlr) - -# Set the locale to the platform-dependent default. I have no idea -# why the test does this, but in any case we save the current locale -# first and restore it at the end. -@run_with_locale('LC_ALL', '') -def test_main(): - # Save and restore the original root logger level across the tests. - # Otherwise, e.g., if any test using cookielib runs after test_logging, - # cookielib's debug-level logger tries to log messages, leading to - # confusing: - # No handlers could be found for logger "cookielib" - # output while the tests are running. - root_logger = logging.getLogger("") - original_logging_level = root_logger.getEffectiveLevel() - try: - test_main_inner() - finally: - root_logger.setLevel(original_logging_level) - -if __name__ == "__main__": - sys.stdout.write("test_logging\n") - test_main() -- cgit v1.2.3