diff options
Diffstat (limited to 'Lib/test')
-rwxr-xr-x | Lib/test/regrtest.py | 28 | ||||
-rw-r--r-- | Lib/test/test_epoll.py | 189 | ||||
-rw-r--r-- | Lib/test/test_kqueue.py | 166 | ||||
-rw-r--r-- | Lib/test/test_poll.py | 3 | ||||
-rw-r--r-- | Lib/test/test_signal.py | 241 | ||||
-rw-r--r-- | Lib/test/test_threading.py | 46 | ||||
-rw-r--r-- | Lib/test/test_urllib2.py | 8 |
7 files changed, 532 insertions, 149 deletions
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index 2212af5..9b8b572 100755 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -873,10 +873,12 @@ _expectations = { test_dl test_fcntl test_fork1 + test_epoll test_gdbm test_grp test_ioctl test_largefile + test_kqueue test_mhlib test_openpty test_ossaudiodev @@ -897,6 +899,7 @@ _expectations = { test_curses test_dl test_largefile + test_kqueue test_ossaudiodev """, 'mac': @@ -912,10 +915,12 @@ _expectations = { test_dl test_fcntl test_fork1 + test_epoll test_grp test_ioctl test_largefile test_locale + test_kqueue test_mmap test_openpty test_ossaudiodev @@ -933,7 +938,9 @@ _expectations = { """ test_bsddb test_dl + test_epoll test_largefile + test_kqueue test_minidom test_openpty test_pyexpat @@ -944,7 +951,9 @@ _expectations = { """ test_bsddb test_dl + test_epoll test_largefile + test_kqueue test_minidom test_openpty test_pyexpat @@ -957,9 +966,11 @@ _expectations = { test_bsddb test_dl test_fork1 + test_epoll test_gettext test_largefile test_locale + test_kqueue test_minidom test_openpty test_pyexpat @@ -977,9 +988,11 @@ _expectations = { test_bsddb test_bsddb3 test_curses + test_epoll test_gdbm test_largefile test_locale + test_minidom test_ossaudiodev test_poll """, @@ -988,6 +1001,8 @@ _expectations = { test_bsddb test_curses test_dbm + test_epoll + test_kqueue test_gdbm test_gzip test_openpty @@ -999,10 +1014,12 @@ _expectations = { test_bsddb test_curses test_dl + test_epoll test_gdbm test_gzip test_largefile test_locale + test_kqueue test_minidom test_openpty test_pyexpat @@ -1015,8 +1032,10 @@ _expectations = { test_curses test_dl test_gdbm + test_epoll test_largefile test_locale + test_kqueue test_mhlib test_mmap test_poll @@ -1027,7 +1046,9 @@ _expectations = { test_bsddb3 test_curses test_dbm + test_epoll test_ioctl + test_kqueue test_largefile test_locale test_ossaudiodev @@ -1040,6 +1061,8 @@ _expectations = { test_commands test_curses test_dl + test_epoll + test_kqueue test_largefile test_mhlib test_mmap @@ -1053,6 +1076,7 @@ _expectations = { """ test_bsddb test_bsddb3 + test_epoll test_gdbm test_locale test_ossaudiodev @@ -1069,8 +1093,10 @@ _expectations = { test_bsddb3 test_bz2 test_dl + test_epoll test_gdbm test_gzip + test_kqueue test_ossaudiodev test_tcl test_zipimport @@ -1082,6 +1108,7 @@ _expectations = { test_bsddb3 test_ctypes test_dl + test_epoll test_gdbm test_locale test_normalization @@ -1096,6 +1123,7 @@ _expectations = { test_ctypes test_curses test_dl + test_epoll test_gdbm test_locale test_ossaudiodev diff --git a/Lib/test/test_epoll.py b/Lib/test/test_epoll.py new file mode 100644 index 0000000..877a54e --- /dev/null +++ b/Lib/test/test_epoll.py @@ -0,0 +1,189 @@ +# Copyright (c) 2001-2006 Twisted Matrix Laboratories. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +""" +Tests for epoll wrapper. +""" +import os +import socket +import errno +import time +import select +import tempfile +import unittest + +from test import test_support +if not hasattr(select, "epoll"): + raise test_support.TestSkipped("test works only on Linux 2.6") + +class TestEPoll(unittest.TestCase): + + def setUp(self): + self.serverSocket = socket.socket() + self.serverSocket.bind(('127.0.0.1', 0)) + self.serverSocket.listen(1) + self.connections = [self.serverSocket] + + + def tearDown(self): + for skt in self.connections: + skt.close() + + def _connected_pair(self): + client = socket.socket() + client.setblocking(False) + try: + client.connect(('127.0.0.1', self.serverSocket.getsockname()[1])) + except socket.error as e: + self.assertEquals(e.args[0], errno.EINPROGRESS) + else: + raise AssertionError("Connect should have raised EINPROGRESS") + server, addr = self.serverSocket.accept() + + self.connections.extend((client, server)) + return client, server + + def test_create(self): + try: + ep = select.epoll(16) + except OSError as e: + raise AssertionError(str(e)) + self.assert_(ep.fileno() > 0, ep.fileno()) + self.assert_(not ep.closed) + ep.close() + self.assert_(ep.closed) + self.assertRaises(ValueError, ep.fileno) + + def test_badcreate(self): + self.assertRaises(TypeError, select.epoll, 1, 2, 3) + self.assertRaises(TypeError, select.epoll, 'foo') + self.assertRaises(TypeError, select.epoll, None) + self.assertRaises(TypeError, select.epoll, ()) + self.assertRaises(TypeError, select.epoll, ['foo']) + self.assertRaises(TypeError, select.epoll, {}) + + def test_add(self): + server, client = self._connected_pair() + + ep = select.epoll(2) + try: + ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT) + ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT) + finally: + ep.close() + + def test_fromfd(self): + server, client = self._connected_pair() + + ep = select.epoll(2) + ep2 = select.epoll.fromfd(ep.fileno()) + + ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT) + ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT) + + events = ep.poll(1, 4) + events2 = ep2.poll(0.9, 4) + self.assertEqual(len(events), 2) + self.assertEqual(len(events2), 2) + + ep.close() + try: + ep2.poll(1, 4) + except IOError as e: + self.failUnlessEqual(e.args[0], errno.EBADF, e) + else: + self.fail("epoll on closed fd didn't raise EBADF") + + def test_control_and_wait(self): + client, server = self._connected_pair() + + ep = select.epoll(16) + ep.register(server.fileno(), + select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) + ep.register(client.fileno(), + select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) + + now = time.time() + events = ep.poll(1, 4) + then = time.time() + self.failIf(then - now > 0.1, then - now) + + events.sort() + expected = [(client.fileno(), select.EPOLLOUT), + (server.fileno(), select.EPOLLOUT)] + expected.sort() + + self.assertEquals(events, expected) + self.failIf(then - now > 0.01, then - now) + + now = time.time() + events = ep.poll(timeout=2.1, maxevents=4) + then = time.time() + self.failIf(events) + + client.send(b"Hello!") + server.send(b"world!!!") + + now = time.time() + events = ep.poll(1, 4) + then = time.time() + self.failIf(then - now > 0.01) + + events.sort() + expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT), + (server.fileno(), select.EPOLLIN | select.EPOLLOUT)] + expected.sort() + + self.assertEquals(events, expected) + + ep.unregister(client.fileno()) + ep.modify(server.fileno(), select.EPOLLOUT) + now = time.time() + events = ep.poll(1, 4) + then = time.time() + self.failIf(then - now > 0.01) + + expected = [(server.fileno(), select.EPOLLOUT)] + self.assertEquals(events, expected) + + def test_errors(self): + self.assertRaises(ValueError, select.epoll, -2) + self.assertRaises(ValueError, select.epoll().register, -1, + select.EPOLLIN) + + def test_unregister_closed(self): + server, client = self._connected_pair() + fd = server.fileno() + ep = select.epoll(16) + ep.register(server) + + now = time.time() + events = ep.poll(1, 4) + then = time.time() + self.failIf(then - now > 0.01) + + server.close() + ep.unregister(fd) + +def test_main(): + test_support.run_unittest(TestEPoll) + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_kqueue.py b/Lib/test/test_kqueue.py new file mode 100644 index 0000000..310eb33 --- /dev/null +++ b/Lib/test/test_kqueue.py @@ -0,0 +1,166 @@ +""" +Tests for kqueue wrapper. +""" +import socket +import errno +import time +import select +import sys +import unittest + +from test import test_support +if not hasattr(select, "kqueue"): + raise test_support.TestSkipped("test works only on BSD") + +class TestKQueue(unittest.TestCase): + def test_create_queue(self): + kq = select.kqueue() + self.assert_(kq.fileno() > 0, kq.fileno()) + self.assert_(not kq.closed) + kq.close() + self.assert_(kq.closed) + self.assertRaises(ValueError, kq.fileno) + + def test_create_event(self): + fd = sys.stderr.fileno() + ev = select.kevent(fd) + other = select.kevent(1000) + self.assertEqual(ev.ident, fd) + self.assertEqual(ev.filter, select.KQ_FILTER_READ) + self.assertEqual(ev.flags, select.KQ_EV_ADD) + self.assertEqual(ev.fflags, 0) + self.assertEqual(ev.data, 0) + self.assertEqual(ev.udata, 0) + self.assertEqual(ev, ev) + self.assertNotEqual(ev, other) + self.assertEqual(cmp(ev, other), -1) + self.assert_(ev < other) + self.assert_(other >= ev) + self.assertRaises(TypeError, cmp, ev, None) + self.assertRaises(TypeError, cmp, ev, 1) + self.assertRaises(TypeError, cmp, ev, "ev") + + ev = select.kevent(fd, select.KQ_FILTER_WRITE) + self.assertEqual(ev.ident, fd) + self.assertEqual(ev.filter, select.KQ_FILTER_WRITE) + self.assertEqual(ev.flags, select.KQ_EV_ADD) + self.assertEqual(ev.fflags, 0) + self.assertEqual(ev.data, 0) + self.assertEqual(ev.udata, 0) + self.assertEqual(ev, ev) + self.assertNotEqual(ev, other) + + ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT) + self.assertEqual(ev.ident, fd) + self.assertEqual(ev.filter, select.KQ_FILTER_WRITE) + self.assertEqual(ev.flags, select.KQ_EV_ONESHOT) + self.assertEqual(ev.fflags, 0) + self.assertEqual(ev.data, 0) + self.assertEqual(ev.udata, 0) + self.assertEqual(ev, ev) + self.assertNotEqual(ev, other) + + ev = select.kevent(1, 2, 3, 4, 5, 6) + self.assertEqual(ev.ident, 1) + self.assertEqual(ev.filter, 2) + self.assertEqual(ev.flags, 3) + self.assertEqual(ev.fflags, 4) + self.assertEqual(ev.data, 5) + self.assertEqual(ev.udata, 6) + self.assertEqual(ev, ev) + self.assertNotEqual(ev, other) + + def test_queue_event(self): + serverSocket = socket.socket() + serverSocket.bind(('127.0.0.1', 0)) + serverSocket.listen(1) + client = socket.socket() + client.setblocking(False) + try: + client.connect(('127.0.0.1', serverSocket.getsockname()[1])) + except socket.error as e: + self.assertEquals(e.args[0], errno.EINPROGRESS) + else: + #raise AssertionError("Connect should have raised EINPROGRESS") + pass # FreeBSD doesn't raise an exception here + server, addr = serverSocket.accept() + + if sys.platform.startswith("darwin"): + flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE + else: + flags = 0 + + kq = select.kqueue() + kq2 = select.kqueue.fromfd(kq.fileno()) + + ev = select.kevent(server.fileno(), + select.KQ_FILTER_WRITE, + select.KQ_EV_ADD | select.KQ_EV_ENABLE) + kq.control([ev], 0) + ev = select.kevent(server.fileno(), + select.KQ_FILTER_READ, + select.KQ_EV_ADD | select.KQ_EV_ENABLE) + kq.control([ev], 0) + ev = select.kevent(client.fileno(), + select.KQ_FILTER_WRITE, + select.KQ_EV_ADD | select.KQ_EV_ENABLE) + kq2.control([ev], 0) + ev = select.kevent(client.fileno(), + select.KQ_FILTER_READ, + select.KQ_EV_ADD | select.KQ_EV_ENABLE) + kq2.control([ev], 0) + + events = kq.control(None, 4, 1) + events = [(e.ident, e.filter, e.flags) for e in events] + events.sort() + self.assertEquals(events, [ + (client.fileno(), select.KQ_FILTER_WRITE, flags), + (server.fileno(), select.KQ_FILTER_WRITE, flags)]) + + client.send(b"Hello!") + server.send(b"world!!!") + + events = kq.control(None, 4, 1) + # We may need to call it several times + for i in range(5): + if len(events) == 4: + break + events = kq.control(None, 4, 1) + events = [(e.ident, e.filter, e.flags) for e in events] + events.sort() + + self.assertEquals(events, [ + (client.fileno(), select.KQ_FILTER_WRITE, flags), + (client.fileno(), select.KQ_FILTER_READ, flags), + (server.fileno(), select.KQ_FILTER_WRITE, flags), + (server.fileno(), select.KQ_FILTER_READ, flags)]) + + # Remove completely client, and server read part + ev = select.kevent(client.fileno(), + select.KQ_FILTER_WRITE, + select.KQ_EV_DELETE) + kq.control([ev], 0) + ev = select.kevent(client.fileno(), + select.KQ_FILTER_READ, + select.KQ_EV_DELETE) + kq.control([ev], 0) + ev = select.kevent(server.fileno(), + select.KQ_FILTER_READ, + select.KQ_EV_DELETE) + kq.control([ev], 0, 0) + + events = kq.control([], 4, 0.99) + events = [(e.ident, e.filter, e.flags) for e in events] + events.sort() + self.assertEquals(events, [ + (server.fileno(), select.KQ_FILTER_WRITE, flags)]) + + client.close() + server.close() + serverSocket.close() + +def test_main(): + test_support.run_unittest(TestKQueue) + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_poll.py b/Lib/test/test_poll.py index cd9bfb0..d546c78 100644 --- a/Lib/test/test_poll.py +++ b/Lib/test/test_poll.py @@ -34,7 +34,8 @@ class PollTests(unittest.TestCase): for i in range(NUM_PIPES): rd, wr = os.pipe() - p.register(rd, select.POLLIN) + p.register(rd) + p.modify(rd, select.POLLIN) p.register(wr, select.POLLOUT) readers.append(rd) writers.append(wr) diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 1d25814..2834076 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -1,6 +1,11 @@ import unittest from test import test_support +from contextlib import closing, nested +import pickle +import select import signal +import subprocess +import traceback import sys, os, time, errno if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos': @@ -11,40 +16,20 @@ if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos': class HandlerBCalled(Exception): pass + +def exit_subprocess(): + """Use os._exit(0) to exit the current subprocess. + + Otherwise, the test catches the SystemExit and continues executing + in parallel with the original test, so you wind up with an + exponential number of tests running concurrently. + """ + os._exit(0) + + class InterProcessSignalTests(unittest.TestCase): MAX_DURATION = 20 # Entire test should last at most 20 sec. - # Set up a child to send signals to us (the parent) after waiting - # long enough to receive the alarm. It seems we miss the alarm - # for some reason. This will hopefully stop the hangs on - # Tru64/Alpha. Alas, it doesn't. Tru64 appears to miss all the - # signals at times, or seemingly random subsets of them, and - # nothing done in force_test_exit so far has actually helped. - def spawn_force_test_exit_process(self, parent_pid): - # Sigh, both imports seem necessary to avoid errors. - import os - fork_pid = os.fork() - if fork_pid: - # In parent. - return fork_pid - - # In child. - import os, time - try: - # Wait 5 seconds longer than the expected alarm to give enough - # time for the normal sequence of events to occur. This is - # just a stop-gap to try to prevent the test from hanging. - time.sleep(self.MAX_DURATION + 5) - print(" child should not have to kill parent", - file=sys.__stdout__) - for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM": - os.kill(parent_pid, getattr(signal, signame)) - print(" child sent", signame, "to", - parent_pid, file=sys.__stdout__) - time.sleep(1) - finally: - os._exit(0) - def handlerA(self, *args): self.a_called = True if test_support.verbose: @@ -56,121 +41,133 @@ class InterProcessSignalTests(unittest.TestCase): print("handlerB invoked", args) raise HandlerBCalled(*args) - def test_main(self): - self.assertEquals(signal.getsignal(signal.SIGHUP), self.handlerA) - self.assertEquals(signal.getsignal(signal.SIGUSR1), self.handlerB) - self.assertEquals(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN) - self.assertEquals(signal.getsignal(signal.SIGALRM), - signal.default_int_handler) - - # Launch an external script to send us signals. - # We expect the external script to: - # send HUP, which invokes handlerA to set a_called - # send USR1, which invokes handlerB to set b_called and raise - # HandlerBCalled - # send USR2, which is ignored - # - # Then we expect the alarm to go off, and its handler raises - # KeyboardInterrupt, finally getting us out of the loop. + def wait(self, child): + """Wait for child to finish, ignoring EINTR.""" + while True: + try: + child.wait() + return + except OSError as e: + if e.errno != errno.EINTR: + raise - if test_support.verbose: - verboseflag = '-x' - else: - verboseflag = '+x' + def run_test(self): + # Install handlers. This function runs in a sub-process, so we + # don't worry about re-setting the default handlers. + signal.signal(signal.SIGHUP, self.handlerA) + signal.signal(signal.SIGUSR1, self.handlerB) + signal.signal(signal.SIGUSR2, signal.SIG_IGN) + signal.signal(signal.SIGALRM, signal.default_int_handler) + + # Variables the signals will modify: + self.a_called = False + self.b_called = False - pid = self.pid + # Let the sub-processes know who to send signals to. + pid = os.getpid() if test_support.verbose: print("test runner's pid is", pid) - # Shell script that will send us asynchronous signals - script = """ - ( - set %(verboseflag)s - sleep 2 - kill -HUP %(pid)d - sleep 2 - kill -USR1 %(pid)d - sleep 2 - kill -USR2 %(pid)d - ) & - """ % vars() - - signal.alarm(self.MAX_DURATION) - - handler_b_exception_raised = False + child = subprocess.Popen(['kill', '-HUP', str(pid)]) + self.wait(child) + self.assertTrue(self.a_called) + self.assertFalse(self.b_called) + self.a_called = False - os.system(script) try: + child = subprocess.Popen(['kill', '-USR1', str(pid)]) + # This wait should be interrupted by the signal's exception. + self.wait(child) + self.fail('HandlerBCalled exception not thrown') + except HandlerBCalled: + self.assertTrue(self.b_called) + self.assertFalse(self.a_called) if test_support.verbose: - print("starting pause() loop...") - while 1: - try: - if test_support.verbose: - print("call pause()...") - signal.pause() - if test_support.verbose: - print("pause() returned") - except HandlerBCalled: - handler_b_exception_raised = True - if test_support.verbose: - print("HandlerBCalled exception caught") + print("HandlerBCalled exception caught") + child = subprocess.Popen(['kill', '-USR2', str(pid)]) + self.wait(child) # Nothing should happen. + + try: + signal.alarm(1) + # The race condition in pause doesn't matter in this case, + # since alarm is going to raise a KeyboardException, which + # will skip the call. + signal.pause() except KeyboardInterrupt: if test_support.verbose: print("KeyboardInterrupt (the alarm() went off)") - - self.assert_(self.a_called) - self.assert_(self.b_called) - self.assert_(handler_b_exception_raised) - - def setUp(self): - # Install handlers. - self.hup = signal.signal(signal.SIGHUP, self.handlerA) - self.usr1 = signal.signal(signal.SIGUSR1, self.handlerB) - self.usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN) - self.alrm = signal.signal(signal.SIGALRM, - signal.default_int_handler) - self.a_called = False - self.b_called = False - self.pid = os.getpid() - self.fork_pid = self.spawn_force_test_exit_process(self.pid) - - def tearDown(self): - # Forcibly kill the child we created to ping us if there was a - # test error. - try: - # Make sure we don't kill ourself if there was a fork - # error. - if self.fork_pid > 0: - os.kill(self.fork_pid, signal.SIGKILL) except: - # If the child killed us, it has probably exited. Killing - # a non-existent process will raise an error which we - # don't care about. - pass - - # Restore handlers. - signal.alarm(0) # cancel alarm in case we died early - signal.signal(signal.SIGHUP, self.hup) - signal.signal(signal.SIGUSR1, self.usr1) - signal.signal(signal.SIGUSR2, self.usr2) - signal.signal(signal.SIGALRM, self.alrm) + self.fail('Some other exception woke us from pause: %s' % + traceback.format_exc()) + else: + self.fail('pause returned of its own accord') + + def test_main(self): + # This function spawns a child process to insulate the main + # test-running process from all the signals. It then + # communicates with that child process over a pipe and + # re-raises information about any exceptions the child + # throws. The real work happens in self.run_test(). + os_done_r, os_done_w = os.pipe() + with nested(closing(os.fdopen(os_done_r, 'rb')), + closing(os.fdopen(os_done_w, 'wb'))) as (done_r, done_w): + child = os.fork() + if child == 0: + # In the child process; run the test and report results + # through the pipe. + try: + done_r.close() + # Have to close done_w again here because + # exit_subprocess() will skip the enclosing with block. + with closing(done_w): + try: + self.run_test() + except: + pickle.dump(traceback.format_exc(), done_w) + else: + pickle.dump(None, done_w) + except: + print('Uh oh, raised from pickle.') + traceback.print_exc() + finally: + exit_subprocess() + + done_w.close() + # Block for up to MAX_DURATION seconds for the test to finish. + r, w, x = select.select([done_r], [], [], self.MAX_DURATION) + if done_r in r: + tb = pickle.load(done_r) + if tb: + self.fail(tb) + else: + os.kill(child, signal.SIGKILL) + self.fail('Test deadlocked after %d seconds.' % + self.MAX_DURATION) class BasicSignalTests(unittest.TestCase): + def trivial_signal_handler(self, *args): + pass + def test_out_of_range_signal_number_raises_error(self): self.assertRaises(ValueError, signal.getsignal, 4242) - def trivial_signal_handler(*args): - pass - self.assertRaises(ValueError, signal.signal, 4242, - trivial_signal_handler) + self.trivial_signal_handler) def test_setting_signal_handler_to_none_raises_error(self): self.assertRaises(TypeError, signal.signal, signal.SIGUSR1, None) + def test_getsignal(self): + hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) + self.assertEquals(signal.getsignal(signal.SIGHUP), + self.trivial_signal_handler) + signal.signal(signal.SIGHUP, hup) + self.assertEquals(signal.getsignal(signal.SIGHUP), hup) + + class WakeupSignalTests(unittest.TestCase): TIMEOUT_FULL = 10 TIMEOUT_HALF = 5 @@ -233,7 +230,7 @@ class SiginterruptTest(unittest.TestCase): os.kill(ppid, self.signum) time.sleep(0.2) finally: - os._exit(0) + exit_subprocess() try: os.close(w) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 3d5c120..67d9ed9 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -30,32 +30,26 @@ class TestThread(threading.Thread): self.nrunning = nrunning def run(self): - delay = random.random() * 2 + delay = random.random() / 10000.0 if verbose: print('task', self.getName(), 'will run for', delay, 'sec') - self.sema.acquire() + with self.sema: + with self.mutex: + self.nrunning.inc() + if verbose: + print(self.nrunning.get(), 'tasks are running') + self.testcase.assert_(self.nrunning.get() <= 3) - self.mutex.acquire() - self.nrunning.inc() - if verbose: - print(self.nrunning.get(), 'tasks are running') - self.testcase.assert_(self.nrunning.get() <= 3) - self.mutex.release() - - time.sleep(delay) - if verbose: - print('task', self.getName(), 'done') - - self.mutex.acquire() - self.nrunning.dec() - self.testcase.assert_(self.nrunning.get() >= 0) - if verbose: - print(self.getName(), 'is finished.', self.nrunning.get(), \ - 'tasks are running') - self.mutex.release() - - self.sema.release() + time.sleep(delay) + if verbose: + print('task', self.getName(), 'done') + with self.mutex: + self.nrunning.dec() + self.testcase.assert_(self.nrunning.get() >= 0) + if verbose: + print('%s is finished. %d tasks are running' % + self.getName(), self.nrunning.get()) class ThreadTests(unittest.TestCase): @@ -218,6 +212,10 @@ class ThreadTests(unittest.TestCase): rc = subprocess.call([sys.executable, "-c", """if 1: import ctypes, sys, time, thread + # This lock is used as a simple event variable. + ready = thread.allocate_lock() + ready.acquire() + # Module globals are cleared before __del__ is run # So we save the functions in class dict class C: @@ -229,10 +227,11 @@ class ThreadTests(unittest.TestCase): def waitingThread(): x = C() + ready.release() time.sleep(100) thread.start_new_thread(waitingThread, ()) - time.sleep(1) # be sure the other thread is waiting + ready.acquire() # Be sure the other thread is waiting. sys.exit(42) """]) self.assertEqual(rc, 42) @@ -242,7 +241,6 @@ class ThreadTests(unittest.TestCase): # threading.enumerate() after it has been join()ed. enum = threading.enumerate old_interval = sys.getcheckinterval() - sys.setcheckinterval(1) try: for i in range(1, 1000): t = threading.Thread(target=lambda: None) diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py index 6ceec06..9f0b2c4 100644 --- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -906,13 +906,14 @@ class HandlerTests(unittest.TestCase): self.assertEqual([(handlers[0], "http_open")], [tup[0:2] for tup in o.calls]) - def test_basic_auth(self): + def test_basic_auth(self, quote_char='"'): opener = OpenerDirector() password_manager = MockPasswordManager() auth_handler = urllib2.HTTPBasicAuthHandler(password_manager) realm = "ACME Widget Store" http_handler = MockHTTPHandler( - 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) + 401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' % + (quote_char, realm, quote_char) ) opener.add_handler(auth_handler) opener.add_handler(http_handler) self._test_basic_auth(opener, auth_handler, "Authorization", @@ -921,6 +922,9 @@ class HandlerTests(unittest.TestCase): "http://acme.example.com/protected", ) + def test_basic_auth_with_single_quoted_realm(self): + self.test_basic_auth(quote_char="'") + def test_proxy_basic_auth(self): opener = OpenerDirector() ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128")) |