summaryrefslogtreecommitdiffstats
path: root/Doc/includes
diff options
context:
space:
mode:
Diffstat (limited to 'Doc/includes')
-rw-r--r--Doc/includes/mp_distributing.py364
1 files changed, 0 insertions, 364 deletions
diff --git a/Doc/includes/mp_distributing.py b/Doc/includes/mp_distributing.py
deleted file mode 100644
index 9a88825..0000000
--- a/Doc/includes/mp_distributing.py
+++ /dev/null
@@ -1,364 +0,0 @@
-#
-# Module to allow spawning of processes on foreign host
-#
-# Depends on `multiprocessing` package -- tested with `processing-0.60`
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
-
-#
-# Imports
-#
-
-import sys
-import os
-import tarfile
-import shutil
-import subprocess
-import logging
-import itertools
-import queue
-
-try:
- import pickle as pickle
-except ImportError:
- import pickle
-
-from multiprocessing import Process, current_process, cpu_count
-from multiprocessing import util, managers, connection, forking, pool
-
-#
-# Logging
-#
-
-def get_logger():
- return _logger
-
-_logger = logging.getLogger('distributing')
-_logger.propagate = 0
-
-_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
-_handler = logging.StreamHandler()
-_handler.setFormatter(_formatter)
-_logger.addHandler(_handler)
-
-info = _logger.info
-debug = _logger.debug
-
-#
-# Get number of cpus
-#
-
-try:
- slot_count = cpu_count()
-except NotImplemented:
- slot_count = 1
-
-#
-# Manager type which spawns subprocesses
-#
-
-class HostManager(managers.SyncManager):
- '''
- Manager type used for spawning processes on a (presumably) foreign host
- '''
- def __init__(self, address, authkey):
- managers.SyncManager.__init__(self, address, authkey)
- self._name = 'Host-unknown'
-
- def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
- if hasattr(sys.modules['__main__'], '__file__'):
- main_path = os.path.basename(sys.modules['__main__'].__file__)
- else:
- main_path = None
- data = pickle.dumps((target, args, kwargs))
- p = self._RemoteProcess(data, main_path)
- if name is None:
- temp = self._name.split('Host-')[-1] + '/Process-%s'
- name = temp % ':'.join(map(str, p.get_identity()))
- p.set_name(name)
- return p
-
- @classmethod
- def from_address(cls, address, authkey):
- manager = cls(address, authkey)
- managers.transact(address, authkey, 'dummy')
- manager._state.value = managers.State.STARTED
- manager._name = 'Host-%s:%s' % manager.address
- manager.shutdown = util.Finalize(
- manager, HostManager._finalize_host,
- args=(manager._address, manager._authkey, manager._name),
- exitpriority=-10
- )
- return manager
-
- @staticmethod
- def _finalize_host(address, authkey, name):
- managers.transact(address, authkey, 'shutdown')
-
- def __repr__(self):
- return '<Host(%s)>' % self._name
-
-#
-# Process subclass representing a process on (possibly) a remote machine
-#
-
-class RemoteProcess(Process):
- '''
- Represents a process started on a remote host
- '''
- def __init__(self, data, main_path):
- assert not main_path or os.path.basename(main_path) == main_path
- Process.__init__(self)
- self._data = data
- self._main_path = main_path
-
- def _bootstrap(self):
- forking.prepare({'main_path': self._main_path})
- self._target, self._args, self._kwargs = pickle.loads(self._data)
- return Process._bootstrap(self)
-
- def get_identity(self):
- return self._identity
-
-HostManager.register('_RemoteProcess', RemoteProcess)
-
-#
-# A Pool class that uses a cluster
-#
-
-class DistributedPool(pool.Pool):
-
- def __init__(self, cluster, processes=None, initializer=None, initargs=()):
- self._cluster = cluster
- self.Process = cluster.Process
- pool.Pool.__init__(self, processes or len(cluster),
- initializer, initargs)
-
- def _setup_queues(self):
- self._inqueue = self._cluster._SettableQueue()
- self._outqueue = self._cluster._SettableQueue()
- self._quick_put = self._inqueue.put
- self._quick_get = self._outqueue.get
-
- @staticmethod
- def _help_stuff_finish(inqueue, task_handler, size):
- inqueue.set_contents([None] * size)
-
-#
-# Manager type which starts host managers on other machines
-#
-
-def LocalProcess(**kwds):
- p = Process(**kwds)
- p.set_name('localhost/' + p.name)
- return p
-
-class Cluster(managers.SyncManager):
- '''
- Represents collection of slots running on various hosts.
-
- `Cluster` is a subclass of `SyncManager` so it allows creation of
- various types of shared objects.
- '''
- def __init__(self, hostlist, modules):
- managers.SyncManager.__init__(self, address=('localhost', 0))
- self._hostlist = hostlist
- self._modules = modules
- if __name__ not in modules:
- modules.append(__name__)
- files = [sys.modules[name].__file__ for name in modules]
- for i, file in enumerate(files):
- if file.endswith('.pyc') or file.endswith('.pyo'):
- files[i] = file[:-4] + '.py'
- self._files = [os.path.abspath(file) for file in files]
-
- def start(self):
- managers.SyncManager.start(self)
-
- l = connection.Listener(family='AF_INET', authkey=self._authkey)
-
- for i, host in enumerate(self._hostlist):
- host._start_manager(i, self._authkey, l.address, self._files)
-
- for host in self._hostlist:
- if host.hostname != 'localhost':
- conn = l.accept()
- i, address, cpus = conn.recv()
- conn.close()
- other_host = self._hostlist[i]
- other_host.manager = HostManager.from_address(address,
- self._authkey)
- other_host.slots = other_host.slots or cpus
- other_host.Process = other_host.manager.Process
- else:
- host.slots = host.slots or slot_count
- host.Process = LocalProcess
-
- self._slotlist = [
- Slot(host) for host in self._hostlist for i in range(host.slots)
- ]
- self._slot_iterator = itertools.cycle(self._slotlist)
- self._base_shutdown = self.shutdown
- del self.shutdown
-
- def shutdown(self):
- for host in self._hostlist:
- if host.hostname != 'localhost':
- host.manager.shutdown()
- self._base_shutdown()
-
- def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
- slot = next(self._slot_iterator)
- return slot.Process(
- group=group, target=target, name=name, args=args, kwargs=kwargs
- )
-
- def Pool(self, processes=None, initializer=None, initargs=()):
- return DistributedPool(self, processes, initializer, initargs)
-
- def __getitem__(self, i):
- return self._slotlist[i]
-
- def __len__(self):
- return len(self._slotlist)
-
- def __iter__(self):
- return iter(self._slotlist)
-
-#
-# Queue subclass used by distributed pool
-#
-
-class SettableQueue(queue.Queue):
- def empty(self):
- return not self.queue
- def full(self):
- return self.maxsize > 0 and len(self.queue) == self.maxsize
- def set_contents(self, contents):
- # length of contents must be at least as large as the number of
- # threads which have potentially called get()
- self.not_empty.acquire()
- try:
- self.queue.clear()
- self.queue.extend(contents)
- self.not_empty.notifyAll()
- finally:
- self.not_empty.release()
-
-Cluster.register('_SettableQueue', SettableQueue)
-
-#
-# Class representing a notional cpu in the cluster
-#
-
-class Slot(object):
- def __init__(self, host):
- self.host = host
- self.Process = host.Process
-
-#
-# Host
-#
-
-class Host(object):
- '''
- Represents a host to use as a node in a cluster.
-
- `hostname` gives the name of the host. If hostname is not
- "localhost" then ssh is used to log in to the host. To log in as
- a different user use a host name of the form
- "username@somewhere.org"
-
- `slots` is used to specify the number of slots for processes on
- the host. This affects how often processes will be allocated to
- this host. Normally this should be equal to the number of cpus on
- that host.
- '''
- def __init__(self, hostname, slots=None):
- self.hostname = hostname
- self.slots = slots
-
- def _start_manager(self, index, authkey, address, files):
- if self.hostname != 'localhost':
- tempdir = copy_to_remote_temporary_directory(self.hostname, files)
- debug('startup files copied to %s:%s', self.hostname, tempdir)
- p = subprocess.Popen(
- ['ssh', self.hostname, 'python', '-c',
- '"import os; os.chdir(%r); '
- 'from distributing import main; main()"' % tempdir],
- stdin=subprocess.PIPE
- )
- data = dict(
- name='BoostrappingHost', index=index,
- dist_log_level=_logger.getEffectiveLevel(),
- dir=tempdir, authkey=str(authkey), parent_address=address
- )
- pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
- p.stdin.close()
-
-#
-# Copy files to remote directory, returning name of directory
-#
-
-unzip_code = '''"
-import tempfile, os, sys, tarfile
-tempdir = tempfile.mkdtemp(prefix='distrib-')
-os.chdir(tempdir)
-tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
-for ti in tf:
- tf.extract(ti)
-print tempdir
-"'''
-
-def copy_to_remote_temporary_directory(host, files):
- p = subprocess.Popen(
- ['ssh', host, 'python', '-c', unzip_code],
- stdout=subprocess.PIPE, stdin=subprocess.PIPE
- )
- tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
- for name in files:
- tf.add(name, os.path.basename(name))
- tf.close()
- p.stdin.close()
- return p.stdout.read().rstrip()
-
-#
-# Code which runs a host manager
-#
-
-def main():
- # get data from parent over stdin
- data = pickle.load(sys.stdin)
- sys.stdin.close()
-
- # set some stuff
- _logger.setLevel(data['dist_log_level'])
- forking.prepare(data)
-
- # create server for a `HostManager` object
- server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
- current_process()._server = server
-
- # report server address and number of cpus back to parent
- conn = connection.Client(data['parent_address'], authkey=data['authkey'])
- conn.send((data['index'], server.address, slot_count))
- conn.close()
-
- # set name etc
- current_process().set_name('Host-%s:%s' % server.address)
- util._run_after_forkers()
-
- # register a cleanup function
- def cleanup(directory):
- debug('removing directory %s', directory)
- shutil.rmtree(directory)
- debug('shutting down host manager')
- util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
-
- # start host manager
- debug('remote host manager starting in %s', data['dir'])
- server.serve_forever()