diff options
-rw-r--r-- | Lib/multiprocessing/connection.py | 8 | ||||
-rw-r--r-- | Lib/multiprocessing/dummy/__init__.py | 5 | ||||
-rw-r--r-- | Lib/multiprocessing/forkserver.py | 12 | ||||
-rw-r--r-- | Lib/multiprocessing/heap.py | 15 | ||||
-rw-r--r-- | Lib/multiprocessing/managers.py | 54 | ||||
-rw-r--r-- | Lib/multiprocessing/pool.py | 28 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_fork.py | 2 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 6 | ||||
-rw-r--r-- | Lib/multiprocessing/reduction.py | 5 | ||||
-rw-r--r-- | Lib/multiprocessing/resource_sharer.py | 2 | ||||
-rw-r--r-- | Lib/multiprocessing/semaphore_tracker.py | 3 | ||||
-rw-r--r-- | Lib/multiprocessing/spawn.py | 2 | ||||
-rw-r--r-- | Lib/multiprocessing/synchronize.py | 7 | ||||
-rw-r--r-- | Lib/multiprocessing/util.py | 9 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2017-08-18-17-16-38.bpo-5001.gwnthq.rst | 1 |
15 files changed, 120 insertions, 39 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index ba9b17c..7a621a5 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -720,7 +720,9 @@ FAILURE = b'#FAILURE#' def deliver_challenge(connection, authkey): import hmac - assert isinstance(authkey, bytes) + if not isinstance(authkey, bytes): + raise ValueError( + "Authkey must be bytes, not {0!s}".format(type(authkey))) message = os.urandom(MESSAGE_LENGTH) connection.send_bytes(CHALLENGE + message) digest = hmac.new(authkey, message, 'md5').digest() @@ -733,7 +735,9 @@ def deliver_challenge(connection, authkey): def answer_challenge(connection, authkey): import hmac - assert isinstance(authkey, bytes) + if not isinstance(authkey, bytes): + raise ValueError( + "Authkey must be bytes, not {0!s}".format(type(authkey))) message = connection.recv_bytes(256) # reject large message assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message message = message[len(CHALLENGE):] diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py index cbb7f49..403f5e5 100644 --- a/Lib/multiprocessing/dummy/__init__.py +++ b/Lib/multiprocessing/dummy/__init__.py @@ -41,7 +41,10 @@ class DummyProcess(threading.Thread): self._parent = current_process() def start(self): - assert self._parent is current_process() + if self._parent is not current_process(): + raise RuntimeError( + "Parent is {0!r} but current_process is {1!r}".format( + self._parent, current_process())) self._start_called = True if hasattr(self._parent, '_children'): self._parent._children[self] = None diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index 69b842a..7a952e2 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -189,7 +189,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): if alive_r in rfds: # EOF because no more client processes left - assert os.read(alive_r, 1) == b'' + assert os.read(alive_r, 1) == b'', "Not at EOF?" raise SystemExit if sig_r in rfds: @@ -208,7 +208,10 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): if os.WIFSIGNALED(sts): returncode = -os.WTERMSIG(sts) else: - assert os.WIFEXITED(sts) + if not os.WIFEXITED(sts): + raise AssertionError( + "Child {0:n} status is {1:n}".format( + pid,sts)) returncode = os.WEXITSTATUS(sts) # Send exit code to client process try: @@ -227,7 +230,10 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): with listener.accept()[0] as s: # Receive fds from client fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1) - assert len(fds) <= MAXFDS_TO_SEND + if len(fds) > MAXFDS_TO_SEND: + raise RuntimeError( + "Too many ({0:n}) fds to send".format( + len(fds))) child_r, child_w, *fds = fds s.close() pid = os.fork() diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index ee3ed55..566173a 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -211,7 +211,10 @@ class Heap(object): # synchronously sometimes later from malloc() or free(), by calling # _free_pending_blocks() (appending and retrieving from a list is not # strictly thread-safe but under cPython it's atomic thanks to the GIL). - assert os.getpid() == self._lastpid + if os.getpid() != self._lastpid: + raise ValueError( + "My pid ({0:n}) is not last pid {1:n}".format( + os.getpid(),self._lastpid)) if not self._lock.acquire(False): # can't acquire the lock right now, add the block to the list of # pending blocks to free @@ -227,7 +230,10 @@ class Heap(object): def malloc(self, size): # return a block of right size (possibly rounded up) - assert 0 <= size < sys.maxsize + if size < 0: + raise ValueError("Size {0:n} out of range".format(size)) + if sys.maxsize <= size: + raise OverflowError("Size {0:n} too large".format(size)) if os.getpid() != self._lastpid: self.__init__() # reinitialize after fork with self._lock: @@ -250,7 +256,10 @@ class BufferWrapper(object): _heap = Heap() def __init__(self, size): - assert 0 <= size < sys.maxsize + if size < 0: + raise ValueError("Size {0:n} out of range".format(size)) + if sys.maxsize <= size: + raise OverflowError("Size {0:n} too large".format(size)) block = BufferWrapper._heap.malloc(size) self._state = (block, size) util.Finalize(self, BufferWrapper._heap.free, args=(block,)) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index c672277..04df26b 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -23,7 +23,7 @@ from time import time as _time from traceback import format_exc from . import connection -from .context import reduction, get_spawning_popen +from .context import reduction, get_spawning_popen, ProcessError from . import pool from . import process from . import util @@ -133,7 +133,10 @@ class Server(object): 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] def __init__(self, registry, address, authkey, serializer): - assert isinstance(authkey, bytes) + if not isinstance(authkey, bytes): + raise TypeError( + "Authkey {0!r} is type {1!s}, not bytes".format( + authkey, type(authkey))) self.registry = registry self.authkey = process.AuthenticationString(authkey) Listener, Client = listener_client[serializer] @@ -163,7 +166,7 @@ class Server(object): except (KeyboardInterrupt, SystemExit): pass finally: - if sys.stdout != sys.__stdout__: + if sys.stdout != sys.__stdout__: # what about stderr? util.debug('resetting stdout, stderr') sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ @@ -316,6 +319,7 @@ class Server(object): ''' Return some info --- useful to spot problems with refcounting ''' + # Perhaps include debug info about 'c'? with self.mutex: result = [] keys = list(self.id_to_refcount.keys()) @@ -356,7 +360,9 @@ class Server(object): self.registry[typeid] if callable is None: - assert len(args) == 1 and not kwds + if kwds or (len(args) != 1): + raise ValueError( + "Without callable, must have one non-keyword argument") obj = args[0] else: obj = callable(*args, **kwds) @@ -364,7 +370,10 @@ class Server(object): if exposed is None: exposed = public_methods(obj) if method_to_typeid is not None: - assert type(method_to_typeid) is dict + if not isinstance(method_to_typeid, dict): + raise TypeError( + "Method_to_typeid {0!r}: type {1!s}, not dict".format( + method_to_typeid, type(method_to_typeid))) exposed = list(exposed) + list(method_to_typeid) ident = '%x' % id(obj) # convert to string because xmlrpclib @@ -417,7 +426,11 @@ class Server(object): return with self.mutex: - assert self.id_to_refcount[ident] >= 1 + if self.id_to_refcount[ident] <= 0: + raise AssertionError( + "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format( + ident, self.id_to_obj[ident], + self.id_to_refcount[ident])) self.id_to_refcount[ident] -= 1 if self.id_to_refcount[ident] == 0: del self.id_to_refcount[ident] @@ -480,7 +493,14 @@ class BaseManager(object): ''' Return server object with serve_forever() method and address attribute ''' - assert self._state.value == State.INITIAL + if self._state.value != State.INITIAL: + if self._state.value == State.STARTED: + raise ProcessError("Already started server") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("Manager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) return Server(self._registry, self._address, self._authkey, self._serializer) @@ -497,7 +517,14 @@ class BaseManager(object): ''' Spawn a server process for this manager object ''' - assert self._state.value == State.INITIAL + if self._state.value != State.INITIAL: + if self._state.value == State.STARTED: + raise ProcessError("Already started server") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("Manager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) if initializer is not None and not callable(initializer): raise TypeError('initializer must be a callable') @@ -593,7 +620,14 @@ class BaseManager(object): def __enter__(self): if self._state.value == State.INITIAL: self.start() - assert self._state.value == State.STARTED + if self._state.value != State.STARTED: + if self._state.value == State.INITIAL: + raise ProcessError("Unable to start server") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("Manager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) return self def __exit__(self, exc_type, exc_val, exc_tb): @@ -653,7 +687,7 @@ class BaseManager(object): getattr(proxytype, '_method_to_typeid_', None) if method_to_typeid: - for key, value in list(method_to_typeid.items()): + for key, value in list(method_to_typeid.items()): # isinstance? assert type(key) is str, '%r is not a string' % key assert type(value) is str, '%r is not a string' % value diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index c2364ab..e457f0a 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -92,7 +92,9 @@ class MaybeEncodingError(Exception): def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, wrap_exception=False): - assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) + if (maxtasks is not None) and not (isinstance(maxtasks, int) + and maxtasks >= 1): + raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks)) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): @@ -254,8 +256,8 @@ class Pool(object): def apply(self, func, args=(), kwds={}): ''' Equivalent of `func(*args, **kwds)`. + Pool must be running. ''' - assert self._state == RUN return self.apply_async(func, args, kwds).get() def map(self, func, iterable, chunksize=None): @@ -307,6 +309,10 @@ class Pool(object): )) return result else: + if chunksize < 1: + raise ValueError( + "Chunksize must be 1+, not {0:n}".format( + chunksize)) assert chunksize > 1 task_batches = Pool._get_tasks(func, iterable, chunksize) result = IMapIterator(self._cache) @@ -334,7 +340,9 @@ class Pool(object): )) return result else: - assert chunksize > 1 + if chunksize < 1: + raise ValueError( + "Chunksize must be 1+, not {0!r}".format(chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) result = IMapUnorderedIterator(self._cache) self._taskqueue.put( @@ -466,7 +474,7 @@ class Pool(object): return if thread._state: - assert thread._state == TERMINATE + assert thread._state == TERMINATE, "Thread not in TERMINATE" util.debug('result handler found thread._state=TERMINATE') break @@ -542,7 +550,10 @@ class Pool(object): def join(self): util.debug('joining pool') - assert self._state in (CLOSE, TERMINATE) + if self._state == RUN: + raise ValueError("Pool is still running") + elif self._state not in (CLOSE, TERMINATE): + raise ValueError("In unknown state") self._worker_handler.join() self._task_handler.join() self._result_handler.join() @@ -570,7 +581,9 @@ class Pool(object): util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) - assert result_handler.is_alive() or len(cache) == 0 + if (not result_handler.is_alive()) and (len(cache) != 0): + raise AssertionError( + "Cannot have cache with result_hander not alive") result_handler._state = TERMINATE outqueue.put(None) # sentinel @@ -628,7 +641,8 @@ class ApplyResult(object): return self._event.is_set() def successful(self): - assert self.ready() + if not self.ready(): + raise ValueError("{0!r} not ready".format(self)) return self._success def wait(self, timeout=None): diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py index 44ce9a9..cbdbfa7 100644 --- a/Lib/multiprocessing/popen_fork.py +++ b/Lib/multiprocessing/popen_fork.py @@ -35,7 +35,7 @@ class Popen(object): if os.WIFSIGNALED(sts): self.returncode = -os.WTERMSIG(sts) else: - assert os.WIFEXITED(sts) + assert os.WIFEXITED(sts), "Status is {:n}".format(sts) self.returncode = os.WEXITSTATUS(sts) return self.returncode diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 513807c..328efbd 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -78,7 +78,7 @@ class Queue(object): self._poll = self._reader.poll def put(self, obj, block=True, timeout=None): - assert not self._closed + assert not self._closed, "Queue {0!r} has been closed".format(self) if not self._sem.acquire(block, timeout): raise Full @@ -140,7 +140,7 @@ class Queue(object): def join_thread(self): debug('Queue.join_thread()') - assert self._closed + assert self._closed, "Queue {0!r} not closed".format(self) if self._jointhread: self._jointhread() @@ -281,7 +281,7 @@ class JoinableQueue(Queue): self._cond, self._unfinished_tasks = state[-2:] def put(self, obj, block=True, timeout=None): - assert not self._closed + assert not self._closed, "Queue {0!r} is closed".format(self) if not self._sem.acquire(block, timeout): raise Full diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 7f65947..deca19c 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -165,7 +165,10 @@ else: if len(cmsg_data) % a.itemsize != 0: raise ValueError a.frombytes(cmsg_data) - assert len(a) % 256 == msg[0] + if len(a) % 256 != msg[0]: + raise AssertionError( + "Len is {0:n} but msg[0] is {1!r}".format( + len(a), msg[0])) return list(a) except (ValueError, IndexError): pass diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py index e44a728..6d99da1 100644 --- a/Lib/multiprocessing/resource_sharer.py +++ b/Lib/multiprocessing/resource_sharer.py @@ -125,7 +125,7 @@ class _ResourceSharer(object): def _start(self): from .connection import Listener - assert self._listener is None + assert self._listener is None, "Already have Listener" util.debug('starting listener and thread for sending handles') self._listener = Listener(authkey=process.current_process().authkey) self._address = self._listener.address diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index de7738e..d5f259c 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -80,7 +80,8 @@ class SemaphoreTracker(object): # bytes are atomic, and that PIPE_BUF >= 512 raise ValueError('name too long') nbytes = os.write(self._fd, msg) - assert nbytes == len(msg) + assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( + nbytes, len(msg)) _semaphore_tracker = SemaphoreTracker() diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index 4aba372..1f4f3f4 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -93,7 +93,7 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None): ''' Run code specified by data received over pipe ''' - assert is_forking(sys.argv) + assert is_forking(sys.argv), "Not forking" if sys.platform == 'win32': import msvcrt new_handle = reduction.steal_handle(parent_pid, pipe_handle) diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 0590ed6..038f73f 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -270,13 +270,16 @@ class Condition(object): def notify(self, n=1): assert self._lock._semlock._is_mine(), 'lock is not owned' - assert not self._wait_semaphore.acquire(False) + assert not self._wait_semaphore.acquire( + False), ('notify: Should not have been able to acquire' + + '_wait_semaphore') # to take account of timeouts since last notify*() we subtract # woken_count from sleeping_count and rezero woken_count while self._woken_count.acquire(False): res = self._sleeping_count.acquire(False) - assert res + assert res, ('notify: Bug in sleeping_count.acquire' + + '- res should not be False') sleepers = 0 while sleepers < n and self._sleeping_count.acquire(False): diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index b490caa..f0827f0 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -149,12 +149,15 @@ class Finalize(object): Class which supports object finalization using weakrefs ''' def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): - assert exitpriority is None or type(exitpriority) is int + if (exitpriority is not None) and not isinstance(exitpriority,int): + raise TypeError( + "Exitpriority ({0!r}) must be None or int, not {1!s}".format( + exitpriority, type(exitpriority))) if obj is not None: self._weakref = weakref.ref(obj, self) - else: - assert exitpriority is not None + elif exitpriority is None: + raise ValueError("Without object, exitpriority cannot be None") self._callback = callback self._args = args diff --git a/Misc/NEWS.d/next/Library/2017-08-18-17-16-38.bpo-5001.gwnthq.rst b/Misc/NEWS.d/next/Library/2017-08-18-17-16-38.bpo-5001.gwnthq.rst new file mode 100644 index 0000000..640b801 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-08-18-17-16-38.bpo-5001.gwnthq.rst @@ -0,0 +1 @@ +Many asserts in `multiprocessing` are now more informative, and some error types have been changed to more specific ones. |