summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rwxr-xr-xLib/test/regrtest.py28
-rw-r--r--Lib/test/test_epoll.py189
-rw-r--r--Lib/test/test_kqueue.py166
-rw-r--r--Lib/test/test_poll.py3
-rw-r--r--Lib/test/test_signal.py241
-rw-r--r--Lib/test/test_threading.py46
-rw-r--r--Lib/test/test_urllib2.py8
7 files changed, 532 insertions, 149 deletions
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
index 2212af5..9b8b572 100755
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -873,10 +873,12 @@ _expectations = {
test_dl
test_fcntl
test_fork1
+ test_epoll
test_gdbm
test_grp
test_ioctl
test_largefile
+ test_kqueue
test_mhlib
test_openpty
test_ossaudiodev
@@ -897,6 +899,7 @@ _expectations = {
test_curses
test_dl
test_largefile
+ test_kqueue
test_ossaudiodev
""",
'mac':
@@ -912,10 +915,12 @@ _expectations = {
test_dl
test_fcntl
test_fork1
+ test_epoll
test_grp
test_ioctl
test_largefile
test_locale
+ test_kqueue
test_mmap
test_openpty
test_ossaudiodev
@@ -933,7 +938,9 @@ _expectations = {
"""
test_bsddb
test_dl
+ test_epoll
test_largefile
+ test_kqueue
test_minidom
test_openpty
test_pyexpat
@@ -944,7 +951,9 @@ _expectations = {
"""
test_bsddb
test_dl
+ test_epoll
test_largefile
+ test_kqueue
test_minidom
test_openpty
test_pyexpat
@@ -957,9 +966,11 @@ _expectations = {
test_bsddb
test_dl
test_fork1
+ test_epoll
test_gettext
test_largefile
test_locale
+ test_kqueue
test_minidom
test_openpty
test_pyexpat
@@ -977,9 +988,11 @@ _expectations = {
test_bsddb
test_bsddb3
test_curses
+ test_epoll
test_gdbm
test_largefile
test_locale
+ test_minidom
test_ossaudiodev
test_poll
""",
@@ -988,6 +1001,8 @@ _expectations = {
test_bsddb
test_curses
test_dbm
+ test_epoll
+ test_kqueue
test_gdbm
test_gzip
test_openpty
@@ -999,10 +1014,12 @@ _expectations = {
test_bsddb
test_curses
test_dl
+ test_epoll
test_gdbm
test_gzip
test_largefile
test_locale
+ test_kqueue
test_minidom
test_openpty
test_pyexpat
@@ -1015,8 +1032,10 @@ _expectations = {
test_curses
test_dl
test_gdbm
+ test_epoll
test_largefile
test_locale
+ test_kqueue
test_mhlib
test_mmap
test_poll
@@ -1027,7 +1046,9 @@ _expectations = {
test_bsddb3
test_curses
test_dbm
+ test_epoll
test_ioctl
+ test_kqueue
test_largefile
test_locale
test_ossaudiodev
@@ -1040,6 +1061,8 @@ _expectations = {
test_commands
test_curses
test_dl
+ test_epoll
+ test_kqueue
test_largefile
test_mhlib
test_mmap
@@ -1053,6 +1076,7 @@ _expectations = {
"""
test_bsddb
test_bsddb3
+ test_epoll
test_gdbm
test_locale
test_ossaudiodev
@@ -1069,8 +1093,10 @@ _expectations = {
test_bsddb3
test_bz2
test_dl
+ test_epoll
test_gdbm
test_gzip
+ test_kqueue
test_ossaudiodev
test_tcl
test_zipimport
@@ -1082,6 +1108,7 @@ _expectations = {
test_bsddb3
test_ctypes
test_dl
+ test_epoll
test_gdbm
test_locale
test_normalization
@@ -1096,6 +1123,7 @@ _expectations = {
test_ctypes
test_curses
test_dl
+ test_epoll
test_gdbm
test_locale
test_ossaudiodev
diff --git a/Lib/test/test_epoll.py b/Lib/test/test_epoll.py
new file mode 100644
index 0000000..877a54e
--- /dev/null
+++ b/Lib/test/test_epoll.py
@@ -0,0 +1,189 @@
+# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+"""
+Tests for epoll wrapper.
+"""
+import os
+import socket
+import errno
+import time
+import select
+import tempfile
+import unittest
+
+from test import test_support
+if not hasattr(select, "epoll"):
+ raise test_support.TestSkipped("test works only on Linux 2.6")
+
+class TestEPoll(unittest.TestCase):
+
+ def setUp(self):
+ self.serverSocket = socket.socket()
+ self.serverSocket.bind(('127.0.0.1', 0))
+ self.serverSocket.listen(1)
+ self.connections = [self.serverSocket]
+
+
+ def tearDown(self):
+ for skt in self.connections:
+ skt.close()
+
+ def _connected_pair(self):
+ client = socket.socket()
+ client.setblocking(False)
+ try:
+ client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
+ except socket.error as e:
+ self.assertEquals(e.args[0], errno.EINPROGRESS)
+ else:
+ raise AssertionError("Connect should have raised EINPROGRESS")
+ server, addr = self.serverSocket.accept()
+
+ self.connections.extend((client, server))
+ return client, server
+
+ def test_create(self):
+ try:
+ ep = select.epoll(16)
+ except OSError as e:
+ raise AssertionError(str(e))
+ self.assert_(ep.fileno() > 0, ep.fileno())
+ self.assert_(not ep.closed)
+ ep.close()
+ self.assert_(ep.closed)
+ self.assertRaises(ValueError, ep.fileno)
+
+ def test_badcreate(self):
+ self.assertRaises(TypeError, select.epoll, 1, 2, 3)
+ self.assertRaises(TypeError, select.epoll, 'foo')
+ self.assertRaises(TypeError, select.epoll, None)
+ self.assertRaises(TypeError, select.epoll, ())
+ self.assertRaises(TypeError, select.epoll, ['foo'])
+ self.assertRaises(TypeError, select.epoll, {})
+
+ def test_add(self):
+ server, client = self._connected_pair()
+
+ ep = select.epoll(2)
+ try:
+ ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
+ ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
+ finally:
+ ep.close()
+
+ def test_fromfd(self):
+ server, client = self._connected_pair()
+
+ ep = select.epoll(2)
+ ep2 = select.epoll.fromfd(ep.fileno())
+
+ ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
+ ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
+
+ events = ep.poll(1, 4)
+ events2 = ep2.poll(0.9, 4)
+ self.assertEqual(len(events), 2)
+ self.assertEqual(len(events2), 2)
+
+ ep.close()
+ try:
+ ep2.poll(1, 4)
+ except IOError as e:
+ self.failUnlessEqual(e.args[0], errno.EBADF, e)
+ else:
+ self.fail("epoll on closed fd didn't raise EBADF")
+
+ def test_control_and_wait(self):
+ client, server = self._connected_pair()
+
+ ep = select.epoll(16)
+ ep.register(server.fileno(),
+ select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
+ ep.register(client.fileno(),
+ select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
+
+ now = time.time()
+ events = ep.poll(1, 4)
+ then = time.time()
+ self.failIf(then - now > 0.1, then - now)
+
+ events.sort()
+ expected = [(client.fileno(), select.EPOLLOUT),
+ (server.fileno(), select.EPOLLOUT)]
+ expected.sort()
+
+ self.assertEquals(events, expected)
+ self.failIf(then - now > 0.01, then - now)
+
+ now = time.time()
+ events = ep.poll(timeout=2.1, maxevents=4)
+ then = time.time()
+ self.failIf(events)
+
+ client.send(b"Hello!")
+ server.send(b"world!!!")
+
+ now = time.time()
+ events = ep.poll(1, 4)
+ then = time.time()
+ self.failIf(then - now > 0.01)
+
+ events.sort()
+ expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
+ (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
+ expected.sort()
+
+ self.assertEquals(events, expected)
+
+ ep.unregister(client.fileno())
+ ep.modify(server.fileno(), select.EPOLLOUT)
+ now = time.time()
+ events = ep.poll(1, 4)
+ then = time.time()
+ self.failIf(then - now > 0.01)
+
+ expected = [(server.fileno(), select.EPOLLOUT)]
+ self.assertEquals(events, expected)
+
+ def test_errors(self):
+ self.assertRaises(ValueError, select.epoll, -2)
+ self.assertRaises(ValueError, select.epoll().register, -1,
+ select.EPOLLIN)
+
+ def test_unregister_closed(self):
+ server, client = self._connected_pair()
+ fd = server.fileno()
+ ep = select.epoll(16)
+ ep.register(server)
+
+ now = time.time()
+ events = ep.poll(1, 4)
+ then = time.time()
+ self.failIf(then - now > 0.01)
+
+ server.close()
+ ep.unregister(fd)
+
+def test_main():
+ test_support.run_unittest(TestEPoll)
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/test/test_kqueue.py b/Lib/test/test_kqueue.py
new file mode 100644
index 0000000..310eb33
--- /dev/null
+++ b/Lib/test/test_kqueue.py
@@ -0,0 +1,166 @@
+"""
+Tests for kqueue wrapper.
+"""
+import socket
+import errno
+import time
+import select
+import sys
+import unittest
+
+from test import test_support
+if not hasattr(select, "kqueue"):
+ raise test_support.TestSkipped("test works only on BSD")
+
+class TestKQueue(unittest.TestCase):
+ def test_create_queue(self):
+ kq = select.kqueue()
+ self.assert_(kq.fileno() > 0, kq.fileno())
+ self.assert_(not kq.closed)
+ kq.close()
+ self.assert_(kq.closed)
+ self.assertRaises(ValueError, kq.fileno)
+
+ def test_create_event(self):
+ fd = sys.stderr.fileno()
+ ev = select.kevent(fd)
+ other = select.kevent(1000)
+ self.assertEqual(ev.ident, fd)
+ self.assertEqual(ev.filter, select.KQ_FILTER_READ)
+ self.assertEqual(ev.flags, select.KQ_EV_ADD)
+ self.assertEqual(ev.fflags, 0)
+ self.assertEqual(ev.data, 0)
+ self.assertEqual(ev.udata, 0)
+ self.assertEqual(ev, ev)
+ self.assertNotEqual(ev, other)
+ self.assertEqual(cmp(ev, other), -1)
+ self.assert_(ev < other)
+ self.assert_(other >= ev)
+ self.assertRaises(TypeError, cmp, ev, None)
+ self.assertRaises(TypeError, cmp, ev, 1)
+ self.assertRaises(TypeError, cmp, ev, "ev")
+
+ ev = select.kevent(fd, select.KQ_FILTER_WRITE)
+ self.assertEqual(ev.ident, fd)
+ self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
+ self.assertEqual(ev.flags, select.KQ_EV_ADD)
+ self.assertEqual(ev.fflags, 0)
+ self.assertEqual(ev.data, 0)
+ self.assertEqual(ev.udata, 0)
+ self.assertEqual(ev, ev)
+ self.assertNotEqual(ev, other)
+
+ ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
+ self.assertEqual(ev.ident, fd)
+ self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
+ self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
+ self.assertEqual(ev.fflags, 0)
+ self.assertEqual(ev.data, 0)
+ self.assertEqual(ev.udata, 0)
+ self.assertEqual(ev, ev)
+ self.assertNotEqual(ev, other)
+
+ ev = select.kevent(1, 2, 3, 4, 5, 6)
+ self.assertEqual(ev.ident, 1)
+ self.assertEqual(ev.filter, 2)
+ self.assertEqual(ev.flags, 3)
+ self.assertEqual(ev.fflags, 4)
+ self.assertEqual(ev.data, 5)
+ self.assertEqual(ev.udata, 6)
+ self.assertEqual(ev, ev)
+ self.assertNotEqual(ev, other)
+
+ def test_queue_event(self):
+ serverSocket = socket.socket()
+ serverSocket.bind(('127.0.0.1', 0))
+ serverSocket.listen(1)
+ client = socket.socket()
+ client.setblocking(False)
+ try:
+ client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
+ except socket.error as e:
+ self.assertEquals(e.args[0], errno.EINPROGRESS)
+ else:
+ #raise AssertionError("Connect should have raised EINPROGRESS")
+ pass # FreeBSD doesn't raise an exception here
+ server, addr = serverSocket.accept()
+
+ if sys.platform.startswith("darwin"):
+ flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
+ else:
+ flags = 0
+
+ kq = select.kqueue()
+ kq2 = select.kqueue.fromfd(kq.fileno())
+
+ ev = select.kevent(server.fileno(),
+ select.KQ_FILTER_WRITE,
+ select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq.control([ev], 0)
+ ev = select.kevent(server.fileno(),
+ select.KQ_FILTER_READ,
+ select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq.control([ev], 0)
+ ev = select.kevent(client.fileno(),
+ select.KQ_FILTER_WRITE,
+ select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq2.control([ev], 0)
+ ev = select.kevent(client.fileno(),
+ select.KQ_FILTER_READ,
+ select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq2.control([ev], 0)
+
+ events = kq.control(None, 4, 1)
+ events = [(e.ident, e.filter, e.flags) for e in events]
+ events.sort()
+ self.assertEquals(events, [
+ (client.fileno(), select.KQ_FILTER_WRITE, flags),
+ (server.fileno(), select.KQ_FILTER_WRITE, flags)])
+
+ client.send(b"Hello!")
+ server.send(b"world!!!")
+
+ events = kq.control(None, 4, 1)
+ # We may need to call it several times
+ for i in range(5):
+ if len(events) == 4:
+ break
+ events = kq.control(None, 4, 1)
+ events = [(e.ident, e.filter, e.flags) for e in events]
+ events.sort()
+
+ self.assertEquals(events, [
+ (client.fileno(), select.KQ_FILTER_WRITE, flags),
+ (client.fileno(), select.KQ_FILTER_READ, flags),
+ (server.fileno(), select.KQ_FILTER_WRITE, flags),
+ (server.fileno(), select.KQ_FILTER_READ, flags)])
+
+ # Remove completely client, and server read part
+ ev = select.kevent(client.fileno(),
+ select.KQ_FILTER_WRITE,
+ select.KQ_EV_DELETE)
+ kq.control([ev], 0)
+ ev = select.kevent(client.fileno(),
+ select.KQ_FILTER_READ,
+ select.KQ_EV_DELETE)
+ kq.control([ev], 0)
+ ev = select.kevent(server.fileno(),
+ select.KQ_FILTER_READ,
+ select.KQ_EV_DELETE)
+ kq.control([ev], 0, 0)
+
+ events = kq.control([], 4, 0.99)
+ events = [(e.ident, e.filter, e.flags) for e in events]
+ events.sort()
+ self.assertEquals(events, [
+ (server.fileno(), select.KQ_FILTER_WRITE, flags)])
+
+ client.close()
+ server.close()
+ serverSocket.close()
+
+def test_main():
+ test_support.run_unittest(TestKQueue)
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/test/test_poll.py b/Lib/test/test_poll.py
index cd9bfb0..d546c78 100644
--- a/Lib/test/test_poll.py
+++ b/Lib/test/test_poll.py
@@ -34,7 +34,8 @@ class PollTests(unittest.TestCase):
for i in range(NUM_PIPES):
rd, wr = os.pipe()
- p.register(rd, select.POLLIN)
+ p.register(rd)
+ p.modify(rd, select.POLLIN)
p.register(wr, select.POLLOUT)
readers.append(rd)
writers.append(wr)
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 1d25814..2834076 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -1,6 +1,11 @@
import unittest
from test import test_support
+from contextlib import closing, nested
+import pickle
+import select
import signal
+import subprocess
+import traceback
import sys, os, time, errno
if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos':
@@ -11,40 +16,20 @@ if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos':
class HandlerBCalled(Exception):
pass
+
+def exit_subprocess():
+ """Use os._exit(0) to exit the current subprocess.
+
+ Otherwise, the test catches the SystemExit and continues executing
+ in parallel with the original test, so you wind up with an
+ exponential number of tests running concurrently.
+ """
+ os._exit(0)
+
+
class InterProcessSignalTests(unittest.TestCase):
MAX_DURATION = 20 # Entire test should last at most 20 sec.
- # Set up a child to send signals to us (the parent) after waiting
- # long enough to receive the alarm. It seems we miss the alarm
- # for some reason. This will hopefully stop the hangs on
- # Tru64/Alpha. Alas, it doesn't. Tru64 appears to miss all the
- # signals at times, or seemingly random subsets of them, and
- # nothing done in force_test_exit so far has actually helped.
- def spawn_force_test_exit_process(self, parent_pid):
- # Sigh, both imports seem necessary to avoid errors.
- import os
- fork_pid = os.fork()
- if fork_pid:
- # In parent.
- return fork_pid
-
- # In child.
- import os, time
- try:
- # Wait 5 seconds longer than the expected alarm to give enough
- # time for the normal sequence of events to occur. This is
- # just a stop-gap to try to prevent the test from hanging.
- time.sleep(self.MAX_DURATION + 5)
- print(" child should not have to kill parent",
- file=sys.__stdout__)
- for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
- os.kill(parent_pid, getattr(signal, signame))
- print(" child sent", signame, "to",
- parent_pid, file=sys.__stdout__)
- time.sleep(1)
- finally:
- os._exit(0)
-
def handlerA(self, *args):
self.a_called = True
if test_support.verbose:
@@ -56,121 +41,133 @@ class InterProcessSignalTests(unittest.TestCase):
print("handlerB invoked", args)
raise HandlerBCalled(*args)
- def test_main(self):
- self.assertEquals(signal.getsignal(signal.SIGHUP), self.handlerA)
- self.assertEquals(signal.getsignal(signal.SIGUSR1), self.handlerB)
- self.assertEquals(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
- self.assertEquals(signal.getsignal(signal.SIGALRM),
- signal.default_int_handler)
-
- # Launch an external script to send us signals.
- # We expect the external script to:
- # send HUP, which invokes handlerA to set a_called
- # send USR1, which invokes handlerB to set b_called and raise
- # HandlerBCalled
- # send USR2, which is ignored
- #
- # Then we expect the alarm to go off, and its handler raises
- # KeyboardInterrupt, finally getting us out of the loop.
+ def wait(self, child):
+ """Wait for child to finish, ignoring EINTR."""
+ while True:
+ try:
+ child.wait()
+ return
+ except OSError as e:
+ if e.errno != errno.EINTR:
+ raise
- if test_support.verbose:
- verboseflag = '-x'
- else:
- verboseflag = '+x'
+ def run_test(self):
+ # Install handlers. This function runs in a sub-process, so we
+ # don't worry about re-setting the default handlers.
+ signal.signal(signal.SIGHUP, self.handlerA)
+ signal.signal(signal.SIGUSR1, self.handlerB)
+ signal.signal(signal.SIGUSR2, signal.SIG_IGN)
+ signal.signal(signal.SIGALRM, signal.default_int_handler)
+
+ # Variables the signals will modify:
+ self.a_called = False
+ self.b_called = False
- pid = self.pid
+ # Let the sub-processes know who to send signals to.
+ pid = os.getpid()
if test_support.verbose:
print("test runner's pid is", pid)
- # Shell script that will send us asynchronous signals
- script = """
- (
- set %(verboseflag)s
- sleep 2
- kill -HUP %(pid)d
- sleep 2
- kill -USR1 %(pid)d
- sleep 2
- kill -USR2 %(pid)d
- ) &
- """ % vars()
-
- signal.alarm(self.MAX_DURATION)
-
- handler_b_exception_raised = False
+ child = subprocess.Popen(['kill', '-HUP', str(pid)])
+ self.wait(child)
+ self.assertTrue(self.a_called)
+ self.assertFalse(self.b_called)
+ self.a_called = False
- os.system(script)
try:
+ child = subprocess.Popen(['kill', '-USR1', str(pid)])
+ # This wait should be interrupted by the signal's exception.
+ self.wait(child)
+ self.fail('HandlerBCalled exception not thrown')
+ except HandlerBCalled:
+ self.assertTrue(self.b_called)
+ self.assertFalse(self.a_called)
if test_support.verbose:
- print("starting pause() loop...")
- while 1:
- try:
- if test_support.verbose:
- print("call pause()...")
- signal.pause()
- if test_support.verbose:
- print("pause() returned")
- except HandlerBCalled:
- handler_b_exception_raised = True
- if test_support.verbose:
- print("HandlerBCalled exception caught")
+ print("HandlerBCalled exception caught")
+ child = subprocess.Popen(['kill', '-USR2', str(pid)])
+ self.wait(child) # Nothing should happen.
+
+ try:
+ signal.alarm(1)
+ # The race condition in pause doesn't matter in this case,
+ # since alarm is going to raise a KeyboardException, which
+ # will skip the call.
+ signal.pause()
except KeyboardInterrupt:
if test_support.verbose:
print("KeyboardInterrupt (the alarm() went off)")
-
- self.assert_(self.a_called)
- self.assert_(self.b_called)
- self.assert_(handler_b_exception_raised)
-
- def setUp(self):
- # Install handlers.
- self.hup = signal.signal(signal.SIGHUP, self.handlerA)
- self.usr1 = signal.signal(signal.SIGUSR1, self.handlerB)
- self.usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
- self.alrm = signal.signal(signal.SIGALRM,
- signal.default_int_handler)
- self.a_called = False
- self.b_called = False
- self.pid = os.getpid()
- self.fork_pid = self.spawn_force_test_exit_process(self.pid)
-
- def tearDown(self):
- # Forcibly kill the child we created to ping us if there was a
- # test error.
- try:
- # Make sure we don't kill ourself if there was a fork
- # error.
- if self.fork_pid > 0:
- os.kill(self.fork_pid, signal.SIGKILL)
except:
- # If the child killed us, it has probably exited. Killing
- # a non-existent process will raise an error which we
- # don't care about.
- pass
-
- # Restore handlers.
- signal.alarm(0) # cancel alarm in case we died early
- signal.signal(signal.SIGHUP, self.hup)
- signal.signal(signal.SIGUSR1, self.usr1)
- signal.signal(signal.SIGUSR2, self.usr2)
- signal.signal(signal.SIGALRM, self.alrm)
+ self.fail('Some other exception woke us from pause: %s' %
+ traceback.format_exc())
+ else:
+ self.fail('pause returned of its own accord')
+
+ def test_main(self):
+ # This function spawns a child process to insulate the main
+ # test-running process from all the signals. It then
+ # communicates with that child process over a pipe and
+ # re-raises information about any exceptions the child
+ # throws. The real work happens in self.run_test().
+ os_done_r, os_done_w = os.pipe()
+ with nested(closing(os.fdopen(os_done_r, 'rb')),
+ closing(os.fdopen(os_done_w, 'wb'))) as (done_r, done_w):
+ child = os.fork()
+ if child == 0:
+ # In the child process; run the test and report results
+ # through the pipe.
+ try:
+ done_r.close()
+ # Have to close done_w again here because
+ # exit_subprocess() will skip the enclosing with block.
+ with closing(done_w):
+ try:
+ self.run_test()
+ except:
+ pickle.dump(traceback.format_exc(), done_w)
+ else:
+ pickle.dump(None, done_w)
+ except:
+ print('Uh oh, raised from pickle.')
+ traceback.print_exc()
+ finally:
+ exit_subprocess()
+
+ done_w.close()
+ # Block for up to MAX_DURATION seconds for the test to finish.
+ r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
+ if done_r in r:
+ tb = pickle.load(done_r)
+ if tb:
+ self.fail(tb)
+ else:
+ os.kill(child, signal.SIGKILL)
+ self.fail('Test deadlocked after %d seconds.' %
+ self.MAX_DURATION)
class BasicSignalTests(unittest.TestCase):
+ def trivial_signal_handler(self, *args):
+ pass
+
def test_out_of_range_signal_number_raises_error(self):
self.assertRaises(ValueError, signal.getsignal, 4242)
- def trivial_signal_handler(*args):
- pass
-
self.assertRaises(ValueError, signal.signal, 4242,
- trivial_signal_handler)
+ self.trivial_signal_handler)
def test_setting_signal_handler_to_none_raises_error(self):
self.assertRaises(TypeError, signal.signal,
signal.SIGUSR1, None)
+ def test_getsignal(self):
+ hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
+ self.assertEquals(signal.getsignal(signal.SIGHUP),
+ self.trivial_signal_handler)
+ signal.signal(signal.SIGHUP, hup)
+ self.assertEquals(signal.getsignal(signal.SIGHUP), hup)
+
+
class WakeupSignalTests(unittest.TestCase):
TIMEOUT_FULL = 10
TIMEOUT_HALF = 5
@@ -233,7 +230,7 @@ class SiginterruptTest(unittest.TestCase):
os.kill(ppid, self.signum)
time.sleep(0.2)
finally:
- os._exit(0)
+ exit_subprocess()
try:
os.close(w)
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 3d5c120..67d9ed9 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -30,32 +30,26 @@ class TestThread(threading.Thread):
self.nrunning = nrunning
def run(self):
- delay = random.random() * 2
+ delay = random.random() / 10000.0
if verbose:
print('task', self.getName(), 'will run for', delay, 'sec')
- self.sema.acquire()
+ with self.sema:
+ with self.mutex:
+ self.nrunning.inc()
+ if verbose:
+ print(self.nrunning.get(), 'tasks are running')
+ self.testcase.assert_(self.nrunning.get() <= 3)
- self.mutex.acquire()
- self.nrunning.inc()
- if verbose:
- print(self.nrunning.get(), 'tasks are running')
- self.testcase.assert_(self.nrunning.get() <= 3)
- self.mutex.release()
-
- time.sleep(delay)
- if verbose:
- print('task', self.getName(), 'done')
-
- self.mutex.acquire()
- self.nrunning.dec()
- self.testcase.assert_(self.nrunning.get() >= 0)
- if verbose:
- print(self.getName(), 'is finished.', self.nrunning.get(), \
- 'tasks are running')
- self.mutex.release()
-
- self.sema.release()
+ time.sleep(delay)
+ if verbose:
+ print('task', self.getName(), 'done')
+ with self.mutex:
+ self.nrunning.dec()
+ self.testcase.assert_(self.nrunning.get() >= 0)
+ if verbose:
+ print('%s is finished. %d tasks are running' %
+ self.getName(), self.nrunning.get())
class ThreadTests(unittest.TestCase):
@@ -218,6 +212,10 @@ class ThreadTests(unittest.TestCase):
rc = subprocess.call([sys.executable, "-c", """if 1:
import ctypes, sys, time, thread
+ # This lock is used as a simple event variable.
+ ready = thread.allocate_lock()
+ ready.acquire()
+
# Module globals are cleared before __del__ is run
# So we save the functions in class dict
class C:
@@ -229,10 +227,11 @@ class ThreadTests(unittest.TestCase):
def waitingThread():
x = C()
+ ready.release()
time.sleep(100)
thread.start_new_thread(waitingThread, ())
- time.sleep(1) # be sure the other thread is waiting
+ ready.acquire() # Be sure the other thread is waiting.
sys.exit(42)
"""])
self.assertEqual(rc, 42)
@@ -242,7 +241,6 @@ class ThreadTests(unittest.TestCase):
# threading.enumerate() after it has been join()ed.
enum = threading.enumerate
old_interval = sys.getcheckinterval()
- sys.setcheckinterval(1)
try:
for i in range(1, 1000):
t = threading.Thread(target=lambda: None)
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
index 6ceec06..9f0b2c4 100644
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -906,13 +906,14 @@ class HandlerTests(unittest.TestCase):
self.assertEqual([(handlers[0], "http_open")],
[tup[0:2] for tup in o.calls])
- def test_basic_auth(self):
+ def test_basic_auth(self, quote_char='"'):
opener = OpenerDirector()
password_manager = MockPasswordManager()
auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
realm = "ACME Widget Store"
http_handler = MockHTTPHandler(
- 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
+ 401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
+ (quote_char, realm, quote_char) )
opener.add_handler(auth_handler)
opener.add_handler(http_handler)
self._test_basic_auth(opener, auth_handler, "Authorization",
@@ -921,6 +922,9 @@ class HandlerTests(unittest.TestCase):
"http://acme.example.com/protected",
)
+ def test_basic_auth_with_single_quoted_realm(self):
+ self.test_basic_auth(quote_char="'")
+
def test_proxy_basic_auth(self):
opener = OpenerDirector()
ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))