summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/logging/handlers.py72
-rw-r--r--Lib/test/test_code.py4
-rw-r--r--Lib/test/test_collections.py5
-rw-r--r--Lib/test/test_io.py31
-rw-r--r--Lib/test/test_lib2to3.py4
-rw-r--r--Lib/test/test_signal.py30
-rw-r--r--Lib/test/test_socket.py14
-rw-r--r--Lib/test/test_support.py18
8 files changed, 126 insertions, 52 deletions
diff --git a/Lib/logging/handlers.py b/Lib/logging/handlers.py
index 4645f3e..9cb0e09 100644
--- a/Lib/logging/handlers.py
+++ b/Lib/logging/handlers.py
@@ -19,12 +19,12 @@ Additional handlers for the logging package for Python. The core package is
based on PEP 282 and comments thereto in comp.lang.python, and influenced by
Apache's log4j system.
-Copyright (C) 2001-2007 Vinay Sajip. All Rights Reserved.
+Copyright (C) 2001-2008 Vinay Sajip. All Rights Reserved.
To use, simply 'import logging' and log away!
"""
-import logging, socket, os, pickle, struct, time, glob
+import logging, socket, os, pickle, struct, time, re
from stat import ST_DEV, ST_INO
try:
@@ -173,15 +173,19 @@ class TimedRotatingFileHandler(BaseRotatingHandler):
if self.when == 'S':
self.interval = 1 # one second
self.suffix = "%Y-%m-%d_%H-%M-%S"
+ self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}$"
elif self.when == 'M':
self.interval = 60 # one minute
self.suffix = "%Y-%m-%d_%H-%M"
+ self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}$"
elif self.when == 'H':
self.interval = 60 * 60 # one hour
self.suffix = "%Y-%m-%d_%H"
+ self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}$"
elif self.when == 'D' or self.when == 'MIDNIGHT':
self.interval = 60 * 60 * 24 # one day
self.suffix = "%Y-%m-%d"
+ self.extMatch = r"^\d{4}-\d{2}-\d{2}$"
elif self.when.startswith('W'):
self.interval = 60 * 60 * 24 * 7 # one week
if len(self.when) != 2:
@@ -190,9 +194,11 @@ class TimedRotatingFileHandler(BaseRotatingHandler):
raise ValueError("Invalid day specified for weekly rollover: %s" % self.when)
self.dayOfWeek = int(self.when[1])
self.suffix = "%Y-%m-%d"
+ self.extMatch = r"^\d{4}-\d{2}-\d{2}$"
else:
raise ValueError("Invalid rollover interval specified: %s" % self.when)
+ self.extMatch = re.compile(self.extMatch)
self.interval = self.interval * interval # multiply by units requested
self.rolloverAt = currentTime + self.interval
@@ -235,16 +241,24 @@ class TimedRotatingFileHandler(BaseRotatingHandler):
daysToWait = self.dayOfWeek - day
else:
daysToWait = 6 - day + self.dayOfWeek + 1
- self.rolloverAt = self.rolloverAt + (daysToWait * (60 * 60 * 24))
+ newRolloverAt = self.rolloverAt + (daysToWait * (60 * 60 * 24))
+ dstNow = t[-1]
+ dstAtRollover = time.localtime(newRolloverAt)[-1]
+ if dstNow != dstAtRollover:
+ if not dstNow: # DST kicks in before next rollover, so we need to deduct an hour
+ newRolloverAt = newRolloverAt - 3600
+ else: # DST bows out before next rollover, so we need to add an hour
+ newRolloverAt = newRolloverAt + 3600
+ self.rolloverAt = newRolloverAt
#print "Will rollover at %d, %d seconds from now" % (self.rolloverAt, self.rolloverAt - currentTime)
def shouldRollover(self, record):
"""
- Determine if rollover should occur
+ Determine if rollover should occur.
record is not used, as we are just comparing times, but it is needed so
- the method siguratures are the same
+ the method signatures are the same
"""
t = int(time.time())
if t >= self.rolloverAt:
@@ -252,6 +266,29 @@ class TimedRotatingFileHandler(BaseRotatingHandler):
#print "No need to rollover: %d, %d" % (t, self.rolloverAt)
return 0
+ def getFilesToDelete(self):
+ """
+ Determine the files to delete when rolling over.
+
+ More specific than the earlier method, which just used glob.glob().
+ """
+ dirName, baseName = os.path.split(self.baseFilename)
+ fileNames = os.listdir(dirName)
+ result = []
+ prefix = baseName + "."
+ plen = len(prefix)
+ for fileName in fileNames:
+ if fileName[:plen] == prefix:
+ suffix = fileName[plen:]
+ if self.extMatch.match(suffix):
+ result.append(fileName)
+ result.sort()
+ if len(result) < self.backupCount:
+ result = []
+ else:
+ result = result[:len(result) - self.backupCount]
+ return result
+
def doRollover(self):
"""
do a rollover; in this case, a date/time stamp is appended to the filename
@@ -270,14 +307,29 @@ class TimedRotatingFileHandler(BaseRotatingHandler):
os.rename(self.baseFilename, dfn)
if self.backupCount > 0:
# find the oldest log file and delete it
- s = glob.glob(self.baseFilename + ".20*")
- if len(s) > self.backupCount:
- s.sort()
- os.remove(s[0])
+ #s = glob.glob(self.baseFilename + ".20*")
+ #if len(s) > self.backupCount:
+ # s.sort()
+ # os.remove(s[0])
+ for s in self.getFilesToDelete():
+ os.remove(s)
#print "%s -> %s" % (self.baseFilename, dfn)
self.mode = 'w'
self.stream = self._open()
- self.rolloverAt = self.rolloverAt + self.interval
+ newRolloverAt = self.rolloverAt + self.interval
+ currentTime = int(time.time())
+ while newRolloverAt <= currentTime:
+ newRolloverAt = newRolloverAt + self.interval
+ #If DST changes and midnight or weekly rollover, adjust for this.
+ if self.when == 'MIDNIGHT' or self.when.startswith('W'):
+ dstNow = time.localtime(currentTime)[-1]
+ dstAtRollover = time.localtime(newRolloverAt)[-1]
+ if dstNow != dstAtRollover:
+ if not dstNow: # DST kicks in before next rollover, so we need to deduct an hour
+ newRolloverAt = newRolloverAt - 3600
+ else: # DST bows out before next rollover, so we need to add an hour
+ newRolloverAt = newRolloverAt + 3600
+ self.rolloverAt = newRolloverAt
class WatchedFileHandler(logging.FileHandler):
"""
diff --git a/Lib/test/test_code.py b/Lib/test/test_code.py
index 7512263..63378c2 100644
--- a/Lib/test/test_code.py
+++ b/Lib/test/test_code.py
@@ -122,3 +122,7 @@ def test_main(verbose=None):
from test.test_support import run_doctest
from test import test_code
run_doctest(test_code, verbose)
+
+
+if __name__ == '__main__':
+ test_main()
diff --git a/Lib/test/test_collections.py b/Lib/test/test_collections.py
index 77af0fb..9384744 100644
--- a/Lib/test/test_collections.py
+++ b/Lib/test/test_collections.py
@@ -317,13 +317,12 @@ class TestCollectionABCs(unittest.TestCase):
self.failIf(issubclass(str, MutableSequence))
import doctest, collections
-NamedTupleDocs = doctest.DocTestSuite(module=collections)
def test_main(verbose=None):
- import collections as CollectionsModule
+ NamedTupleDocs = doctest.DocTestSuite(module=collections)
test_classes = [TestNamedTuple, NamedTupleDocs, TestOneTrickPonyABCs, TestCollectionABCs]
test_support.run_unittest(*test_classes)
- test_support.run_doctest(CollectionsModule, verbose)
+ test_support.run_doctest(collections, verbose)
if __name__ == "__main__":
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index f0e79312..0c37cc7 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -574,6 +574,22 @@ class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
self.buffer = bytearray()
return output
+ codecEnabled = False
+
+ @classmethod
+ def lookupTestDecoder(cls, name):
+ if cls.codecEnabled and name == 'test_decoder':
+ return codecs.CodecInfo(
+ name='test_decoder', encode=None, decode=None,
+ incrementalencoder=None,
+ streamreader=None, streamwriter=None,
+ incrementaldecoder=cls)
+
+# Register the previous decoder for testing.
+# Disabled by default, tests will enable it.
+codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
+
+
class StatefulIncrementalDecoderTest(unittest.TestCase):
"""
Make sure the StatefulIncrementalDecoder actually works.
@@ -899,14 +915,6 @@ class TextIOWrapperTest(unittest.TestCase):
def testSeekAndTell(self):
"""Test seek/tell using the StatefulIncrementalDecoder."""
- def lookupTestDecoder(name):
- if self.codecEnabled and name == 'test_decoder':
- return codecs.CodecInfo(
- name='test_decoder', encode=None, decode=None,
- incrementalencoder=None,
- streamreader=None, streamwriter=None,
- incrementaldecoder=StatefulIncrementalDecoder)
-
def testSeekAndTellWithData(data, min_pos=0):
"""Tell/seek to various points within a data stream and ensure
that the decoded data returned by read() is consistent."""
@@ -927,9 +935,8 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEquals(f.read(), decoded[i:])
f.close()
- # Register a special incremental decoder for testing.
- codecs.register(lookupTestDecoder)
- self.codecEnabled = 1
+ # Enable the test decoder.
+ StatefulIncrementalDecoder.codecEnabled = 1
# Run the tests.
try:
@@ -948,7 +955,7 @@ class TextIOWrapperTest(unittest.TestCase):
# Ensure our test decoder won't interfere with subsequent tests.
finally:
- self.codecEnabled = 0
+ StatefulIncrementalDecoder.codecEnabled = 0
def testEncodedWrites(self):
data = "1234567890"
diff --git a/Lib/test/test_lib2to3.py b/Lib/test/test_lib2to3.py
index beda695..161b9dd 100644
--- a/Lib/test/test_lib2to3.py
+++ b/Lib/test/test_lib2to3.py
@@ -13,3 +13,7 @@ def suite():
def test_main():
run_unittest(suite())
+
+
+if __name__ == '__main__':
+ test_main()
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 3c039a1..8c12f57 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -28,6 +28,15 @@ def exit_subprocess():
os._exit(0)
+def ignoring_eintr(__func, *args, **kwargs):
+ try:
+ return __func(*args, **kwargs)
+ except IOError as e:
+ if e.errno != signal.EINTR:
+ raise
+ return None
+
+
class InterProcessSignalTests(unittest.TestCase):
MAX_DURATION = 20 # Entire test should last at most 20 sec.
@@ -77,8 +86,11 @@ class InterProcessSignalTests(unittest.TestCase):
if test_support.verbose:
print("test runner's pid is", pid)
- child = subprocess.Popen(['kill', '-HUP', str(pid)])
- self.wait(child)
+ child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
+ if child:
+ self.wait(child)
+ if not self.a_called:
+ time.sleep(1) # Give the signal time to be delivered.
self.assertTrue(self.a_called)
self.assertFalse(self.b_called)
self.a_called = False
@@ -87,6 +99,7 @@ class InterProcessSignalTests(unittest.TestCase):
child = subprocess.Popen(['kill', '-USR1', str(pid)])
# This wait should be interrupted by the signal's exception.
self.wait(child)
+ time.sleep(1) # Give the signal time to be delivered.
self.fail('HandlerBCalled exception not thrown')
except HandlerBCalled:
self.assertTrue(self.b_called)
@@ -94,8 +107,9 @@ class InterProcessSignalTests(unittest.TestCase):
if test_support.verbose:
print("HandlerBCalled exception caught")
- child = subprocess.Popen(['kill', '-USR2', str(pid)])
- self.wait(child) # Nothing should happen.
+ child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
+ if child:
+ self.wait(child) # Nothing should happen.
try:
signal.alarm(1)
@@ -103,14 +117,18 @@ class InterProcessSignalTests(unittest.TestCase):
# since alarm is going to raise a KeyboardException, which
# will skip the call.
signal.pause()
+ # But if another signal arrives before the alarm, pause
+ # may return early.
+ time.sleep(1)
except KeyboardInterrupt:
if test_support.verbose:
print("KeyboardInterrupt (the alarm() went off)")
except:
- self.fail('Some other exception woke us from pause: %s' %
+ self.fail("Some other exception woke us from pause: %s" %
traceback.format_exc())
else:
- self.fail('pause returned of its own accord')
+ self.fail("pause returned of its own accord, and the signal"
+ " didn't arrive after another second.")
def test_main(self):
# This function spawns a child process to insulate the main
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index f4abffa..2bec373 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -15,14 +15,6 @@ import array
from weakref import proxy
import signal
-# Temporary hack to see why test_socket hangs on one buildbot
-if os.environ.get('COMPUTERNAME') == "GRAPE":
- def verbose_write(arg):
- print(arg, file=sys.__stdout__)
-else:
- def verbose_write(arg):
- pass
-
PORT = 50007
HOST = 'localhost'
MSG = b'Michael Gilfix was here\n'
@@ -30,21 +22,15 @@ MSG = b'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
def setUp(self):
- verbose_write(self)
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- verbose_write(str(self) + " socket created")
global PORT
PORT = test_support.bind_port(self.serv, HOST, PORT)
- verbose_write(str(self) + " start listening")
self.serv.listen(1)
- verbose_write(str(self) + " started")
def tearDown(self):
- verbose_write(str(self) + " close")
self.serv.close()
self.serv = None
- verbose_write(str(self) + " done")
class SocketUDPTest(unittest.TestCase):
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index f9ed396..d2be9bf 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -381,19 +381,23 @@ def transient_internet():
@contextlib.contextmanager
-def captured_stdout():
- """Run the with statement body using a StringIO object as sys.stdout.
- Example use::
+def captured_output(stream_name):
+ """Run the 'with' statement body using a StringIO object in place of a
+ specific attribute on the sys module.
+ Example use (with 'stream_name=stdout')::
with captured_stdout() as s:
print "hello"
assert s.getvalue() == "hello"
"""
import io
- orig_stdout = sys.stdout
- sys.stdout = io.StringIO()
- yield sys.stdout
- sys.stdout = orig_stdout
+ orig_stdout = getattr(sys, stream_name)
+ setattr(sys, stream_name, io.StringIO())
+ yield getattr(sys, stream_name)
+ setattr(sys, stream_name, orig_stdout)
+
+def captured_stdout():
+ return captured_output("stdout")
#=======================================================================