summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
authorCharles-François Natali <cf.natali@gmail.com>2013-09-05 18:46:49 (GMT)
committerCharles-François Natali <cf.natali@gmail.com>2013-09-05 18:46:49 (GMT)
commite241ac9283a240c902b1781ed4af6952b1f5d96e (patch)
tree81c298ea51b2633f519d6d6aa26b7694497df734 /Lib/multiprocessing
parenta83a022f9a5912a9ee944c4c04a0708ed21216cc (diff)
downloadcpython-e241ac9283a240c902b1781ed4af6952b1f5d96e.zip
cpython-e241ac9283a240c902b1781ed4af6952b1f5d96e.tar.gz
cpython-e241ac9283a240c902b1781ed4af6952b1f5d96e.tar.bz2
Issue #18934: multiprocessing: use selectors module.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/connection.py51
-rw-r--r--Lib/multiprocessing/forkserver.py14
2 files changed, 26 insertions, 39 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 9fbe46d..59fb664 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -12,7 +12,6 @@ __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
import io
import os
import sys
-import select
import socket
import struct
import errno
@@ -877,28 +876,7 @@ if sys.platform == 'win32':
else:
- if hasattr(select, 'poll'):
- def _poll(fds, timeout):
- if timeout is not None:
- timeout = int(timeout * 1000) # timeout is in milliseconds
- fd_map = {}
- pollster = select.poll()
- for fd in fds:
- pollster.register(fd, select.POLLIN)
- if hasattr(fd, 'fileno'):
- fd_map[fd.fileno()] = fd
- else:
- fd_map[fd] = fd
- ls = []
- for fd, event in pollster.poll(timeout):
- if event & select.POLLNVAL:
- raise ValueError('invalid file descriptor %i' % fd)
- ls.append(fd_map[fd])
- return ls
- else:
- def _poll(fds, timeout):
- return select.select(fds, [], [], timeout)[0]
-
+ import selectors
def wait(object_list, timeout=None):
'''
@@ -906,19 +884,22 @@ else:
Returns list of those objects in object_list which are ready/readable.
'''
- if timeout is not None:
- if timeout <= 0:
- return _poll(object_list, 0)
- else:
- deadline = time.time() + timeout
- while True:
- try:
- return _poll(object_list, timeout)
- except OSError as e:
- if e.errno != errno.EINTR:
- raise
+ with selectors.DefaultSelector() as selector:
+ for obj in object_list:
+ selector.register(obj, selectors.EVENT_READ)
+
if timeout is not None:
- timeout = deadline - time.time()
+ deadline = time.time() + timeout
+
+ while True:
+ ready = selector.select(timeout)
+ if ready:
+ return [key.fileobj for (key, events) in ready]
+ else:
+ if timeout is not None:
+ timeout = deadline - time.time()
+ if timeout < 0:
+ return ready
#
# Make connection and socket objects sharable if possible
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
index 975b15a..0a23707 100644
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -1,6 +1,6 @@
import errno
import os
-import select
+import selectors
import signal
import socket
import struct
@@ -149,14 +149,20 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
# ignoring SIGCHLD means no need to reap zombie processes
handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
- with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener:
+ with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
+ selectors.DefaultSelector() as selector:
global _forkserver_address
_forkserver_address = listener.getsockname()
- readers = [listener, alive_r]
+
+ selector.register(listener, selectors.EVENT_READ)
+ selector.register(alive_r, selectors.EVENT_READ)
while True:
try:
- rfds, wfds, xfds = select.select(readers, [], [])
+ while True:
+ rfds = [key.fileobj for (key, events) in selector.select()]
+ if rfds:
+ break
if alive_r in rfds:
# EOF because no more client processes left