diff options
author | Jesse Noller <jnoller@gmail.com> | 2009-11-21 14:28:39 (GMT) |
---|---|---|
committer | Jesse Noller <jnoller@gmail.com> | 2009-11-21 14:28:39 (GMT) |
commit | dd31d11199ee95c77291b6a075241dd8c2557700 (patch) | |
tree | c9e4d9438ab8aace6d0d35e1a044d0bc1f460de4 /Doc/includes | |
parent | db149ac59b9384648099a971750b8c3151fbfec7 (diff) | |
download | cpython-dd31d11199ee95c77291b6a075241dd8c2557700.zip cpython-dd31d11199ee95c77291b6a075241dd8c2557700.tar.gz cpython-dd31d11199ee95c77291b6a075241dd8c2557700.tar.bz2 |
Merged revisions 76435 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/branches/py3k
................
r76435 | jesse.noller | 2009-11-21 09:20:14 -0500 (Sat, 21 Nov 2009) | 9 lines
Merged revisions 76433 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r76433 | jesse.noller | 2009-11-21 09:01:56 -0500 (Sat, 21 Nov 2009) | 1 line
issue5738: The distribution example was confusing, and out of date. It's too large to include inline in the docs as well. It belongs in an addons module outside the stdlib. Removing.
........
................
Diffstat (limited to 'Doc/includes')
-rw-r--r-- | Doc/includes/mp_distributing.py | 364 |
1 files changed, 0 insertions, 364 deletions
diff --git a/Doc/includes/mp_distributing.py b/Doc/includes/mp_distributing.py deleted file mode 100644 index 9a88825..0000000 --- a/Doc/includes/mp_distributing.py +++ /dev/null @@ -1,364 +0,0 @@ -# -# Module to allow spawning of processes on foreign host -# -# Depends on `multiprocessing` package -- tested with `processing-0.60` -# -# Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# - -__all__ = ['Cluster', 'Host', 'get_logger', 'current_process'] - -# -# Imports -# - -import sys -import os -import tarfile -import shutil -import subprocess -import logging -import itertools -import queue - -try: - import pickle as pickle -except ImportError: - import pickle - -from multiprocessing import Process, current_process, cpu_count -from multiprocessing import util, managers, connection, forking, pool - -# -# Logging -# - -def get_logger(): - return _logger - -_logger = logging.getLogger('distributing') -_logger.propagate = 0 - -_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT) -_handler = logging.StreamHandler() -_handler.setFormatter(_formatter) -_logger.addHandler(_handler) - -info = _logger.info -debug = _logger.debug - -# -# Get number of cpus -# - -try: - slot_count = cpu_count() -except NotImplemented: - slot_count = 1 - -# -# Manager type which spawns subprocesses -# - -class HostManager(managers.SyncManager): - ''' - Manager type used for spawning processes on a (presumably) foreign host - ''' - def __init__(self, address, authkey): - managers.SyncManager.__init__(self, address, authkey) - self._name = 'Host-unknown' - - def Process(self, group=None, target=None, name=None, args=(), kwargs={}): - if hasattr(sys.modules['__main__'], '__file__'): - main_path = os.path.basename(sys.modules['__main__'].__file__) - else: - main_path = None - data = pickle.dumps((target, args, kwargs)) - p = self._RemoteProcess(data, main_path) - if name is None: - temp = self._name.split('Host-')[-1] + '/Process-%s' - name = temp % ':'.join(map(str, p.get_identity())) - p.set_name(name) - return p - - @classmethod - def from_address(cls, address, authkey): - manager = cls(address, authkey) - managers.transact(address, authkey, 'dummy') - manager._state.value = managers.State.STARTED - manager._name = 'Host-%s:%s' % manager.address - manager.shutdown = util.Finalize( - manager, HostManager._finalize_host, - args=(manager._address, manager._authkey, manager._name), - exitpriority=-10 - ) - return manager - - @staticmethod - def _finalize_host(address, authkey, name): - managers.transact(address, authkey, 'shutdown') - - def __repr__(self): - return '<Host(%s)>' % self._name - -# -# Process subclass representing a process on (possibly) a remote machine -# - -class RemoteProcess(Process): - ''' - Represents a process started on a remote host - ''' - def __init__(self, data, main_path): - assert not main_path or os.path.basename(main_path) == main_path - Process.__init__(self) - self._data = data - self._main_path = main_path - - def _bootstrap(self): - forking.prepare({'main_path': self._main_path}) - self._target, self._args, self._kwargs = pickle.loads(self._data) - return Process._bootstrap(self) - - def get_identity(self): - return self._identity - -HostManager.register('_RemoteProcess', RemoteProcess) - -# -# A Pool class that uses a cluster -# - -class DistributedPool(pool.Pool): - - def __init__(self, cluster, processes=None, initializer=None, initargs=()): - self._cluster = cluster - self.Process = cluster.Process - pool.Pool.__init__(self, processes or len(cluster), - initializer, initargs) - - def _setup_queues(self): - self._inqueue = self._cluster._SettableQueue() - self._outqueue = self._cluster._SettableQueue() - self._quick_put = self._inqueue.put - self._quick_get = self._outqueue.get - - @staticmethod - def _help_stuff_finish(inqueue, task_handler, size): - inqueue.set_contents([None] * size) - -# -# Manager type which starts host managers on other machines -# - -def LocalProcess(**kwds): - p = Process(**kwds) - p.set_name('localhost/' + p.name) - return p - -class Cluster(managers.SyncManager): - ''' - Represents collection of slots running on various hosts. - - `Cluster` is a subclass of `SyncManager` so it allows creation of - various types of shared objects. - ''' - def __init__(self, hostlist, modules): - managers.SyncManager.__init__(self, address=('localhost', 0)) - self._hostlist = hostlist - self._modules = modules - if __name__ not in modules: - modules.append(__name__) - files = [sys.modules[name].__file__ for name in modules] - for i, file in enumerate(files): - if file.endswith('.pyc') or file.endswith('.pyo'): - files[i] = file[:-4] + '.py' - self._files = [os.path.abspath(file) for file in files] - - def start(self): - managers.SyncManager.start(self) - - l = connection.Listener(family='AF_INET', authkey=self._authkey) - - for i, host in enumerate(self._hostlist): - host._start_manager(i, self._authkey, l.address, self._files) - - for host in self._hostlist: - if host.hostname != 'localhost': - conn = l.accept() - i, address, cpus = conn.recv() - conn.close() - other_host = self._hostlist[i] - other_host.manager = HostManager.from_address(address, - self._authkey) - other_host.slots = other_host.slots or cpus - other_host.Process = other_host.manager.Process - else: - host.slots = host.slots or slot_count - host.Process = LocalProcess - - self._slotlist = [ - Slot(host) for host in self._hostlist for i in range(host.slots) - ] - self._slot_iterator = itertools.cycle(self._slotlist) - self._base_shutdown = self.shutdown - del self.shutdown - - def shutdown(self): - for host in self._hostlist: - if host.hostname != 'localhost': - host.manager.shutdown() - self._base_shutdown() - - def Process(self, group=None, target=None, name=None, args=(), kwargs={}): - slot = next(self._slot_iterator) - return slot.Process( - group=group, target=target, name=name, args=args, kwargs=kwargs - ) - - def Pool(self, processes=None, initializer=None, initargs=()): - return DistributedPool(self, processes, initializer, initargs) - - def __getitem__(self, i): - return self._slotlist[i] - - def __len__(self): - return len(self._slotlist) - - def __iter__(self): - return iter(self._slotlist) - -# -# Queue subclass used by distributed pool -# - -class SettableQueue(queue.Queue): - def empty(self): - return not self.queue - def full(self): - return self.maxsize > 0 and len(self.queue) == self.maxsize - def set_contents(self, contents): - # length of contents must be at least as large as the number of - # threads which have potentially called get() - self.not_empty.acquire() - try: - self.queue.clear() - self.queue.extend(contents) - self.not_empty.notifyAll() - finally: - self.not_empty.release() - -Cluster.register('_SettableQueue', SettableQueue) - -# -# Class representing a notional cpu in the cluster -# - -class Slot(object): - def __init__(self, host): - self.host = host - self.Process = host.Process - -# -# Host -# - -class Host(object): - ''' - Represents a host to use as a node in a cluster. - - `hostname` gives the name of the host. If hostname is not - "localhost" then ssh is used to log in to the host. To log in as - a different user use a host name of the form - "username@somewhere.org" - - `slots` is used to specify the number of slots for processes on - the host. This affects how often processes will be allocated to - this host. Normally this should be equal to the number of cpus on - that host. - ''' - def __init__(self, hostname, slots=None): - self.hostname = hostname - self.slots = slots - - def _start_manager(self, index, authkey, address, files): - if self.hostname != 'localhost': - tempdir = copy_to_remote_temporary_directory(self.hostname, files) - debug('startup files copied to %s:%s', self.hostname, tempdir) - p = subprocess.Popen( - ['ssh', self.hostname, 'python', '-c', - '"import os; os.chdir(%r); ' - 'from distributing import main; main()"' % tempdir], - stdin=subprocess.PIPE - ) - data = dict( - name='BoostrappingHost', index=index, - dist_log_level=_logger.getEffectiveLevel(), - dir=tempdir, authkey=str(authkey), parent_address=address - ) - pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL) - p.stdin.close() - -# -# Copy files to remote directory, returning name of directory -# - -unzip_code = '''" -import tempfile, os, sys, tarfile -tempdir = tempfile.mkdtemp(prefix='distrib-') -os.chdir(tempdir) -tf = tarfile.open(fileobj=sys.stdin, mode='r|gz') -for ti in tf: - tf.extract(ti) -print tempdir -"''' - -def copy_to_remote_temporary_directory(host, files): - p = subprocess.Popen( - ['ssh', host, 'python', '-c', unzip_code], - stdout=subprocess.PIPE, stdin=subprocess.PIPE - ) - tf = tarfile.open(fileobj=p.stdin, mode='w|gz') - for name in files: - tf.add(name, os.path.basename(name)) - tf.close() - p.stdin.close() - return p.stdout.read().rstrip() - -# -# Code which runs a host manager -# - -def main(): - # get data from parent over stdin - data = pickle.load(sys.stdin) - sys.stdin.close() - - # set some stuff - _logger.setLevel(data['dist_log_level']) - forking.prepare(data) - - # create server for a `HostManager` object - server = managers.Server(HostManager._registry, ('', 0), data['authkey']) - current_process()._server = server - - # report server address and number of cpus back to parent - conn = connection.Client(data['parent_address'], authkey=data['authkey']) - conn.send((data['index'], server.address, slot_count)) - conn.close() - - # set name etc - current_process().set_name('Host-%s:%s' % server.address) - util._run_after_forkers() - - # register a cleanup function - def cleanup(directory): - debug('removing directory %s', directory) - shutil.rmtree(directory) - debug('shutting down host manager') - util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0) - - # start host manager - debug('remote host manager starting in %s', data['dir']) - server.serve_forever() |