diff options
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/connection.py | 35 | ||||
-rw-r--r-- | Lib/multiprocessing/dummy/__init__.py | 2 | ||||
-rw-r--r-- | Lib/multiprocessing/dummy/connection.py | 5 | ||||
-rw-r--r-- | Lib/multiprocessing/forkserver.py | 28 | ||||
-rw-r--r-- | Lib/multiprocessing/heap.py | 20 | ||||
-rw-r--r-- | Lib/multiprocessing/managers.py | 37 | ||||
-rw-r--r-- | Lib/multiprocessing/pool.py | 27 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_fork.py | 3 | ||||
-rw-r--r-- | Lib/multiprocessing/process.py | 7 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 27 | ||||
-rw-r--r-- | Lib/multiprocessing/sharedctypes.py | 26 | ||||
-rw-r--r-- | Lib/multiprocessing/spawn.py | 2 | ||||
-rw-r--r-- | Lib/multiprocessing/synchronize.py | 32 | ||||
-rw-r--r-- | Lib/multiprocessing/util.py | 37 |
14 files changed, 108 insertions, 180 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 1eb1a8d..d0a1b86 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -365,10 +365,7 @@ class Connection(_ConnectionBase): def _send(self, buf, write=_write): remaining = len(buf) while True: - try: - n = write(self._handle, buf) - except InterruptedError: - continue + n = write(self._handle, buf) remaining -= n if remaining == 0: break @@ -379,10 +376,7 @@ class Connection(_ConnectionBase): handle = self._handle remaining = size while remaining > 0: - try: - chunk = read(handle, remaining) - except InterruptedError: - continue + chunk = read(handle, remaining) n = len(chunk) if n == 0: if remaining == size: @@ -400,17 +394,14 @@ class Connection(_ConnectionBase): if n > 16384: # The payload is large so Nagle's algorithm won't be triggered # and we'd better avoid the cost of concatenation. - chunks = [header, buf] - elif n > 0: - # Issue # 20540: concatenate before sending, to avoid delays due - # to Nagle's algorithm on a TCP socket. - chunks = [header + buf] + self._send(header) + self._send(buf) else: - # This code path is necessary to avoid "broken pipe" errors - # when sending a 0-length buffer if the other end closed the pipe. - chunks = [header] - for chunk in chunks: - self._send(chunk) + # Issue #20540: concatenate before sending, to avoid delays due + # to Nagle's algorithm on a TCP socket. + # Also note we want to avoid sending a 0-length buffer separately, + # to avoid "broken pipe" errors if the other end closed the pipe. + self._send(header + buf) def _recv_bytes(self, maxsize=None): buf = self._recv(4) @@ -599,13 +590,7 @@ class SocketListener(object): self._unlink = None def accept(self): - while True: - try: - s, self._last_accepted = self._socket.accept() - except InterruptedError: - pass - else: - break + s, self._last_accepted = self._socket.accept() s.setblocking(True) return Connection(s.detach()) diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py index 135db7f..1abea64 100644 --- a/Lib/multiprocessing/dummy/__init__.py +++ b/Lib/multiprocessing/dummy/__init__.py @@ -86,7 +86,7 @@ class Namespace(object): if not name.startswith('_'): temp.append('%s=%r' % (name, value)) temp.sort() - return 'Namespace(%s)' % str.join(', ', temp) + return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) dict = dict list = list diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py index 694ef96..1984375 100644 --- a/Lib/multiprocessing/dummy/connection.py +++ b/Lib/multiprocessing/dummy/connection.py @@ -59,9 +59,8 @@ class Connection(object): return True if timeout <= 0.0: return False - self._in.not_empty.acquire() - self._in.not_empty.wait(timeout) - self._in.not_empty.release() + with self._in.not_empty: + self._in.not_empty.wait(timeout) return self._in.qsize() > 0 def close(self): diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index 387517e..ad01ede 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -107,7 +107,7 @@ class ForkServer(object): address = connection.arbitrary_address('AF_UNIX') listener.bind(address) os.chmod(address, 0o600) - listener.listen(100) + listener.listen() # all client processes own the write end of the "alive" pipe; # when they all terminate the read end becomes ready. @@ -147,13 +147,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): except ImportError: pass - # close sys.stdin - if sys.stdin is not None: - try: - sys.stdin.close() - sys.stdin = open(os.devnull) - except (OSError, ValueError): - pass + util._close_stdin() # ignoring SIGCHLD means no need to reap zombie processes handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN) @@ -188,8 +182,6 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): finally: os._exit(code) - except InterruptedError: - pass except OSError as e: if e.errno != errno.ECONNABORTED: raise @@ -230,13 +222,7 @@ def read_unsigned(fd): data = b'' length = UNSIGNED_STRUCT.size while len(data) < length: - while True: - try: - s = os.read(fd, length - len(data)) - except InterruptedError: - pass - else: - break + s = os.read(fd, length - len(data)) if not s: raise EOFError('unexpected EOF') data += s @@ -245,13 +231,7 @@ def read_unsigned(fd): def write_unsigned(fd, n): msg = UNSIGNED_STRUCT.pack(n) while msg: - while True: - try: - nbytes = os.write(fd, msg) - except InterruptedError: - pass - else: - break + nbytes = os.write(fd, msg) if nbytes == 0: raise RuntimeError('should not get here') msg = msg[nbytes:] diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index 344a45f..44d9638 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -54,7 +54,9 @@ if sys.platform == 'win32': def __setstate__(self, state): self.size, self.name = self._state = state self.buffer = mmap.mmap(-1, self.size, tagname=self.name) - assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS + # XXX Temporarily preventing buildbot failures while determining + # XXX the correct long-term fix. See issue 23060 + #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS else: @@ -69,7 +71,14 @@ else: os.unlink(name) util.Finalize(self, os.close, (self.fd,)) with open(self.fd, 'wb', closefd=False) as f: - f.write(b'\0'*size) + bs = 1024 * 1024 + if size >= bs: + zeros = b'\0' * bs + for _ in range(size // bs): + f.write(zeros) + del zeros + f.write(b'\0' * (size % bs)) + assert f.tell() == size self.buffer = mmap.mmap(self.fd, self.size) def reduce_arena(a): @@ -216,9 +225,8 @@ class Heap(object): assert 0 <= size < sys.maxsize if os.getpid() != self._lastpid: self.__init__() # reinitialize after fork - self._lock.acquire() - self._free_pending_blocks() - try: + with self._lock: + self._free_pending_blocks() size = self._roundup(max(size,1), self._alignment) (arena, start, stop) = self._malloc(size) new_stop = start + size @@ -227,8 +235,6 @@ class Heap(object): block = (arena, start, new_stop) self._allocated_blocks.add(block) return block - finally: - self._lock.release() # # Class representing a chunk of an mmap -- can be inherited by child process diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 66d46fc..c559b55 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -65,8 +65,8 @@ class Token(object): (self.typeid, self.address, self.id) = state def __repr__(self): - return 'Token(typeid=%r, address=%r, id=%r)' % \ - (self.typeid, self.address, self.id) + return '%s(typeid=%r, address=%r, id=%r)' % \ + (self.__class__.__name__, self.typeid, self.address, self.id) # # Function for communication with a manager's server process @@ -306,8 +306,7 @@ class Server(object): ''' Return some info --- useful to spot problems with refcounting ''' - self.mutex.acquire() - try: + with self.mutex: result = [] keys = list(self.id_to_obj.keys()) keys.sort() @@ -317,8 +316,6 @@ class Server(object): (ident, self.id_to_refcount[ident], str(self.id_to_obj[ident][0])[:75])) return '\n'.join(result) - finally: - self.mutex.release() def number_of_objects(self, c): ''' @@ -343,8 +340,7 @@ class Server(object): ''' Create a new shared object and return its id ''' - self.mutex.acquire() - try: + with self.mutex: callable, exposed, method_to_typeid, proxytype = \ self.registry[typeid] @@ -374,8 +370,6 @@ class Server(object): # has been created. self.incref(c, ident) return ident, tuple(exposed) - finally: - self.mutex.release() def get_methods(self, c, token): ''' @@ -392,22 +386,16 @@ class Server(object): self.serve_client(c) def incref(self, c, ident): - self.mutex.acquire() - try: + with self.mutex: self.id_to_refcount[ident] += 1 - finally: - self.mutex.release() def decref(self, c, ident): - self.mutex.acquire() - try: + with self.mutex: assert self.id_to_refcount[ident] >= 1 self.id_to_refcount[ident] -= 1 if self.id_to_refcount[ident] == 0: del self.id_to_obj[ident], self.id_to_refcount[ident] util.debug('disposing of obj with id %r', ident) - finally: - self.mutex.release() # # Class to represent state of a manager @@ -671,14 +659,11 @@ class BaseProxy(object): def __init__(self, token, serializer, manager=None, authkey=None, exposed=None, incref=True): - BaseProxy._mutex.acquire() - try: + with BaseProxy._mutex: tls_idset = BaseProxy._address_to_local.get(token.address, None) if tls_idset is None: tls_idset = util.ForkAwareLocal(), ProcessLocalSet() BaseProxy._address_to_local[token.address] = tls_idset - finally: - BaseProxy._mutex.release() # self._tls is used to record the connection used by this # thread to communicate with the manager at token.address @@ -818,8 +803,8 @@ class BaseProxy(object): return self._getvalue() def __repr__(self): - return '<%s object, typeid %r at %s>' % \ - (type(self).__name__, self._token.typeid, '0x%x' % id(self)) + return '<%s object, typeid %r at %#x>' % \ + (type(self).__name__, self._token.typeid, id(self)) def __str__(self): ''' @@ -857,7 +842,7 @@ def RebuildProxy(func, token, serializer, kwds): def MakeProxyType(name, exposed, _cache={}): ''' - Return an proxy type whose methods are given by `exposed` + Return a proxy type whose methods are given by `exposed` ''' exposed = tuple(exposed) try: @@ -916,7 +901,7 @@ class Namespace(object): if not name.startswith('_'): temp.append('%s=%r' % (name, value)) temp.sort() - return 'Namespace(%s)' % str.join(', ', temp) + return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) class Value(object): def __init__(self, typecode, value, lock=True): diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index db6e3e1..6d25469 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -87,7 +87,7 @@ class MaybeEncodingError(Exception): self.exc) def __repr__(self): - return "<MaybeEncodingError: %s>" % str(self) + return "<%s: %s>" % (self.__class__.__name__, self) def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, @@ -675,8 +675,7 @@ class IMapIterator(object): return self def next(self, timeout=None): - self._cond.acquire() - try: + with self._cond: try: item = self._items.popleft() except IndexError: @@ -689,8 +688,6 @@ class IMapIterator(object): if self._index == self._length: raise StopIteration raise TimeoutError - finally: - self._cond.release() success, value = item if success: @@ -700,8 +697,7 @@ class IMapIterator(object): __next__ = next # XXX def _set(self, i, obj): - self._cond.acquire() - try: + with self._cond: if self._index == i: self._items.append(obj) self._index += 1 @@ -715,18 +711,13 @@ class IMapIterator(object): if self._index == self._length: del self._cache[self._job] - finally: - self._cond.release() def _set_length(self, length): - self._cond.acquire() - try: + with self._cond: self._length = length if self._index == self._length: self._cond.notify() del self._cache[self._job] - finally: - self._cond.release() # # Class whose instances are returned by `Pool.imap_unordered()` @@ -735,15 +726,12 @@ class IMapIterator(object): class IMapUnorderedIterator(IMapIterator): def _set(self, i, obj): - self._cond.acquire() - try: + with self._cond: self._items.append(obj) self._index += 1 self._cond.notify() if self._index == self._length: del self._cache[self._job] - finally: - self._cond.release() # # @@ -769,10 +757,7 @@ class ThreadPool(Pool): @staticmethod def _help_stuff_finish(inqueue, task_handler, size): # put sentinels at head of inqueue to make workers finish - inqueue.not_empty.acquire() - try: + with inqueue.not_empty: inqueue.queue.clear() inqueue.queue.extend([None] * size) inqueue.not_empty.notify_all() - finally: - inqueue.not_empty.release() diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py index 367e72e..d2ebd7c 100644 --- a/Lib/multiprocessing/popen_fork.py +++ b/Lib/multiprocessing/popen_fork.py @@ -1,7 +1,6 @@ import os import sys import signal -import errno from . import util @@ -29,8 +28,6 @@ class Popen(object): try: pid, sts = os.waitpid(self.pid, flag) except OSError as e: - if e.errno == errno.EINTR: - continue # Child process not yet created. See #1731717 # e.errno == errno.ECHILD == 10 return None diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 68959bf..bca8b7a 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -234,12 +234,7 @@ class BaseProcess(object): context._force_start_method(self._start_method) _process_counter = itertools.count(1) _children = set() - if sys.stdin is not None: - try: - sys.stdin.close() - sys.stdin = open(os.devnull) - except (OSError, ValueError): - pass + util._close_stdin() old_process = _current_process _current_process = self try: diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 293ad76..786a303 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -82,14 +82,11 @@ class Queue(object): if not self._sem.acquire(block, timeout): raise Full - self._notempty.acquire() - try: + with self._notempty: if self._thread is None: self._start_thread() self._buffer.append(obj) self._notempty.notify() - finally: - self._notempty.release() def get(self, block=True, timeout=None): if block and timeout is None: @@ -206,12 +203,9 @@ class Queue(object): @staticmethod def _finalize_close(buffer, notempty): debug('telling queue thread to quit') - notempty.acquire() - try: + with notempty: buffer.append(_sentinel) notempty.notify() - finally: - notempty.release() @staticmethod def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): @@ -300,35 +294,24 @@ class JoinableQueue(Queue): if not self._sem.acquire(block, timeout): raise Full - self._notempty.acquire() - self._cond.acquire() - try: + with self._notempty, self._cond: if self._thread is None: self._start_thread() self._buffer.append(obj) self._unfinished_tasks.release() self._notempty.notify() - finally: - self._cond.release() - self._notempty.release() def task_done(self): - self._cond.acquire() - try: + with self._cond: if not self._unfinished_tasks.acquire(False): raise ValueError('task_done() called too many times') if self._unfinished_tasks._semlock._is_zero(): self._cond.notify_all() - finally: - self._cond.release() def join(self): - self._cond.acquire() - try: + with self._cond: if not self._unfinished_tasks._semlock._is_zero(): self._cond.wait() - finally: - self._cond.release() # # Simplified Queue type -- really just a locked pipe diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py index 0c17825..4258f59 100644 --- a/Lib/multiprocessing/sharedctypes.py +++ b/Lib/multiprocessing/sharedctypes.py @@ -188,6 +188,12 @@ class SynchronizedBase(object): self.acquire = self._lock.acquire self.release = self._lock.release + def __enter__(self): + return self._lock.__enter__() + + def __exit__(self, *args): + return self._lock.__exit__(*args) + def __reduce__(self): assert_spawning(self) return synchronized, (self._obj, self._lock) @@ -212,32 +218,20 @@ class SynchronizedArray(SynchronizedBase): return len(self._obj) def __getitem__(self, i): - self.acquire() - try: + with self: return self._obj[i] - finally: - self.release() def __setitem__(self, i, value): - self.acquire() - try: + with self: self._obj[i] = value - finally: - self.release() def __getslice__(self, start, stop): - self.acquire() - try: + with self: return self._obj[start:stop] - finally: - self.release() def __setslice__(self, start, stop, values): - self.acquire() - try: + with self: self._obj[start:stop] = values - finally: - self.release() class SynchronizedString(SynchronizedArray): diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index 336e479..4d76951 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -91,7 +91,7 @@ def get_command_line(**kwds): def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None): ''' - Run code specifed by data received over pipe + Run code specified by data received over pipe ''' assert is_forking(sys.argv) if sys.platform == 'win32': diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index dea1cbd..d4bdf0e 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -134,7 +134,7 @@ class Semaphore(SemLock): value = self._semlock._get_value() except Exception: value = 'unknown' - return '<Semaphore(value=%s)>' % value + return '<%s(value=%s)>' % (self.__class__.__name__, value) # # Bounded semaphore @@ -150,8 +150,8 @@ class BoundedSemaphore(Semaphore): value = self._semlock._get_value() except Exception: value = 'unknown' - return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \ - (value, self._semlock.maxvalue) + return '<%s(value=%s, maxvalue=%s)>' % \ + (self.__class__.__name__, value, self._semlock.maxvalue) # # Non-recursive lock @@ -176,7 +176,7 @@ class Lock(SemLock): name = 'SomeOtherProcess' except Exception: name = 'unknown' - return '<Lock(owner=%s)>' % name + return '<%s(owner=%s)>' % (self.__class__.__name__, name) # # Recursive lock @@ -202,7 +202,7 @@ class RLock(SemLock): name, count = 'SomeOtherProcess', 'nonzero' except Exception: name, count = 'unknown', 'unknown' - return '<RLock(%s, %s)>' % (name, count) + return '<%s(%s, %s)>' % (self.__class__.__name__, name, count) # # Condition variable @@ -243,7 +243,7 @@ class Condition(object): self._woken_count._semlock._get_value()) except Exception: num_waiters = 'unknown' - return '<Condition(%s, %s)>' % (self._lock, num_waiters) + return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters) def wait(self, timeout=None): assert self._lock._semlock._is_mine(), \ @@ -337,34 +337,24 @@ class Event(object): self._flag = ctx.Semaphore(0) def is_set(self): - self._cond.acquire() - try: + with self._cond: if self._flag.acquire(False): self._flag.release() return True return False - finally: - self._cond.release() def set(self): - self._cond.acquire() - try: + with self._cond: self._flag.acquire(False) self._flag.release() self._cond.notify_all() - finally: - self._cond.release() def clear(self): - self._cond.acquire() - try: + with self._cond: self._flag.acquire(False) - finally: - self._cond.release() def wait(self, timeout=None): - self._cond.acquire() - try: + with self._cond: if self._flag.acquire(False): self._flag.release() else: @@ -374,8 +364,6 @@ class Event(object): self._flag.release() return True return False - finally: - self._cond.release() # # Barrier diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 0b695e4..1a2c0db 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -9,6 +9,7 @@ import os import itertools +import sys import weakref import atexit import threading # we want threading to install it's @@ -212,10 +213,11 @@ class Finalize(object): obj = None if obj is None: - return '<Finalize object, dead>' + return '<%s object, dead>' % self.__class__.__name__ - x = '<Finalize object, callback=%s' % \ - getattr(self._callback, '__name__', self._callback) + x = '<%s object, callback=%s' % ( + self.__class__.__name__, + getattr(self._callback, '__name__', self._callback)) if self._args: x += ', args=' + str(self._args) if self._kwargs: @@ -327,6 +329,13 @@ class ForkAwareThreadLock(object): self.acquire = self._lock.acquire self.release = self._lock.release + def __enter__(self): + return self._lock.__enter__() + + def __exit__(self, *args): + return self._lock.__exit__(*args) + + class ForkAwareLocal(threading.local): def __init__(self): register_after_fork(self, lambda obj : obj.__dict__.clear()) @@ -348,6 +357,28 @@ def close_all_fds_except(fds): assert fds[-1] == MAXFD, 'fd too large' for i in range(len(fds) - 1): os.closerange(fds[i]+1, fds[i+1]) +# +# Close sys.stdin and replace stdin with os.devnull +# + +def _close_stdin(): + if sys.stdin is None: + return + + try: + sys.stdin.close() + except (OSError, ValueError): + pass + + try: + fd = os.open(os.devnull, os.O_RDONLY) + try: + sys.stdin = open(fd, closefd=False) + except: + os.close(fd) + raise + except (OSError, ValueError): + pass # # Start a program with only specified fds kept open |