summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_multiprocessing.py
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2012-03-05 18:28:37 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2012-03-05 18:28:37 (GMT)
commitbdb1cf1ca56db25b33fb15dd91eef2cc32cd8973 (patch)
tree54137f9699833726def7c803cff7c995af22cfa5 /Lib/test/test_multiprocessing.py
parent1e88f3faa61dbaa9ea0d2404aa8563c1eeceba54 (diff)
downloadcpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.zip
cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.gz
cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.bz2
Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.
Also, add a multiprocessing.connection.wait(rlist, timeout=None) function for polling multiple objects at once. Patch by sbt. Complete changelist from sbt's patch: * Adds a wait(rlist, timeout=None) function for polling multiple objects at once. On Unix this is just a wrapper for select(rlist, [], [], timeout=None). * Removes use of the SentinelReady exception and the sentinels argument to certain methods. concurrent.futures.process has been changed to use wait() instead of SentinelReady. * Fixes bugs concerning PipeConnection.poll() and messages of zero length. * Fixes PipeListener.accept() to call ConnectNamedPipe() with overlapped=True. * Fixes Queue.empty() and SimpleQueue.empty() so that they are threadsafe on Windows. * Now PipeConnection.poll() and wait() will not modify the pipe except possibly by consuming a zero length message. (Previously poll() could consume a partial message.) * All of multiprocesing's pipe related blocking functions/methods are now interruptible by SIGINT on Windows.
Diffstat (limited to 'Lib/test/test_multiprocessing.py')
-rw-r--r--Lib/test/test_multiprocessing.py235
1 files changed, 234 insertions, 1 deletions
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index f141bd4..35d85d7 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1811,6 +1811,84 @@ class _TestListenerClient(BaseTestCase):
p.join()
l.close()
+class _TestPoll(unittest.TestCase):
+
+ ALLOWED_TYPES = ('processes', 'threads')
+
+ def test_empty_string(self):
+ a, b = self.Pipe()
+ self.assertEqual(a.poll(), False)
+ b.send_bytes(b'')
+ self.assertEqual(a.poll(), True)
+ self.assertEqual(a.poll(), True)
+
+ @classmethod
+ def _child_strings(cls, conn, strings):
+ for s in strings:
+ time.sleep(0.1)
+ conn.send_bytes(s)
+ conn.close()
+
+ def test_strings(self):
+ strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
+ a, b = self.Pipe()
+ p = self.Process(target=self._child_strings, args=(b, strings))
+ p.start()
+
+ for s in strings:
+ for i in range(200):
+ if a.poll(0.01):
+ break
+ x = a.recv_bytes()
+ self.assertEqual(s, x)
+
+ p.join()
+
+ @classmethod
+ def _child_boundaries(cls, r):
+ # Polling may "pull" a message in to the child process, but we
+ # don't want it to pull only part of a message, as that would
+ # corrupt the pipe for any other processes which might later
+ # read from it.
+ r.poll(5)
+
+ def test_boundaries(self):
+ r, w = self.Pipe(False)
+ p = self.Process(target=self._child_boundaries, args=(r,))
+ p.start()
+ time.sleep(2)
+ L = [b"first", b"second"]
+ for obj in L:
+ w.send_bytes(obj)
+ w.close()
+ p.join()
+ self.assertIn(r.recv_bytes(), L)
+
+ @classmethod
+ def _child_dont_merge(cls, b):
+ b.send_bytes(b'a')
+ b.send_bytes(b'b')
+ b.send_bytes(b'cd')
+
+ def test_dont_merge(self):
+ a, b = self.Pipe()
+ self.assertEqual(a.poll(0.0), False)
+ self.assertEqual(a.poll(0.1), False)
+
+ p = self.Process(target=self._child_dont_merge, args=(b,))
+ p.start()
+
+ self.assertEqual(a.recv_bytes(), b'a')
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.recv_bytes(), b'b')
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.poll(0.0), True)
+ self.assertEqual(a.recv_bytes(), b'cd')
+
+ p.join()
+
#
# Test of sending connection and socket objects between processes
#
@@ -2404,8 +2482,163 @@ class TestStdinBadfiledescriptor(unittest.TestCase):
flike.flush()
assert sio.getvalue() == 'foo'
+
+class TestWait(unittest.TestCase):
+
+ @classmethod
+ def _child_test_wait(cls, w, slow):
+ for i in range(10):
+ if slow:
+ time.sleep(random.random()*0.1)
+ w.send((i, os.getpid()))
+ w.close()
+
+ def test_wait(self, slow=False):
+ from multiprocessing import Pipe, Process
+ from multiprocessing.connection import wait
+ readers = []
+ procs = []
+ messages = []
+
+ for i in range(4):
+ r, w = Pipe(duplex=False)
+ p = Process(target=self._child_test_wait, args=(w, slow))
+ p.daemon = True
+ p.start()
+ w.close()
+ readers.append(r)
+ procs.append(p)
+
+ while readers:
+ for r in wait(readers):
+ try:
+ msg = r.recv()
+ except EOFError:
+ readers.remove(r)
+ r.close()
+ else:
+ messages.append(msg)
+
+ messages.sort()
+ expected = sorted((i, p.pid) for i in range(10) for p in procs)
+ self.assertEqual(messages, expected)
+
+ @classmethod
+ def _child_test_wait_socket(cls, address, slow):
+ s = socket.socket()
+ s.connect(address)
+ for i in range(10):
+ if slow:
+ time.sleep(random.random()*0.1)
+ s.sendall(('%s\n' % i).encode('ascii'))
+ s.close()
+
+ def test_wait_socket(self, slow=False):
+ from multiprocessing import Process
+ from multiprocessing.connection import wait
+ l = socket.socket()
+ l.bind(('', 0))
+ l.listen(4)
+ addr = ('localhost', l.getsockname()[1])
+ readers = []
+ procs = []
+ dic = {}
+
+ for i in range(4):
+ p = Process(target=self._child_test_wait_socket, args=(addr, slow))
+ p.daemon = True
+ p.start()
+ procs.append(p)
+
+ for i in range(4):
+ r, _ = l.accept()
+ readers.append(r)
+ dic[r] = []
+ l.close()
+
+ while readers:
+ for r in wait(readers):
+ msg = r.recv(32)
+ if not msg:
+ readers.remove(r)
+ r.close()
+ else:
+ dic[r].append(msg)
+
+ expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
+ for v in dic.values():
+ self.assertEqual(b''.join(v), expected)
+
+ def test_wait_slow(self):
+ self.test_wait(True)
+
+ def test_wait_socket_slow(self):
+ self.test_wait(True)
+
+ def test_wait_timeout(self):
+ from multiprocessing.connection import wait
+
+ expected = 1
+ a, b = multiprocessing.Pipe()
+
+ start = time.time()
+ res = wait([a, b], 1)
+ delta = time.time() - start
+
+ self.assertEqual(res, [])
+ self.assertLess(delta, expected + 0.2)
+ self.assertGreater(delta, expected - 0.2)
+
+ b.send(None)
+
+ start = time.time()
+ res = wait([a, b], 1)
+ delta = time.time() - start
+
+ self.assertEqual(res, [a])
+ self.assertLess(delta, 0.2)
+
+ def test_wait_integer(self):
+ from multiprocessing.connection import wait
+
+ expected = 5
+ a, b = multiprocessing.Pipe()
+ p = multiprocessing.Process(target=time.sleep, args=(expected,))
+
+ p.start()
+ self.assertIsInstance(p.sentinel, int)
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], expected + 20)
+ delta = time.time() - start
+
+ self.assertEqual(res, [p.sentinel])
+ self.assertLess(delta, expected + 1)
+ self.assertGreater(delta, expected - 1)
+
+ a.send(None)
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], 20)
+ delta = time.time() - start
+
+ self.assertEqual(res, [p.sentinel, b])
+ self.assertLess(delta, 0.2)
+
+ b.send(None)
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], 20)
+ delta = time.time() - start
+
+ self.assertEqual(res, [a, p.sentinel, b])
+ self.assertLess(delta, 0.2)
+
+ p.join()
+
+
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
- TestStdinBadfiledescriptor]
+ TestStdinBadfiledescriptor, TestWait]
#
#