summaryrefslogtreecommitdiffstats
path: root/Doc/includes
diff options
context:
space:
mode:
authorGeorg Brandl <georg@python.org>2009-01-03 21:55:17 (GMT)
committerGeorg Brandl <georg@python.org>2009-01-03 21:55:17 (GMT)
commit734373cc4dfe81da12674b2f5a801ff14a5ebfe3 (patch)
tree353daaf1d6004adc26631a22ae31ca655eb966c0 /Doc/includes
parent9b520efa8f4cebf9ecac41ccb529b9894c5cef95 (diff)
downloadcpython-734373cc4dfe81da12674b2f5a801ff14a5ebfe3.zip
cpython-734373cc4dfe81da12674b2f5a801ff14a5ebfe3.tar.gz
cpython-734373cc4dfe81da12674b2f5a801ff14a5ebfe3.tar.bz2
Merged revisions 68133-68134,68141-68142,68145-68146,68148-68149,68159-68162,68166,68171-68174,68179,68195-68196,68210,68214-68215,68217-68222 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk ........ r68133 | antoine.pitrou | 2009-01-01 16:38:03 +0100 (Thu, 01 Jan 2009) | 1 line fill in actual issue number in tests ........ r68134 | hirokazu.yamamoto | 2009-01-01 16:45:39 +0100 (Thu, 01 Jan 2009) | 2 lines Issue #4797: IOError.filename was not set when _fileio.FileIO failed to open file with `str' filename on Windows. ........ r68141 | benjamin.peterson | 2009-01-01 17:43:12 +0100 (Thu, 01 Jan 2009) | 1 line fix highlighting ........ r68142 | benjamin.peterson | 2009-01-01 18:29:49 +0100 (Thu, 01 Jan 2009) | 2 lines welcome to 2009, Python! ........ r68145 | amaury.forgeotdarc | 2009-01-02 01:03:54 +0100 (Fri, 02 Jan 2009) | 5 lines #4801 _collections module fails to build on cygwin. _PyObject_GC_TRACK is the macro version of PyObject_GC_Track, and according to documentation it should not be used for extension modules. ........ r68146 | ronald.oussoren | 2009-01-02 11:44:46 +0100 (Fri, 02 Jan 2009) | 2 lines Fix for issue4472: "configure --enable-shared doesn't work on OSX" ........ r68148 | ronald.oussoren | 2009-01-02 11:48:31 +0100 (Fri, 02 Jan 2009) | 2 lines Forgot to add a NEWS item in my previous checkin ........ r68149 | ronald.oussoren | 2009-01-02 11:50:48 +0100 (Fri, 02 Jan 2009) | 2 lines Fix for issue4780 ........ r68159 | ronald.oussoren | 2009-01-02 15:48:17 +0100 (Fri, 02 Jan 2009) | 2 lines Fix for issue 1627952 ........ r68160 | ronald.oussoren | 2009-01-02 15:52:09 +0100 (Fri, 02 Jan 2009) | 2 lines Fix for issue r1737832 ........ r68161 | ronald.oussoren | 2009-01-02 16:00:05 +0100 (Fri, 02 Jan 2009) | 3 lines Fix for issue 1149804 ........ r68162 | ronald.oussoren | 2009-01-02 16:06:00 +0100 (Fri, 02 Jan 2009) | 3 lines Fix for issue 4472 is incompatible with Cygwin, this patch should fix that. ........ r68166 | benjamin.peterson | 2009-01-02 19:26:23 +0100 (Fri, 02 Jan 2009) | 1 line document PyMemberDef ........ r68171 | georg.brandl | 2009-01-02 21:25:14 +0100 (Fri, 02 Jan 2009) | 3 lines #4811: fix markup glitches (mostly remains of the conversion), found by Gabriel Genellina. ........ r68172 | martin.v.loewis | 2009-01-02 21:32:55 +0100 (Fri, 02 Jan 2009) | 2 lines Issue #4075: Use OutputDebugStringW in Py_FatalError. ........ r68173 | martin.v.loewis | 2009-01-02 21:40:14 +0100 (Fri, 02 Jan 2009) | 2 lines Issue #4051: Prevent conflict of UNICODE macros in cPickle. ........ r68174 | benjamin.peterson | 2009-01-02 21:47:27 +0100 (Fri, 02 Jan 2009) | 1 line fix compilation on non-Windows platforms ........ r68179 | raymond.hettinger | 2009-01-02 22:26:45 +0100 (Fri, 02 Jan 2009) | 1 line Issue #4615. Document how to use itertools for de-duping. ........ r68195 | georg.brandl | 2009-01-03 14:45:15 +0100 (Sat, 03 Jan 2009) | 2 lines Remove useless string literal. ........ r68196 | georg.brandl | 2009-01-03 15:29:53 +0100 (Sat, 03 Jan 2009) | 2 lines Fix indentation. ........ r68210 | georg.brandl | 2009-01-03 20:10:12 +0100 (Sat, 03 Jan 2009) | 2 lines Set eol-style correctly for mp_distributing.py. ........ r68214 | georg.brandl | 2009-01-03 20:44:48 +0100 (Sat, 03 Jan 2009) | 2 lines Make indentation consistent. ........ r68215 | georg.brandl | 2009-01-03 21:15:14 +0100 (Sat, 03 Jan 2009) | 2 lines Fix role name. ........ r68217 | georg.brandl | 2009-01-03 21:30:15 +0100 (Sat, 03 Jan 2009) | 2 lines Add rstlint, a little tool to find subtle markup problems and inconsistencies in the Doc sources. ........ r68218 | georg.brandl | 2009-01-03 21:38:59 +0100 (Sat, 03 Jan 2009) | 2 lines Recognize usage of the default role. ........ r68219 | georg.brandl | 2009-01-03 21:47:01 +0100 (Sat, 03 Jan 2009) | 2 lines Fix uses of the default role. ........ r68220 | georg.brandl | 2009-01-03 21:55:06 +0100 (Sat, 03 Jan 2009) | 2 lines Remove trailing whitespace. ........ r68221 | georg.brandl | 2009-01-03 22:04:55 +0100 (Sat, 03 Jan 2009) | 2 lines Remove tabs from the documentation. ........ r68222 | georg.brandl | 2009-01-03 22:11:58 +0100 (Sat, 03 Jan 2009) | 2 lines Disable the line length checker by default. ........
Diffstat (limited to 'Doc/includes')
-rw-r--r--Doc/includes/mp_distributing.py728
1 files changed, 364 insertions, 364 deletions
diff --git a/Doc/includes/mp_distributing.py b/Doc/includes/mp_distributing.py
index 5ec718b..43c7ad1 100644
--- a/Doc/includes/mp_distributing.py
+++ b/Doc/includes/mp_distributing.py
@@ -1,364 +1,364 @@
-#
-# Module to allow spawning of processes on foreign host
-#
-# Depends on `multiprocessing` package -- tested with `processing-0.60`
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
-
-#
-# Imports
-#
-
-import sys
-import os
-import tarfile
-import shutil
-import subprocess
-import logging
-import itertools
-import Queue
-
-try:
- import cPickle as pickle
-except ImportError:
- import pickle
-
-from multiprocessing import Process, current_process, cpu_count
-from multiprocessing import util, managers, connection, forking, pool
-
-#
-# Logging
-#
-
-def get_logger():
- return _logger
-
-_logger = logging.getLogger('distributing')
-_logger.propogate = 0
-
-_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
-_handler = logging.StreamHandler()
-_handler.setFormatter(_formatter)
-_logger.addHandler(_handler)
-
-info = _logger.info
-debug = _logger.debug
-
-#
-# Get number of cpus
-#
-
-try:
- slot_count = cpu_count()
-except NotImplemented:
- slot_count = 1
-
-#
-# Manager type which spawns subprocesses
-#
-
-class HostManager(managers.SyncManager):
- '''
- Manager type used for spawning processes on a (presumably) foreign host
- '''
- def __init__(self, address, authkey):
- managers.SyncManager.__init__(self, address, authkey)
- self._name = 'Host-unknown'
-
- def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
- if hasattr(sys.modules['__main__'], '__file__'):
- main_path = os.path.basename(sys.modules['__main__'].__file__)
- else:
- main_path = None
- data = pickle.dumps((target, args, kwargs))
- p = self._RemoteProcess(data, main_path)
- if name is None:
- temp = self._name.split('Host-')[-1] + '/Process-%s'
- name = temp % ':'.join(map(str, p.get_identity()))
- p.set_name(name)
- return p
-
- @classmethod
- def from_address(cls, address, authkey):
- manager = cls(address, authkey)
- managers.transact(address, authkey, 'dummy')
- manager._state.value = managers.State.STARTED
- manager._name = 'Host-%s:%s' % manager.address
- manager.shutdown = util.Finalize(
- manager, HostManager._finalize_host,
- args=(manager._address, manager._authkey, manager._name),
- exitpriority=-10
- )
- return manager
-
- @staticmethod
- def _finalize_host(address, authkey, name):
- managers.transact(address, authkey, 'shutdown')
-
- def __repr__(self):
- return '<Host(%s)>' % self._name
-
-#
-# Process subclass representing a process on (possibly) a remote machine
-#
-
-class RemoteProcess(Process):
- '''
- Represents a process started on a remote host
- '''
- def __init__(self, data, main_path):
- assert not main_path or os.path.basename(main_path) == main_path
- Process.__init__(self)
- self._data = data
- self._main_path = main_path
-
- def _bootstrap(self):
- forking.prepare({'main_path': self._main_path})
- self._target, self._args, self._kwargs = pickle.loads(self._data)
- return Process._bootstrap(self)
-
- def get_identity(self):
- return self._identity
-
-HostManager.register('_RemoteProcess', RemoteProcess)
-
-#
-# A Pool class that uses a cluster
-#
-
-class DistributedPool(pool.Pool):
-
- def __init__(self, cluster, processes=None, initializer=None, initargs=()):
- self._cluster = cluster
- self.Process = cluster.Process
- pool.Pool.__init__(self, processes or len(cluster),
- initializer, initargs)
-
- def _setup_queues(self):
- self._inqueue = self._cluster._SettableQueue()
- self._outqueue = self._cluster._SettableQueue()
- self._quick_put = self._inqueue.put
- self._quick_get = self._outqueue.get
-
- @staticmethod
- def _help_stuff_finish(inqueue, task_handler, size):
- inqueue.set_contents([None] * size)
-
-#
-# Manager type which starts host managers on other machines
-#
-
-def LocalProcess(**kwds):
- p = Process(**kwds)
- p.set_name('localhost/' + p.name)
- return p
-
-class Cluster(managers.SyncManager):
- '''
- Represents collection of slots running on various hosts.
-
- `Cluster` is a subclass of `SyncManager` so it allows creation of
- various types of shared objects.
- '''
- def __init__(self, hostlist, modules):
- managers.SyncManager.__init__(self, address=('localhost', 0))
- self._hostlist = hostlist
- self._modules = modules
- if __name__ not in modules:
- modules.append(__name__)
- files = [sys.modules[name].__file__ for name in modules]
- for i, file in enumerate(files):
- if file.endswith('.pyc') or file.endswith('.pyo'):
- files[i] = file[:-4] + '.py'
- self._files = [os.path.abspath(file) for file in files]
-
- def start(self):
- managers.SyncManager.start(self)
-
- l = connection.Listener(family='AF_INET', authkey=self._authkey)
-
- for i, host in enumerate(self._hostlist):
- host._start_manager(i, self._authkey, l.address, self._files)
-
- for host in self._hostlist:
- if host.hostname != 'localhost':
- conn = l.accept()
- i, address, cpus = conn.recv()
- conn.close()
- other_host = self._hostlist[i]
- other_host.manager = HostManager.from_address(address,
- self._authkey)
- other_host.slots = other_host.slots or cpus
- other_host.Process = other_host.manager.Process
- else:
- host.slots = host.slots or slot_count
- host.Process = LocalProcess
-
- self._slotlist = [
- Slot(host) for host in self._hostlist for i in range(host.slots)
- ]
- self._slot_iterator = itertools.cycle(self._slotlist)
- self._base_shutdown = self.shutdown
- del self.shutdown
-
- def shutdown(self):
- for host in self._hostlist:
- if host.hostname != 'localhost':
- host.manager.shutdown()
- self._base_shutdown()
-
- def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
- slot = self._slot_iterator.next()
- return slot.Process(
- group=group, target=target, name=name, args=args, kwargs=kwargs
- )
-
- def Pool(self, processes=None, initializer=None, initargs=()):
- return DistributedPool(self, processes, initializer, initargs)
-
- def __getitem__(self, i):
- return self._slotlist[i]
-
- def __len__(self):
- return len(self._slotlist)
-
- def __iter__(self):
- return iter(self._slotlist)
-
-#
-# Queue subclass used by distributed pool
-#
-
-class SettableQueue(Queue.Queue):
- def empty(self):
- return not self.queue
- def full(self):
- return self.maxsize > 0 and len(self.queue) == self.maxsize
- def set_contents(self, contents):
- # length of contents must be at least as large as the number of
- # threads which have potentially called get()
- self.not_empty.acquire()
- try:
- self.queue.clear()
- self.queue.extend(contents)
- self.not_empty.notifyAll()
- finally:
- self.not_empty.release()
-
-Cluster.register('_SettableQueue', SettableQueue)
-
-#
-# Class representing a notional cpu in the cluster
-#
-
-class Slot(object):
- def __init__(self, host):
- self.host = host
- self.Process = host.Process
-
-#
-# Host
-#
-
-class Host(object):
- '''
- Represents a host to use as a node in a cluster.
-
- `hostname` gives the name of the host. If hostname is not
- "localhost" then ssh is used to log in to the host. To log in as
- a different user use a host name of the form
- "username@somewhere.org"
-
- `slots` is used to specify the number of slots for processes on
- the host. This affects how often processes will be allocated to
- this host. Normally this should be equal to the number of cpus on
- that host.
- '''
- def __init__(self, hostname, slots=None):
- self.hostname = hostname
- self.slots = slots
-
- def _start_manager(self, index, authkey, address, files):
- if self.hostname != 'localhost':
- tempdir = copy_to_remote_temporary_directory(self.hostname, files)
- debug('startup files copied to %s:%s', self.hostname, tempdir)
- p = subprocess.Popen(
- ['ssh', self.hostname, 'python', '-c',
- '"import os; os.chdir(%r); '
- 'from distributing import main; main()"' % tempdir],
- stdin=subprocess.PIPE
- )
- data = dict(
- name='BoostrappingHost', index=index,
- dist_log_level=_logger.getEffectiveLevel(),
- dir=tempdir, authkey=str(authkey), parent_address=address
- )
- pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
- p.stdin.close()
-
-#
-# Copy files to remote directory, returning name of directory
-#
-
-unzip_code = '''"
-import tempfile, os, sys, tarfile
-tempdir = tempfile.mkdtemp(prefix='distrib-')
-os.chdir(tempdir)
-tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
-for ti in tf:
- tf.extract(ti)
-print tempdir
-"'''
-
-def copy_to_remote_temporary_directory(host, files):
- p = subprocess.Popen(
- ['ssh', host, 'python', '-c', unzip_code],
- stdout=subprocess.PIPE, stdin=subprocess.PIPE
- )
- tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
- for name in files:
- tf.add(name, os.path.basename(name))
- tf.close()
- p.stdin.close()
- return p.stdout.read().rstrip()
-
-#
-# Code which runs a host manager
-#
-
-def main():
- # get data from parent over stdin
- data = pickle.load(sys.stdin)
- sys.stdin.close()
-
- # set some stuff
- _logger.setLevel(data['dist_log_level'])
- forking.prepare(data)
-
- # create server for a `HostManager` object
- server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
- current_process()._server = server
-
- # report server address and number of cpus back to parent
- conn = connection.Client(data['parent_address'], authkey=data['authkey'])
- conn.send((data['index'], server.address, slot_count))
- conn.close()
-
- # set name etc
- current_process().set_name('Host-%s:%s' % server.address)
- util._run_after_forkers()
-
- # register a cleanup function
- def cleanup(directory):
- debug('removing directory %s', directory)
- shutil.rmtree(directory)
- debug('shutting down host manager')
- util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
-
- # start host manager
- debug('remote host manager starting in %s', data['dir'])
- server.serve_forever()
+#
+# Module to allow spawning of processes on foreign host
+#
+# Depends on `multiprocessing` package -- tested with `processing-0.60`
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+
+__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
+
+#
+# Imports
+#
+
+import sys
+import os
+import tarfile
+import shutil
+import subprocess
+import logging
+import itertools
+import Queue
+
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
+
+from multiprocessing import Process, current_process, cpu_count
+from multiprocessing import util, managers, connection, forking, pool
+
+#
+# Logging
+#
+
+def get_logger():
+ return _logger
+
+_logger = logging.getLogger('distributing')
+_logger.propogate = 0
+
+_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
+_handler = logging.StreamHandler()
+_handler.setFormatter(_formatter)
+_logger.addHandler(_handler)
+
+info = _logger.info
+debug = _logger.debug
+
+#
+# Get number of cpus
+#
+
+try:
+ slot_count = cpu_count()
+except NotImplemented:
+ slot_count = 1
+
+#
+# Manager type which spawns subprocesses
+#
+
+class HostManager(managers.SyncManager):
+ '''
+ Manager type used for spawning processes on a (presumably) foreign host
+ '''
+ def __init__(self, address, authkey):
+ managers.SyncManager.__init__(self, address, authkey)
+ self._name = 'Host-unknown'
+
+ def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
+ if hasattr(sys.modules['__main__'], '__file__'):
+ main_path = os.path.basename(sys.modules['__main__'].__file__)
+ else:
+ main_path = None
+ data = pickle.dumps((target, args, kwargs))
+ p = self._RemoteProcess(data, main_path)
+ if name is None:
+ temp = self._name.split('Host-')[-1] + '/Process-%s'
+ name = temp % ':'.join(map(str, p.get_identity()))
+ p.set_name(name)
+ return p
+
+ @classmethod
+ def from_address(cls, address, authkey):
+ manager = cls(address, authkey)
+ managers.transact(address, authkey, 'dummy')
+ manager._state.value = managers.State.STARTED
+ manager._name = 'Host-%s:%s' % manager.address
+ manager.shutdown = util.Finalize(
+ manager, HostManager._finalize_host,
+ args=(manager._address, manager._authkey, manager._name),
+ exitpriority=-10
+ )
+ return manager
+
+ @staticmethod
+ def _finalize_host(address, authkey, name):
+ managers.transact(address, authkey, 'shutdown')
+
+ def __repr__(self):
+ return '<Host(%s)>' % self._name
+
+#
+# Process subclass representing a process on (possibly) a remote machine
+#
+
+class RemoteProcess(Process):
+ '''
+ Represents a process started on a remote host
+ '''
+ def __init__(self, data, main_path):
+ assert not main_path or os.path.basename(main_path) == main_path
+ Process.__init__(self)
+ self._data = data
+ self._main_path = main_path
+
+ def _bootstrap(self):
+ forking.prepare({'main_path': self._main_path})
+ self._target, self._args, self._kwargs = pickle.loads(self._data)
+ return Process._bootstrap(self)
+
+ def get_identity(self):
+ return self._identity
+
+HostManager.register('_RemoteProcess', RemoteProcess)
+
+#
+# A Pool class that uses a cluster
+#
+
+class DistributedPool(pool.Pool):
+
+ def __init__(self, cluster, processes=None, initializer=None, initargs=()):
+ self._cluster = cluster
+ self.Process = cluster.Process
+ pool.Pool.__init__(self, processes or len(cluster),
+ initializer, initargs)
+
+ def _setup_queues(self):
+ self._inqueue = self._cluster._SettableQueue()
+ self._outqueue = self._cluster._SettableQueue()
+ self._quick_put = self._inqueue.put
+ self._quick_get = self._outqueue.get
+
+ @staticmethod
+ def _help_stuff_finish(inqueue, task_handler, size):
+ inqueue.set_contents([None] * size)
+
+#
+# Manager type which starts host managers on other machines
+#
+
+def LocalProcess(**kwds):
+ p = Process(**kwds)
+ p.set_name('localhost/' + p.name)
+ return p
+
+class Cluster(managers.SyncManager):
+ '''
+ Represents collection of slots running on various hosts.
+
+ `Cluster` is a subclass of `SyncManager` so it allows creation of
+ various types of shared objects.
+ '''
+ def __init__(self, hostlist, modules):
+ managers.SyncManager.__init__(self, address=('localhost', 0))
+ self._hostlist = hostlist
+ self._modules = modules
+ if __name__ not in modules:
+ modules.append(__name__)
+ files = [sys.modules[name].__file__ for name in modules]
+ for i, file in enumerate(files):
+ if file.endswith('.pyc') or file.endswith('.pyo'):
+ files[i] = file[:-4] + '.py'
+ self._files = [os.path.abspath(file) for file in files]
+
+ def start(self):
+ managers.SyncManager.start(self)
+
+ l = connection.Listener(family='AF_INET', authkey=self._authkey)
+
+ for i, host in enumerate(self._hostlist):
+ host._start_manager(i, self._authkey, l.address, self._files)
+
+ for host in self._hostlist:
+ if host.hostname != 'localhost':
+ conn = l.accept()
+ i, address, cpus = conn.recv()
+ conn.close()
+ other_host = self._hostlist[i]
+ other_host.manager = HostManager.from_address(address,
+ self._authkey)
+ other_host.slots = other_host.slots or cpus
+ other_host.Process = other_host.manager.Process
+ else:
+ host.slots = host.slots or slot_count
+ host.Process = LocalProcess
+
+ self._slotlist = [
+ Slot(host) for host in self._hostlist for i in range(host.slots)
+ ]
+ self._slot_iterator = itertools.cycle(self._slotlist)
+ self._base_shutdown = self.shutdown
+ del self.shutdown
+
+ def shutdown(self):
+ for host in self._hostlist:
+ if host.hostname != 'localhost':
+ host.manager.shutdown()
+ self._base_shutdown()
+
+ def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
+ slot = self._slot_iterator.next()
+ return slot.Process(
+ group=group, target=target, name=name, args=args, kwargs=kwargs
+ )
+
+ def Pool(self, processes=None, initializer=None, initargs=()):
+ return DistributedPool(self, processes, initializer, initargs)
+
+ def __getitem__(self, i):
+ return self._slotlist[i]
+
+ def __len__(self):
+ return len(self._slotlist)
+
+ def __iter__(self):
+ return iter(self._slotlist)
+
+#
+# Queue subclass used by distributed pool
+#
+
+class SettableQueue(Queue.Queue):
+ def empty(self):
+ return not self.queue
+ def full(self):
+ return self.maxsize > 0 and len(self.queue) == self.maxsize
+ def set_contents(self, contents):
+ # length of contents must be at least as large as the number of
+ # threads which have potentially called get()
+ self.not_empty.acquire()
+ try:
+ self.queue.clear()
+ self.queue.extend(contents)
+ self.not_empty.notifyAll()
+ finally:
+ self.not_empty.release()
+
+Cluster.register('_SettableQueue', SettableQueue)
+
+#
+# Class representing a notional cpu in the cluster
+#
+
+class Slot(object):
+ def __init__(self, host):
+ self.host = host
+ self.Process = host.Process
+
+#
+# Host
+#
+
+class Host(object):
+ '''
+ Represents a host to use as a node in a cluster.
+
+ `hostname` gives the name of the host. If hostname is not
+ "localhost" then ssh is used to log in to the host. To log in as
+ a different user use a host name of the form
+ "username@somewhere.org"
+
+ `slots` is used to specify the number of slots for processes on
+ the host. This affects how often processes will be allocated to
+ this host. Normally this should be equal to the number of cpus on
+ that host.
+ '''
+ def __init__(self, hostname, slots=None):
+ self.hostname = hostname
+ self.slots = slots
+
+ def _start_manager(self, index, authkey, address, files):
+ if self.hostname != 'localhost':
+ tempdir = copy_to_remote_temporary_directory(self.hostname, files)
+ debug('startup files copied to %s:%s', self.hostname, tempdir)
+ p = subprocess.Popen(
+ ['ssh', self.hostname, 'python', '-c',
+ '"import os; os.chdir(%r); '
+ 'from distributing import main; main()"' % tempdir],
+ stdin=subprocess.PIPE
+ )
+ data = dict(
+ name='BoostrappingHost', index=index,
+ dist_log_level=_logger.getEffectiveLevel(),
+ dir=tempdir, authkey=str(authkey), parent_address=address
+ )
+ pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
+ p.stdin.close()
+
+#
+# Copy files to remote directory, returning name of directory
+#
+
+unzip_code = '''"
+import tempfile, os, sys, tarfile
+tempdir = tempfile.mkdtemp(prefix='distrib-')
+os.chdir(tempdir)
+tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
+for ti in tf:
+ tf.extract(ti)
+print tempdir
+"'''
+
+def copy_to_remote_temporary_directory(host, files):
+ p = subprocess.Popen(
+ ['ssh', host, 'python', '-c', unzip_code],
+ stdout=subprocess.PIPE, stdin=subprocess.PIPE
+ )
+ tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
+ for name in files:
+ tf.add(name, os.path.basename(name))
+ tf.close()
+ p.stdin.close()
+ return p.stdout.read().rstrip()
+
+#
+# Code which runs a host manager
+#
+
+def main():
+ # get data from parent over stdin
+ data = pickle.load(sys.stdin)
+ sys.stdin.close()
+
+ # set some stuff
+ _logger.setLevel(data['dist_log_level'])
+ forking.prepare(data)
+
+ # create server for a `HostManager` object
+ server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
+ current_process()._server = server
+
+ # report server address and number of cpus back to parent
+ conn = connection.Client(data['parent_address'], authkey=data['authkey'])
+ conn.send((data['index'], server.address, slot_count))
+ conn.close()
+
+ # set name etc
+ current_process().set_name('Host-%s:%s' % server.address)
+ util._run_after_forkers()
+
+ # register a cleanup function
+ def cleanup(directory):
+ debug('removing directory %s', directory)
+ shutil.rmtree(directory)
+ debug('shutting down host manager')
+ util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
+
+ # start host manager
+ debug('remote host manager starting in %s', data['dir'])
+ server.serve_forever()