From 58ea9fedc825a91a3b153898afade19512bbde85 Mon Sep 17 00:00:00 2001
From: Benjamin Peterson
Date: Tue, 19 Aug 2008 19:17:39 +0000
Subject: Merged revisions 65864 via svnmerge from
 svn+ssh://pythondev@svn.python.org/python/trunk

........
  r65864 | jesse.noller | 2008-08-19 14:06:19 -0500 (Tue, 19 Aug 2008) | 2 lines

  issue3352: clean up the multiprocessing API to remove many get_/set_ methods and
  convert them to properties. Update the docs and the examples included.
........
---
 Doc/includes/mp_distributing.py       | 12 ++++----
 Doc/includes/mp_pool.py               |  2 +-
 Doc/includes/mp_synchronize.py        |  2 +-
 Doc/includes/mp_webserver.py          |  2 +-
 Doc/includes/mp_workers.py            |  2 +-
 Doc/library/multiprocessing.rst       | 20 ++++++------
 Lib/multiprocessing/dummy/__init__.py |  3 +-
 Lib/multiprocessing/forking.py        |  4 +--
 Lib/multiprocessing/managers.py       | 10 +++---
 Lib/multiprocessing/pool.py           |  2 +-
 Lib/multiprocessing/process.py        | 39 ++++++++++++-----------
 Lib/multiprocessing/reduction.py      |  6 ++--
 Lib/multiprocessing/synchronize.py    | 12 ++++----
 Lib/multiprocessing/util.py           |  4 +--
 Lib/test/test_multiprocessing.py      | 58 ++++++++++++++++-------------------
 15 files changed, 86 insertions(+), 92 deletions(-)

diff --git a/Doc/includes/mp_distributing.py b/Doc/includes/mp_distributing.py
index 5cd12bb..7acefb8 100644
--- a/Doc/includes/mp_distributing.py
+++ b/Doc/includes/mp_distributing.py
@@ -17,10 +17,10 @@ import shutil
 import subprocess
 import logging
 import itertools
-import Queue
+import queue
 
 try:
-    import cPickle as pickle
+    import pickle as pickle
 except ImportError:
     import pickle
 
@@ -152,7 +152,7 @@ class DistributedPool(pool.Pool):
 
 def LocalProcess(**kwds):
     p = Process(**kwds)
-    p.set_name('localhost/' + p.get_name())
+    p.name = 'localhost/' + p.name
     return p
 
 class Cluster(managers.SyncManager):
@@ -210,7 +210,7 @@ class Cluster(managers.SyncManager):
         self._base_shutdown()
 
     def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
-        slot = self._slot_iterator.next()
+        slot = next(self._slot_iterator)
         return slot.Process(
             group=group, target=target, name=name, args=args, kwargs=kwargs
             )
@@ -231,7 +231,7 @@ class Cluster(managers.SyncManager):
 # Queue subclass used by distributed pool
 #
 
-class SettableQueue(Queue.Queue):
+class SettableQueue(queue.Queue):
     def empty(self):
         return not self.queue
     def full(self):
@@ -243,7 +243,7 @@ class SettableQueue(Queue.Queue):
         try:
             self.queue.clear()
             self.queue.extend(contents)
-            self.not_empty.notify_all()
+            self.not_empty.notifyAll()
         finally:
             self.not_empty.release()
 
diff --git a/Doc/includes/mp_pool.py b/Doc/includes/mp_pool.py
index b937b86..e7aaaac 100644
--- a/Doc/includes/mp_pool.py
+++ b/Doc/includes/mp_pool.py
@@ -14,7 +14,7 @@ import sys
 def calculate(func, args):
     result = func(*args)
     return '%s says that %s%s = %s' % (
-        multiprocessing.current_process().get_name(),
+        multiprocessing.current_process().name,
         func.__name__, args, result
         )
 
diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py
index 8cf11bd..ddcd338 100644
--- a/Doc/includes/mp_synchronize.py
+++ b/Doc/includes/mp_synchronize.py
@@ -224,7 +224,7 @@ def test_sharedvalues():
         p.start()
         p.join()
 
-    assert p.get_exitcode() == 0
+    assert p.exitcode == 0
 
 
 ####
diff --git a/Doc/includes/mp_webserver.py b/Doc/includes/mp_webserver.py
index 15d2b6b..4943f5d 100644
--- a/Doc/includes/mp_webserver.py
+++ b/Doc/includes/mp_webserver.py
@@ -21,7 +21,7 @@ if sys.platform == 'win32':
 
 
 def note(format, *args):
-    sys.stderr.write('[%s]\t%s\n' % (current_process().get_name(),format%args))
+    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))
 
 
 class RequestHandler(SimpleHTTPRequestHandler):
diff --git a/Doc/includes/mp_workers.py b/Doc/includes/mp_workers.py
index 795e6cb..07e4cdd 100644
--- a/Doc/includes/mp_workers.py
+++ b/Doc/includes/mp_workers.py
@@ -29,7 +29,7 @@ def worker(input, output):
 def calculate(func, args):
     result = func(*args)
     return '%s says that %s%s = %s' % \
-        (current_process().get_name(), func.__name__, args, result)
+        (current_process().name, func.__name__, args, result)
 
 #
 # Functions referenced by tasks
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index 10ccb17..4bbd94c 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -290,11 +290,11 @@ The :mod:`multiprocessing` package mostly replicates the API of the
       A process cannot join itself because this would cause a deadlock.  It is
      an error to attempt to join a process before it has been started.
 
-   .. method:: get_name()
+   .. attribute:: Process.name
 
      Return the process's name.
 
-   .. method:: set_name(name)
+   .. attribute:: Process.name = name
 
      Set the process's name.
 
@@ -309,11 +309,11 @@ The :mod:`multiprocessing` package mostly replicates the API of the
      Roughly, a process object is alive from the moment the :meth:`start`
      method returns until the child process terminates.
 
-   .. method:: is_daemon()
+   .. attribute:: Process.daemon
 
-      Return the process's daemon flag.
+      Return the process's daemon flag, a Boolean value.
 
-   .. method:: set_daemon(daemonic)
+   .. attribute:: Process.daemon = daemonic
 
      Set the process's daemon flag to the Boolean value *daemonic*.  This must
      be called before :meth:`start` is called.
@@ -329,18 +329,18 @@ The :mod:`multiprocessing` package mostly replicates the API of the
 
    In addition process objects also support the following methods:
 
-   .. method:: get_pid()
+   .. attribute:: Process.pid
 
      Return the process ID.  Before the process is spawned, this will be
      ``None``.
 
-   .. method:: get_exit_code()
+   .. attribute:: Process.exitcode
 
      Return the child's exit code.  This will be ``None`` if the process has
     not yet terminated.  A negative value *-N* indicates that the child was
     terminated by signal *N*.
 
-   .. method:: get_auth_key()
+   .. attribute:: Process.authkey
 
      Return the process's authentication key (a byte string).
 
@@ -349,11 +349,11 @@ The :mod:`multiprocessing` package mostly replicates the API of the
 
      When a :class:`Process` object is created, it will inherit the
     authentication key of its parent process, although this may be changed
-      using :meth:`set_auth_key` below.
+      using :attr:`Process.authkey` below.
 
      See :ref:`multiprocessing-auth-keys`.
 
-   .. method:: set_auth_key(authkey)
+   .. attribute:: Process.authkey = authkey
 
      Set the process's authentication key which must be a byte string.
 
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index 48ca75b..da18877 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -47,7 +47,8 @@ class DummyProcess(threading.Thread):
             self._parent._children[self] = None
         threading.Thread.start(self)
 
-    def get_exitcode(self):
+    @property
+    def exitcode(self):
         if self._start_called and not self.is_alive():
             return 0
         else:
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index b14143b..47d54f2 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -315,7 +315,7 @@ else:
             sys_argv=sys.argv,
             log_to_stderr=_log_to_stderr,
             orig_dir=process.ORIGINAL_DIR,
-            authkey=process.current_process().get_authkey(),
+            authkey=process.current_process().authkey,
             )
 
         if _logger is not None:
@@ -363,7 +363,7 @@ def prepare(data):
         old_main_modules.append(sys.modules['__main__'])
 
     if 'name' in data:
-        process.current_process().set_name(data['name'])
+        process.current_process().name = data['name']
 
     if 'authkey' in data:
         process.current_process()._authkey = data['authkey']
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index d7558c7..d6b16e5 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -450,7 +450,7 @@ class BaseManager(object):
 
     def __init__(self, address=None, authkey=None, serializer='pickle'):
         if authkey is None:
-            authkey = current_process().get_authkey()
+            authkey = current_process().authkey
         self._address = address     # XXX not final address if eg ('', 0)
         self._authkey = AuthenticationString(authkey)
         self._state = State()
@@ -495,7 +495,7 @@ class BaseManager(object):
                 self._serializer, writer),
             )
         ident = ':'.join(str(i) for i in self._process._identity)
-        self._process.set_name(type(self).__name__ + '-' + ident)
+        self._process.name = type(self).__name__ + '-' + ident
         self._process.start()
 
         # get address of server
@@ -696,7 +696,7 @@ class BaseProxy(object):
         elif self._manager is not None:
             self._authkey = self._manager._authkey
         else:
-            self._authkey = current_process().get_authkey()
+            self._authkey = current_process().authkey
 
         if incref:
             self._incref()
@@ -705,7 +705,7 @@ class BaseProxy(object):
 
     def _connect(self):
         util.debug('making connection to manager')
-        name = current_process().get_name()
+        name = current_process().name
         if threading.current_thread().name != 'MainThread':
             name += '|' + threading.current_thread().name
         conn = self._Client(self._token.address, authkey=self._authkey)
@@ -886,7 +886,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None,
     if authkey is None and manager is not None:
         authkey = manager._authkey
     if authkey is None:
-        authkey = current_process().get_authkey()
+        authkey = current_process().authkey
 
     ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
     proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index cb0e49f..90fd178 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -99,7 +99,7 @@ class Pool(object):
                 args=(self._inqueue, self._outqueue, initializer, initargs)
                 )
             self._pool.append(w)
-            w.name = w.get_name().replace('Process', 'PoolWorker')
+            w.name = w.name.replace('Process', 'PoolWorker')
             w.daemon = True
             w.start()
 
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index 1f89dba..e21d2f0 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -132,45 +132,43 @@ class Process(object):
             self._popen.poll()
         return self._popen.returncode is None
 
-    def get_name(self):
-        '''
-        Return name of process
-        '''
+    @property
+    def name(self):
         return self._name
 
-    def set_name(self, name):
-        '''
-        Set name of process
-        '''
+    @name.setter
+    def name(self, name):
         assert isinstance(name, str), 'name must be a string'
         self._name = name
 
-    def is_daemon(self):
+    @property
+    def daemon(self):
         '''
         Return whether process is a daemon
         '''
         return self._daemonic
 
-    def set_daemon(self, daemonic):
+    @daemon.setter
+    def daemon(self, daemonic):
         '''
         Set whether process is a daemon
         '''
         assert self._popen is None, 'process has already started'
         self._daemonic = daemonic
 
-    def get_authkey(self):
-        '''
-        Return authorization key of process
-        '''
+    @property
+    def authkey(self):
         return self._authkey
 
-    def set_authkey(self, authkey):
+    @authkey.setter
+    def authkey(self, authkey):
         '''
         Set authorization key of process
         '''
         self._authkey = AuthenticationString(authkey)
 
-    def get_exitcode(self):
+    @property
+    def exitcode(self):
         '''
         Return exit code of process or `None` if it has yet to stop
         '''
@@ -178,7 +176,8 @@ class Process(object):
             return self._popen
         return self._popen.poll()
 
-    def get_ident(self):
+    @property
+    def ident(self):
         '''
         Return indentifier (PID) of process or `None` if it has yet to start
         '''
@@ -187,7 +186,7 @@ class Process(object):
         else:
             return self._popen and self._popen.pid
 
-    pid = property(get_ident)
+    pid = ident
 
     def __repr__(self):
         if self is _current_process:
@@ -198,7 +197,7 @@ class Process(object):
             status = 'initial'
         else:
             if self._popen.poll() is not None:
-                status = self.get_exitcode()
+                status = self.exitcode
             else:
                 status = 'started'
 
@@ -245,7 +244,7 @@ class Process(object):
         except:
             exitcode = 1
             import traceback
-            sys.stderr.write('Process %s:\n' % self.get_name())
+            sys.stderr.write('Process %s:\n' % self.name)
             sys.stderr.flush()
             traceback.print_exc()
 
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 194bb17..010d871 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -82,9 +82,9 @@ def _get_listener():
         try:
             if _listener is None:
                 debug('starting listener and thread for sending handles')
-                _listener = Listener(authkey=current_process().get_authkey())
+                _listener = Listener(authkey=current_process().authkey)
                 t = threading.Thread(target=_serve)
-                t.set_daemon(True)
+                t.daemon = True
                 t.start()
         finally:
             _lock.release()
@@ -127,7 +127,7 @@ def rebuild_handle(pickled_data):
     if inherited:
         return handle
     sub_debug('rebuilding handle %d', handle)
-    conn = Client(address, authkey=current_process().get_authkey())
+    conn = Client(address, authkey=current_process().authkey)
     conn.send((handle, os.getpid()))
     new_handle = recv_handle(conn)
     conn.close()
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 628792e..be56a5b 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -108,9 +108,9 @@ class Lock(SemLock):
     def __repr__(self):
         try:
             if self._semlock._is_mine():
-                name = current_process().get_name()
-                if threading.current_thread().get_name() != 'MainThread':
-                    name += '|' + threading.current_thread().get_name()
+                name = current_process().name
+                if threading.current_thread().name != 'MainThread':
+                    name += '|' + threading.current_thread().name
             elif self._semlock._get_value() == 1:
                 name = 'None'
             elif self._semlock._count() > 0:
@@ -133,9 +133,9 @@ class RLock(SemLock):
     def __repr__(self):
         try:
             if self._semlock._is_mine():
-                name = current_process().get_name()
-                if threading.current_thread().get_name() != 'MainThread':
-                    name += '|' + threading.current_thread().get_name()
+                name = current_process().name
+                if threading.current_thread().name != 'MainThread':
+                    name += '|' + threading.current_thread().name
                 count = self._semlock._count()
             elif self._semlock._get_value() == 1:
                 name, count = 'None', 0
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index 8aff4f4..aae38c7 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -274,11 +274,11 @@ def _exit_function():
 
     for p in active_children():
         if p._daemonic:
-            info('calling terminate() for daemon %s', p.get_name())
+            info('calling terminate() for daemon %s', p.name)
             p._popen.terminate()
 
     for p in active_children():
-        info('calling join() for process %s', p.get_name())
+        info('calling join() for process %s', p.name)
         p.join()
 
     debug('running the remaining "atexit" finalizers')
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 436cad8..102042d 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -120,22 +120,22 @@ class _TestProcess(BaseTestCase):
             return
 
         current = self.current_process()
-        authkey = current.get_authkey()
+        authkey = current.authkey
 
         self.assertTrue(current.is_alive())
-        self.assertTrue(not current.is_daemon())
+        self.assertTrue(not current.daemon)
         self.assertTrue(isinstance(authkey, bytes))
         self.assertTrue(len(authkey) > 0)
-        self.assertEqual(current.get_ident(), os.getpid())
-        self.assertEqual(current.get_exitcode(), None)
+        self.assertEqual(current.ident, os.getpid())
+        self.assertEqual(current.exitcode, None)
 
     def _test(self, q, *args, **kwds):
         current = self.current_process()
         q.put(args)
         q.put(kwds)
-        q.put(current.get_name())
+        q.put(current.name)
         if self.TYPE != 'threads':
-            q.put(bytes(current.get_authkey()))
+            q.put(bytes(current.authkey))
             q.put(current.pid)
 
     def test_process(self):
@@ -147,33 +147,33 @@ class _TestProcess(BaseTestCase):
         p = self.Process(
             target=self._test, args=args, kwargs=kwargs, name=name
             )
-        p.set_daemon(True)
+        p.daemon = True
         current = self.current_process()
 
         if self.TYPE != 'threads':
-            self.assertEquals(p.get_authkey(), current.get_authkey())
+            self.assertEquals(p.authkey, current.authkey)
         self.assertEquals(p.is_alive(), False)
-        self.assertEquals(p.is_daemon(), True)
+        self.assertEquals(p.daemon, True)
         self.assertTrue(p not in self.active_children())
         self.assertTrue(type(self.active_children()) is list)
-        self.assertEqual(p.get_exitcode(), None)
+        self.assertEqual(p.exitcode, None)
 
         p.start()
 
-        self.assertEquals(p.get_exitcode(), None)
+        self.assertEquals(p.exitcode, None)
         self.assertEquals(p.is_alive(), True)
         self.assertTrue(p in self.active_children())
 
         self.assertEquals(q.get(), args[1:])
         self.assertEquals(q.get(), kwargs)
-        self.assertEquals(q.get(), p.get_name())
+        self.assertEquals(q.get(), p.name)
         if self.TYPE != 'threads':
-            self.assertEquals(q.get(), current.get_authkey())
+            self.assertEquals(q.get(), current.authkey)
             self.assertEquals(q.get(), p.pid)
 
         p.join()
 
-        self.assertEquals(p.get_exitcode(), 0)
+        self.assertEquals(p.exitcode, 0)
         self.assertEquals(p.is_alive(), False)
         self.assertTrue(p not in self.active_children())
 
@@ -185,12 +185,12 @@ class _TestProcess(BaseTestCase):
             return
 
         p = self.Process(target=self._test_terminate)
-        p.set_daemon(True)
+        p.daemon = True
         p.start()
 
         self.assertEqual(p.is_alive(), True)
         self.assertTrue(p in self.active_children())
-        self.assertEqual(p.get_exitcode(), None)
+        self.assertEqual(p.exitcode, None)
 
         p.terminate()
 
@@ -203,8 +203,8 @@ class _TestProcess(BaseTestCase):
 
         p.join()
 
-        # XXX sometimes get p.get_exitcode() == 0 on Windows ...
-        #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)
+        # XXX sometimes get p.exitcode == 0 on Windows ...
+        #self.assertEqual(p.exitcode, -signal.SIGTERM)
 
     def test_cpu_count(self):
         try:
@@ -331,7 +331,7 @@ class _TestQueue(BaseTestCase):
             target=self._test_put,
             args=(queue, child_can_start, parent_can_continue)
             )
-        proc.set_daemon(True)
+        proc.daemon = True
         proc.start()
 
         self.assertEqual(queue_empty(queue), True)
@@ -397,7 +397,7 @@ class _TestQueue(BaseTestCase):
             target=self._test_get,
             args=(queue, child_can_start, parent_can_continue)
             )
-        proc.set_daemon(True)
+        proc.daemon = True
         proc.start()
 
         self.assertEqual(queue_empty(queue), True)
@@ -620,17 +620,11 @@ class _TestCondition(BaseTestCase):
         woken = self.Semaphore(0)
 
         p = self.Process(target=self.f, args=(cond, sleeping, woken))
-        try:
-            p.set_daemon(True)
-        except AttributeError:
-            p.daemon = True
+        p.daemon = True
         p.start()
 
         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
-        try:
-            p.set_daemon(True)
-        except AttributeError:
-            p.daemon = True
+        p.daemon = True
         p.start()
 
         # wait for both children to start sleeping
@@ -672,7 +666,7 @@ class _TestCondition(BaseTestCase):
         for i in range(3):
             p = self.Process(target=self.f,
                              args=(cond, sleeping, woken, TIMEOUT1))
-            p.set_daemon(True)
+            p.daemon = True
             p.start()
 
         t = threading.Thread(target=self.f,
@@ -695,7 +689,7 @@ class _TestCondition(BaseTestCase):
         # start some more threads/processes
         for i in range(3):
             p = self.Process(target=self.f, args=(cond, sleeping, woken))
-            p.set_daemon(True)
+            p.daemon = True
             p.start()
 
         t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
@@ -1192,7 +1186,7 @@ class _TestConnection(BaseTestCase):
         conn, child_conn = self.Pipe()
 
         p = self.Process(target=self._echo, args=(child_conn,))
-        p.set_daemon(True)
+        p.daemon = True
         p.start()
 
         seq = [1, 2.25, None]
@@ -1341,7 +1335,7 @@ class _TestListenerClient(BaseTestCase):
         for family in self.connection.families:
             l = self.connection.Listener(family=family)
             p = self.Process(target=self._test, args=(l.address,))
-            p.set_daemon(True)
+            p.daemon = True
             p.start()
             conn = l.accept()
             self.assertEqual(conn.recv(), 'hello')
-- 
cgit v0.12
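
The call-site effect of the property conversion above, as exercised throughout
Doc/includes/ and Lib/test/test_multiprocessing.py, is that plain attribute
access replaces the old accessor calls. The following is a minimal,
self-contained sketch of the post-patch usage; it is not an excerpt from the
patch, and the work() function is illustrative only:

    import multiprocessing

    def work():
        # Trivial child task; the child exits normally, so exitcode becomes 0.
        return None

    if __name__ == '__main__':
        p = multiprocessing.Process(target=work)
        p.daemon = True             # previously p.set_daemon(True); set before start()
        p.start()
        p.join()
        print(p.name, p.exitcode)   # previously p.get_name() / p.get_exitcode()

On the implementation side the same renaming is carried by @property and
@<name>.setter pairs, as shown in the Lib/multiprocessing/process.py hunk.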