diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2011-06-06 17:35:31 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2011-06-06 17:35:31 (GMT) |
commit | 176f07dadf80c27348486fdd8017fabbb30da842 (patch) | |
tree | d489214386a5a7d5efec5d157be5783554c6cec2 /Lib/multiprocessing | |
parent | f068ab830448ea9f78ee0dfbb29c12211d1b6aee (diff) | |
download | cpython-176f07dadf80c27348486fdd8017fabbb30da842.zip cpython-176f07dadf80c27348486fdd8017fabbb30da842.tar.gz cpython-176f07dadf80c27348486fdd8017fabbb30da842.tar.bz2 |
Issue #12040: Expose a new attribute `sentinel` on instances of
:class:`multiprocessing.Process`. Also, fix Process.join() to not use
polling anymore, when given a timeout.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/forking.py | 34 | ||||
-rw-r--r-- | Lib/multiprocessing/process.py | 12 | ||||
-rw-r--r-- | Lib/multiprocessing/util.py | 21 |
3 files changed, 53 insertions, 14 deletions
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 3d95557..3c359cb 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -101,10 +101,12 @@ else: if sys.platform != 'win32': import time + import select exit = os._exit duplicate = os.dup close = os.close + _select = util._eintr_retry(select.select) # # We define a Popen class similar to the one from subprocess, but @@ -118,8 +120,12 @@ if sys.platform != 'win32': sys.stderr.flush() self.returncode = None + r, w = os.pipe() + self.sentinel = r + self.pid = os.fork() if self.pid == 0: + os.close(r) if 'random' in sys.modules: import random random.seed() @@ -128,6 +134,11 @@ if sys.platform != 'win32': sys.stderr.flush() os._exit(code) + # `w` will be closed when the child exits, at which point `r` + # will become ready for reading (using e.g. select()). + os.close(w) + util.Finalize(self, os.close, (r,)) + def poll(self, flag=os.WNOHANG): if self.returncode is None: try: @@ -145,20 +156,14 @@ if sys.platform != 'win32': return self.returncode def wait(self, timeout=None): - if timeout is None: - return self.poll(0) - deadline = time.time() + timeout - delay = 0.0005 - while 1: - res = self.poll() - if res is not None: - break - remaining = deadline - time.time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, 0.05) - time.sleep(delay) - return res + if self.returncode is None: + if timeout is not None: + r = _select([self.sentinel], [], [], timeout)[0] + if not r: + return None + # This shouldn't block if select() returned successfully. + return self.poll(os.WNOHANG if timeout == 0.0 else 0) + return self.returncode def terminate(self): if self.returncode is None: @@ -258,6 +263,7 @@ else: self.pid = pid self.returncode = None self._handle = hp + self.sentinel = int(hp) # send information to child prep_data = get_preparation_data(process_obj._name) diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 3fb9ff6..99ee532 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -132,6 +132,7 @@ class Process(object): else: from .forking import Popen self._popen = Popen(self) + self._sentinel = self._popen.sentinel _current_process._children.add(self) def terminate(self): @@ -218,6 +219,17 @@ class Process(object): pid = ident + @property + def sentinel(self): + ''' + Return a file descriptor (Unix) or handle (Windows) suitable for + waiting for process termination. + ''' + try: + return self._sentinel + except AttributeError: + raise ValueError("process not started") + def __repr__(self): if self is _current_process: status = 'started' diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 30b7a85..7949d3a 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -32,9 +32,11 @@ # SUCH DAMAGE. # +import functools import itertools import weakref import atexit +import select import threading # we want threading to install it's # cleanup function before multiprocessing does @@ -315,3 +317,22 @@ class ForkAwareLocal(threading.local): register_after_fork(self, lambda obj : obj.__dict__.clear()) def __reduce__(self): return type(self), () + + +# +# Automatic retry after EINTR +# + +def _eintr_retry(func, _errors=(EnvironmentError, select.error)): + @functools.wraps(func) + def wrapped(*args, **kwargs): + while True: + try: + return func(*args, **kwargs) + except _errors as e: + # select.error has no `errno` attribute + if e.args[0] == errno.EINTR: + continue + raise + return wrapped + |