author     Benjamin Peterson <benjamin@python.org>  2008-06-13 19:20:48 (GMT)
committer  Benjamin Peterson <benjamin@python.org>  2008-06-13 19:20:48 (GMT)
commit     7f03ea77bf43257789469b5cbc16982eb0a63b0f (patch)
tree       f8366c7dfaff9cac4ea1a186e67340535e80f53f /Lib/multiprocessing
parent     dfd79494ce78868c937dc91eddd630cbdcae5611 (diff)
darn! I converted half of the files the wrong way.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--  Lib/multiprocessing/__init__.py       542
-rw-r--r--  Lib/multiprocessing/connection.py     850
-rw-r--r--  Lib/multiprocessing/forking.py        858
-rw-r--r--  Lib/multiprocessing/heap.py           402
-rw-r--r--  Lib/multiprocessing/managers.py      2184
-rw-r--r--  Lib/multiprocessing/pool.py          1192
-rw-r--r--  Lib/multiprocessing/process.py        604
-rw-r--r--  Lib/multiprocessing/queues.py         712
-rw-r--r--  Lib/multiprocessing/reduction.py      380
-rw-r--r--  Lib/multiprocessing/sharedctypes.py   468
-rw-r--r--  Lib/multiprocessing/synchronize.py    588
-rw-r--r--  Lib/multiprocessing/util.py           672
12 files changed, 4726 insertions, 4726 deletions
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index decb2ad..f965056 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -1,271 +1,271 @@
-#
-# Package analogous to 'threading.py' but using processes
-#
-# multiprocessing/__init__.py
-#
-# This package is intended to duplicate the functionality (and much of
-# the API) of threading.py but uses processes instead of threads. A
-# subpackage 'multiprocessing.dummy' has the same API but is a simple
-# wrapper for 'threading'.
-#
-# Try calling `multiprocessing.doc.main()` to read the HTML
-# documentation in a web browser.
-#
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
-#
-
-__version__ = '0.70a1'
-
-__all__ = [
- 'Process', 'current_process', 'active_children', 'freeze_support',
- 'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger',
- 'allow_connection_pickling', 'BufferTooShort', 'TimeoutError',
- 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
- 'Event', 'Queue', 'JoinableQueue', 'Pool', 'Value', 'Array',
- 'RawValue', 'RawArray'
- ]
-
-__author__ = 'R. Oudkerk (r.m.oudkerk@gmail.com)'
-
-#
-# Imports
-#
-
-import os
-import sys
-
-from multiprocessing.process import Process, current_process, active_children
-
-#
-# Exceptions
-#
-
-class ProcessError(Exception):
- pass
-
-class BufferTooShort(ProcessError):
- pass
-
-class TimeoutError(ProcessError):
- pass
-
-class AuthenticationError(ProcessError):
- pass
-
-# This is down here because _multiprocessing uses BufferTooShort
-import _multiprocessing
-
-#
-# Definitions not depending on native semaphores
-#
-
-def Manager():
- '''
- Returns a manager associated with a running server process
-
- The manager's methods such as `Lock()`, `Condition()` and `Queue()`
- can be used to create shared objects.
- '''
- from multiprocessing.managers import SyncManager
- m = SyncManager()
- m.start()
- return m
-
-def Pipe(duplex=True):
- '''
- Returns two connection objects connected by a pipe
- '''
- from multiprocessing.connection import Pipe
- return Pipe(duplex)
-
-def cpu_count():
- '''
- Returns the number of CPUs in the system
- '''
- if sys.platform == 'win32':
- try:
- num = int(os.environ['NUMBER_OF_PROCESSORS'])
- except (ValueError, KeyError):
- num = 0
- elif sys.platform == 'darwin':
- try:
- num = int(os.popen('sysctl -n hw.ncpu').read())
- except ValueError:
- num = 0
- else:
- try:
- num = os.sysconf('SC_NPROCESSORS_ONLN')
- except (ValueError, OSError, AttributeError):
- num = 0
-
- if num >= 1:
- return num
- else:
- raise NotImplementedError('cannot determine number of cpus')
-
-def freeze_support():
- '''
- Check whether this is a fake forked process in a frozen executable.
- If so then run code specified by the command line and exit.
- '''
- if sys.platform == 'win32' and getattr(sys, 'frozen', False):
- from multiprocessing.forking import freeze_support
- freeze_support()
-
-def get_logger():
- '''
- Return package logger -- if it does not already exist then it is created
- '''
- from multiprocessing.util import get_logger
- return get_logger()
-
-def log_to_stderr(level=None):
- '''
- Turn on logging and add a handler which prints to stderr
- '''
- from multiprocessing.util import log_to_stderr
- return log_to_stderr(level)
-
-def allow_connection_pickling():
- '''
- Install support for sending connections and sockets between processes
- '''
- from multiprocessing import reduction
-
-#
-# Definitions depending on native semaphores
-#
-
-def Lock():
- '''
- Returns a non-recursive lock object
- '''
- from multiprocessing.synchronize import Lock
- return Lock()
-
-def RLock():
- '''
- Returns a recursive lock object
- '''
- from multiprocessing.synchronize import RLock
- return RLock()
-
-def Condition(lock=None):
- '''
- Returns a condition object
- '''
- from multiprocessing.synchronize import Condition
- return Condition(lock)
-
-def Semaphore(value=1):
- '''
- Returns a semaphore object
- '''
- from multiprocessing.synchronize import Semaphore
- return Semaphore(value)
-
-def BoundedSemaphore(value=1):
- '''
- Returns a bounded semaphore object
- '''
- from multiprocessing.synchronize import BoundedSemaphore
- return BoundedSemaphore(value)
-
-def Event():
- '''
- Returns an event object
- '''
- from multiprocessing.synchronize import Event
- return Event()
-
-def Queue(maxsize=0):
- '''
- Returns a queue object
- '''
- from multiprocessing.queues import Queue
- return Queue(maxsize)
-
-def JoinableQueue(maxsize=0):
- '''
- Returns a joinable queue object
- '''
- from multiprocessing.queues import JoinableQueue
- return JoinableQueue(maxsize)
-
-def Pool(processes=None, initializer=None, initargs=()):
- '''
- Returns a process pool object
- '''
- from multiprocessing.pool import Pool
- return Pool(processes, initializer, initargs)
-
-def RawValue(typecode_or_type, *args):
- '''
- Returns a shared object
- '''
- from multiprocessing.sharedctypes import RawValue
- return RawValue(typecode_or_type, *args)
-
-def RawArray(typecode_or_type, size_or_initializer):
- '''
- Returns a shared array
- '''
- from multiprocessing.sharedctypes import RawArray
- return RawArray(typecode_or_type, size_or_initializer)
-
-def Value(typecode_or_type, *args, **kwds):
- '''
- Returns a synchronized shared object
- '''
- from multiprocessing.sharedctypes import Value
- return Value(typecode_or_type, *args, **kwds)
-
-def Array(typecode_or_type, size_or_initializer, **kwds):
- '''
- Returns a synchronized shared array
- '''
- from multiprocessing.sharedctypes import Array
- return Array(typecode_or_type, size_or_initializer, **kwds)
-
-#
-#
-#
-
-if sys.platform == 'win32':
-
- def set_executable(executable):
- '''
- Sets the path to a python.exe or pythonw.exe binary used to run
- child processes on Windows instead of sys.executable.
- Useful for people embedding Python.
- '''
- from multiprocessing.forking import set_executable
- set_executable(executable)
-
- __all__ += ['set_executable']
+#
+# Package analogous to 'threading.py' but using processes
+#
+# multiprocessing/__init__.py
+#
+# This package is intended to duplicate the functionality (and much of
+# the API) of threading.py but uses processes instead of threads. A
+# subpackage 'multiprocessing.dummy' has the same API but is a simple
+# wrapper for 'threading'.
+#
+# Try calling `multiprocessing.doc.main()` to read the HTML
+# documentation in a web browser.
+#
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+#
+
+__version__ = '0.70a1'
+
+__all__ = [
+ 'Process', 'current_process', 'active_children', 'freeze_support',
+ 'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger',
+ 'allow_connection_pickling', 'BufferTooShort', 'TimeoutError',
+ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
+ 'Event', 'Queue', 'JoinableQueue', 'Pool', 'Value', 'Array',
+ 'RawValue', 'RawArray'
+ ]
+
+__author__ = 'R. Oudkerk (r.m.oudkerk@gmail.com)'
+
+#
+# Imports
+#
+
+import os
+import sys
+
+from multiprocessing.process import Process, current_process, active_children
+
+#
+# Exceptions
+#
+
+class ProcessError(Exception):
+ pass
+
+class BufferTooShort(ProcessError):
+ pass
+
+class TimeoutError(ProcessError):
+ pass
+
+class AuthenticationError(ProcessError):
+ pass
+
+# This is down here because _multiprocessing uses BufferTooShort
+import _multiprocessing
+
+#
+# Definitions not depending on native semaphores
+#
+
+def Manager():
+ '''
+ Returns a manager associated with a running server process
+
+ The manager's methods such as `Lock()`, `Condition()` and `Queue()`
+ can be used to create shared objects.
+ '''
+ from multiprocessing.managers import SyncManager
+ m = SyncManager()
+ m.start()
+ return m
+
+def Pipe(duplex=True):
+ '''
+ Returns two connection objects connected by a pipe
+ '''
+ from multiprocessing.connection import Pipe
+ return Pipe(duplex)
+
+def cpu_count():
+ '''
+ Returns the number of CPUs in the system
+ '''
+ if sys.platform == 'win32':
+ try:
+ num = int(os.environ['NUMBER_OF_PROCESSORS'])
+ except (ValueError, KeyError):
+ num = 0
+ elif sys.platform == 'darwin':
+ try:
+ num = int(os.popen('sysctl -n hw.ncpu').read())
+ except ValueError:
+ num = 0
+ else:
+ try:
+ num = os.sysconf('SC_NPROCESSORS_ONLN')
+ except (ValueError, OSError, AttributeError):
+ num = 0
+
+ if num >= 1:
+ return num
+ else:
+ raise NotImplementedError('cannot determine number of cpus')
+
+def freeze_support():
+ '''
+ Check whether this is a fake forked process in a frozen executable.
+ If so then run code specified by the command line and exit.
+ '''
+ if sys.platform == 'win32' and getattr(sys, 'frozen', False):
+ from multiprocessing.forking import freeze_support
+ freeze_support()
+
+def get_logger():
+ '''
+ Return package logger -- if it does not already exist then it is created
+ '''
+ from multiprocessing.util import get_logger
+ return get_logger()
+
+def log_to_stderr(level=None):
+ '''
+ Turn on logging and add a handler which prints to stderr
+ '''
+ from multiprocessing.util import log_to_stderr
+ return log_to_stderr(level)
+
+def allow_connection_pickling():
+ '''
+ Install support for sending connections and sockets between processes
+ '''
+ from multiprocessing import reduction
+
+#
+# Definitions depending on native semaphores
+#
+
+def Lock():
+ '''
+ Returns a non-recursive lock object
+ '''
+ from multiprocessing.synchronize import Lock
+ return Lock()
+
+def RLock():
+ '''
+ Returns a recursive lock object
+ '''
+ from multiprocessing.synchronize import RLock
+ return RLock()
+
+def Condition(lock=None):
+ '''
+ Returns a condition object
+ '''
+ from multiprocessing.synchronize import Condition
+ return Condition(lock)
+
+def Semaphore(value=1):
+ '''
+ Returns a semaphore object
+ '''
+ from multiprocessing.synchronize import Semaphore
+ return Semaphore(value)
+
+def BoundedSemaphore(value=1):
+ '''
+ Returns a bounded semaphore object
+ '''
+ from multiprocessing.synchronize import BoundedSemaphore
+ return BoundedSemaphore(value)
+
+def Event():
+ '''
+ Returns an event object
+ '''
+ from multiprocessing.synchronize import Event
+ return Event()
+
+def Queue(maxsize=0):
+ '''
+ Returns a queue object
+ '''
+ from multiprocessing.queues import Queue
+ return Queue(maxsize)
+
+def JoinableQueue(maxsize=0):
+ '''
+ Returns a joinable queue object
+ '''
+ from multiprocessing.queues import JoinableQueue
+ return JoinableQueue(maxsize)
+
+def Pool(processes=None, initializer=None, initargs=()):
+ '''
+ Returns a process pool object
+ '''
+ from multiprocessing.pool import Pool
+ return Pool(processes, initializer, initargs)
+
+def RawValue(typecode_or_type, *args):
+ '''
+ Returns a shared object
+ '''
+ from multiprocessing.sharedctypes import RawValue
+ return RawValue(typecode_or_type, *args)
+
+def RawArray(typecode_or_type, size_or_initializer):
+ '''
+ Returns a shared array
+ '''
+ from multiprocessing.sharedctypes import RawArray
+ return RawArray(typecode_or_type, size_or_initializer)
+
+def Value(typecode_or_type, *args, **kwds):
+ '''
+ Returns a synchronized shared object
+ '''
+ from multiprocessing.sharedctypes import Value
+ return Value(typecode_or_type, *args, **kwds)
+
+def Array(typecode_or_type, size_or_initializer, **kwds):
+ '''
+ Returns a synchronized shared array
+ '''
+ from multiprocessing.sharedctypes import Array
+ return Array(typecode_or_type, size_or_initializer, **kwds)
+
+#
+#
+#
+
+if sys.platform == 'win32':
+
+ def set_executable(executable):
+ '''
+ Sets the path to a python.exe or pythonw.exe binary used to run
+ child processes on Windows instead of sys.executable.
+ Useful for people embedding Python.
+ '''
+ from multiprocessing.forking import set_executable
+ set_executable(executable)
+
+ __all__ += ['set_executable']
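
A sketch (not part of this commit) of how the factory functions above are used: `__init__.py` is a facade, with each factory deferring its import until call time and returning an object from a submodule. This assumes only the Python 2.6-era API shown in the diff:

    from multiprocessing import Process, Queue

    def worker(q):
        q.put('hello')                    # runs in a child process, not a thread

    if __name__ == '__main__':            # guard required on Windows; see forking.py below
        q = Queue()                       # Queue factory defined above
        p = Process(target=worker, args=(q,))
        p.start()
        print q.get()                     # -> 'hello'
        p.join()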
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index f5a3301..cfb7201 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -1,425 +1,425 @@
-#
-# A higher level module for using sockets (or Windows named pipes)
-#
-# multiprocessing/connection.py
-#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
-#
-
-__all__ = [ 'Client', 'Listener', 'Pipe' ]
-
-import os
-import sys
-import socket
-import time
-import tempfile
-import itertools
-
-import _multiprocessing
-from multiprocessing import current_process
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import duplicate, close
-
-
-#
-#
-#
-
-BUFSIZE = 8192
-
-_mmap_counter = itertools.count()
-
-default_family = 'AF_INET'
-families = ['AF_INET']
-
-if hasattr(socket, 'AF_UNIX'):
- default_family = 'AF_UNIX'
- families += ['AF_UNIX']
-
-if sys.platform == 'win32':
- default_family = 'AF_PIPE'
- families += ['AF_PIPE']
-
-#
-#
-#
-
-def arbitrary_address(family):
- '''
- Return an arbitrary free address for the given family
- '''
- if family == 'AF_INET':
- return ('localhost', 0)
- elif family == 'AF_UNIX':
- return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
- elif family == 'AF_PIPE':
- return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
- (os.getpid(), _mmap_counter.next()))
- else:
- raise ValueError('unrecognized family')
-
-
-def address_type(address):
- '''
- Return the type of the address
-
- This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
- '''
- if type(address) == tuple:
- return 'AF_INET'
- elif type(address) is str and address.startswith('\\\\'):
- return 'AF_PIPE'
- elif type(address) is str:
- return 'AF_UNIX'
- else:
- raise ValueError('address type of %r unrecognized' % address)
-
-#
-# Public functions
-#
-
-class Listener(object):
- '''
- Returns a listener object.
-
- This is a wrapper for a bound socket which is 'listening' for
- connections, or for a Windows named pipe.
- '''
- def __init__(self, address=None, family=None, backlog=1, authkey=None):
- family = family or (address and address_type(address)) \
- or default_family
- address = address or arbitrary_address(family)
-
- if family == 'AF_PIPE':
- self._listener = PipeListener(address, backlog)
- else:
- self._listener = SocketListener(address, family, backlog)
-
- if authkey is not None and not isinstance(authkey, bytes):
- raise TypeError, 'authkey should be a byte string'
-
- self._authkey = authkey
-
- def accept(self):
- '''
- Accept a connection on the bound socket or named pipe of `self`.
-
- Returns a `Connection` object.
- '''
- c = self._listener.accept()
- if self._authkey:
- deliver_challenge(c, self._authkey)
- answer_challenge(c, self._authkey)
- return c
-
- def close(self):
- '''
- Close the bound socket or named pipe of `self`.
- '''
- return self._listener.close()
-
- address = property(lambda self: self._listener._address)
- last_accepted = property(lambda self: self._listener._last_accepted)
-
-
-def Client(address, family=None, authkey=None):
- '''
- Returns a connection to the address of a `Listener`
- '''
- family = family or address_type(address)
- if family == 'AF_PIPE':
- c = PipeClient(address)
- else:
- c = SocketClient(address)
-
- if authkey is not None and not isinstance(authkey, bytes):
- raise TypeError, 'authkey should be a byte string'
-
- if authkey is not None:
- answer_challenge(c, authkey)
- deliver_challenge(c, authkey)
-
- return c
-
-
-if sys.platform != 'win32':
-
- def Pipe(duplex=True):
- '''
- Returns pair of connection objects at either end of a pipe
- '''
- if duplex:
- s1, s2 = socket.socketpair()
- c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
- c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
- s1.close()
- s2.close()
- else:
- fd1, fd2 = os.pipe()
- c1 = _multiprocessing.Connection(fd1, writable=False)
- c2 = _multiprocessing.Connection(fd2, readable=False)
-
- return c1, c2
-
-else:
-
- from ._multiprocessing import win32
-
- def Pipe(duplex=True):
- '''
- Returns pair of connection objects at either end of a pipe
- '''
- address = arbitrary_address('AF_PIPE')
- if duplex:
- openmode = win32.PIPE_ACCESS_DUPLEX
- access = win32.GENERIC_READ | win32.GENERIC_WRITE
- obsize, ibsize = BUFSIZE, BUFSIZE
- else:
- openmode = win32.PIPE_ACCESS_INBOUND
- access = win32.GENERIC_WRITE
- obsize, ibsize = 0, BUFSIZE
-
- h1 = win32.CreateNamedPipe(
- address, openmode,
- win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
- win32.PIPE_WAIT,
- 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
- )
- h2 = win32.CreateFile(
- address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
- )
- win32.SetNamedPipeHandleState(
- h2, win32.PIPE_READMODE_MESSAGE, None, None
- )
-
- try:
- win32.ConnectNamedPipe(h1, win32.NULL)
- except WindowsError, e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
- raise
-
- c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
- c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
-
- return c1, c2
-
-#
-# Definitions for connections based on sockets
-#
-
-class SocketListener(object):
- '''
- Representation of a socket which is bound to an address and listening
- '''
- def __init__(self, address, family, backlog=1):
- self._socket = socket.socket(getattr(socket, family))
- self._socket.bind(address)
- self._socket.listen(backlog)
- address = self._socket.getsockname()
- if type(address) is tuple:
- address = (socket.getfqdn(address[0]),) + address[1:]
- self._address = address
- self._family = family
- self._last_accepted = None
-
- sub_debug('listener bound to address %r', self._address)
-
- if family == 'AF_UNIX':
- self._unlink = Finalize(
- self, os.unlink, args=(self._address,), exitpriority=0
- )
- else:
- self._unlink = None
-
- def accept(self):
- s, self._last_accepted = self._socket.accept()
- fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
- s.close()
- return conn
-
- def close(self):
- self._socket.close()
- if self._unlink is not None:
- self._unlink()
-
-
-def SocketClient(address):
- '''
- Return a connection object connected to the socket given by `address`
- '''
- family = address_type(address)
- s = socket.socket( getattr(socket, family) )
-
- while 1:
- try:
- s.connect(address)
- except socket.error, e:
- if e.args[0] != 10061: # 10061 => connection refused
- debug('failed to connect to address %s', address)
- raise
- time.sleep(0.01)
- else:
- break
- else:
- raise
-
- fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
- s.close()
- return conn
-
-#
-# Definitions for connections based on named pipes
-#
-
-if sys.platform == 'win32':
-
- class PipeListener(object):
- '''
- Representation of a named pipe
- '''
- def __init__(self, address, backlog=None):
- self._address = address
- handle = win32.CreateNamedPipe(
- address, win32.PIPE_ACCESS_DUPLEX,
- win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
- win32.PIPE_WAIT,
- win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
- win32.NMPWAIT_WAIT_FOREVER, win32.NULL
- )
- self._handle_queue = [handle]
- self._last_accepted = None
-
- sub_debug('listener created with address=%r', self._address)
-
- self.close = Finalize(
- self, PipeListener._finalize_pipe_listener,
- args=(self._handle_queue, self._address), exitpriority=0
- )
-
- def accept(self):
- newhandle = win32.CreateNamedPipe(
- self._address, win32.PIPE_ACCESS_DUPLEX,
- win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
- win32.PIPE_WAIT,
- win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
- win32.NMPWAIT_WAIT_FOREVER, win32.NULL
- )
- self._handle_queue.append(newhandle)
- handle = self._handle_queue.pop(0)
- try:
- win32.ConnectNamedPipe(handle, win32.NULL)
- except WindowsError, e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
- raise
- return _multiprocessing.PipeConnection(handle)
-
- @staticmethod
- def _finalize_pipe_listener(queue, address):
- sub_debug('closing listener with address=%r', address)
- for handle in queue:
- close(handle)
-
- def PipeClient(address):
- '''
- Return a connection object connected to the pipe given by `address`
- '''
- while 1:
- try:
- win32.WaitNamedPipe(address, 1000)
- h = win32.CreateFile(
- address, win32.GENERIC_READ | win32.GENERIC_WRITE,
- 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
- )
- except WindowsError, e:
- if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
- win32.ERROR_PIPE_BUSY):
- raise
- else:
- break
- else:
- raise
-
- win32.SetNamedPipeHandleState(
- h, win32.PIPE_READMODE_MESSAGE, None, None
- )
- return _multiprocessing.PipeConnection(h)
-
-#
-# Authentication stuff
-#
-
-MESSAGE_LENGTH = 20
-
-CHALLENGE = '#CHALLENGE#'
-WELCOME = '#WELCOME#'
-FAILURE = '#FAILURE#'
-
-if sys.version_info >= (3, 0): # XXX can use bytes literals in 2.6/3.0
- CHALLENGE = CHALLENGE.encode('ascii')
- WELCOME = WELCOME.encode('ascii')
- FAILURE = FAILURE.encode('ascii')
-
-def deliver_challenge(connection, authkey):
- import hmac
- assert isinstance(authkey, bytes)
- message = os.urandom(MESSAGE_LENGTH)
- connection.send_bytes(CHALLENGE + message)
- digest = hmac.new(authkey, message).digest()
- response = connection.recv_bytes(256) # reject large message
- if response == digest:
- connection.send_bytes(WELCOME)
- else:
- connection.send_bytes(FAILURE)
- raise AuthenticationError('digest received was wrong')
-
-def answer_challenge(connection, authkey):
- import hmac
- assert isinstance(authkey, bytes)
- message = connection.recv_bytes(256) # reject large message
- assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
- message = message[len(CHALLENGE):]
- digest = hmac.new(authkey, message).digest()
- connection.send_bytes(digest)
- response = connection.recv_bytes(256) # reject large message
- if response != WELCOME:
- raise AuthenticationError('digest sent was rejected')
-
-#
-# Support for using xmlrpclib for serialization
-#
-
-class ConnectionWrapper(object):
- def __init__(self, conn, dumps, loads):
- self._conn = conn
- self._dumps = dumps
- self._loads = loads
- for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
- obj = getattr(conn, attr)
- setattr(self, attr, obj)
- def send(self, obj):
- s = self._dumps(obj)
- self._conn.send_bytes(s)
- def recv(self):
- s = self._conn.recv_bytes()
- return self._loads(s)
-
-def _xml_dumps(obj):
- return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
-
-def _xml_loads(s):
- (obj,), method = xmlrpclib.loads(s.decode('utf8'))
- return obj
-
-class XmlListener(Listener):
- def accept(self):
- global xmlrpclib
- import xmlrpclib
- obj = Listener.accept(self)
- return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
-
-def XmlClient(*args, **kwds):
- global xmlrpclib
- import xmlrpclib
- return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
+#
+# A higher level module for using sockets (or Windows named pipes)
+#
+# multiprocessing/connection.py
+#
+# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+#
+
+__all__ = [ 'Client', 'Listener', 'Pipe' ]
+
+import os
+import sys
+import socket
+import time
+import tempfile
+import itertools
+
+import _multiprocessing
+from multiprocessing import current_process
+from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
+from multiprocessing.forking import duplicate, close
+
+
+#
+#
+#
+
+BUFSIZE = 8192
+
+_mmap_counter = itertools.count()
+
+default_family = 'AF_INET'
+families = ['AF_INET']
+
+if hasattr(socket, 'AF_UNIX'):
+ default_family = 'AF_UNIX'
+ families += ['AF_UNIX']
+
+if sys.platform == 'win32':
+ default_family = 'AF_PIPE'
+ families += ['AF_PIPE']
+
+#
+#
+#
+
+def arbitrary_address(family):
+ '''
+ Return an arbitrary free address for the given family
+ '''
+ if family == 'AF_INET':
+ return ('localhost', 0)
+ elif family == 'AF_UNIX':
+ return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
+ elif family == 'AF_PIPE':
+ return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
+ (os.getpid(), _mmap_counter.next()))
+ else:
+ raise ValueError('unrecognized family')
+
+
+def address_type(address):
+ '''
+ Return the type of the address
+
+ This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
+ '''
+ if type(address) == tuple:
+ return 'AF_INET'
+ elif type(address) is str and address.startswith('\\\\'):
+ return 'AF_PIPE'
+ elif type(address) is str:
+ return 'AF_UNIX'
+ else:
+ raise ValueError('address type of %r unrecognized' % address)
+
+#
+# Public functions
+#
+
+class Listener(object):
+ '''
+ Returns a listener object.
+
+ This is a wrapper for a bound socket which is 'listening' for
+ connections, or for a Windows named pipe.
+ '''
+ def __init__(self, address=None, family=None, backlog=1, authkey=None):
+ family = family or (address and address_type(address)) \
+ or default_family
+ address = address or arbitrary_address(family)
+
+ if family == 'AF_PIPE':
+ self._listener = PipeListener(address, backlog)
+ else:
+ self._listener = SocketListener(address, family, backlog)
+
+ if authkey is not None and not isinstance(authkey, bytes):
+ raise TypeError, 'authkey should be a byte string'
+
+ self._authkey = authkey
+
+ def accept(self):
+ '''
+ Accept a connection on the bound socket or named pipe of `self`.
+
+ Returns a `Connection` object.
+ '''
+ c = self._listener.accept()
+ if self._authkey:
+ deliver_challenge(c, self._authkey)
+ answer_challenge(c, self._authkey)
+ return c
+
+ def close(self):
+ '''
+ Close the bound socket or named pipe of `self`.
+ '''
+ return self._listener.close()
+
+ address = property(lambda self: self._listener._address)
+ last_accepted = property(lambda self: self._listener._last_accepted)
+
+
+def Client(address, family=None, authkey=None):
+ '''
+ Returns a connection to the address of a `Listener`
+ '''
+ family = family or address_type(address)
+ if family == 'AF_PIPE':
+ c = PipeClient(address)
+ else:
+ c = SocketClient(address)
+
+ if authkey is not None and not isinstance(authkey, bytes):
+ raise TypeError, 'authkey should be a byte string'
+
+ if authkey is not None:
+ answer_challenge(c, authkey)
+ deliver_challenge(c, authkey)
+
+ return c
+
+
+if sys.platform != 'win32':
+
+ def Pipe(duplex=True):
+ '''
+ Returns pair of connection objects at either end of a pipe
+ '''
+ if duplex:
+ s1, s2 = socket.socketpair()
+ c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
+ c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
+ s1.close()
+ s2.close()
+ else:
+ fd1, fd2 = os.pipe()
+ c1 = _multiprocessing.Connection(fd1, writable=False)
+ c2 = _multiprocessing.Connection(fd2, readable=False)
+
+ return c1, c2
+
+else:
+
+ from ._multiprocessing import win32
+
+ def Pipe(duplex=True):
+ '''
+ Returns pair of connection objects at either end of a pipe
+ '''
+ address = arbitrary_address('AF_PIPE')
+ if duplex:
+ openmode = win32.PIPE_ACCESS_DUPLEX
+ access = win32.GENERIC_READ | win32.GENERIC_WRITE
+ obsize, ibsize = BUFSIZE, BUFSIZE
+ else:
+ openmode = win32.PIPE_ACCESS_INBOUND
+ access = win32.GENERIC_WRITE
+ obsize, ibsize = 0, BUFSIZE
+
+ h1 = win32.CreateNamedPipe(
+ address, openmode,
+ win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
+ win32.PIPE_WAIT,
+ 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
+ )
+ h2 = win32.CreateFile(
+ address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ )
+ win32.SetNamedPipeHandleState(
+ h2, win32.PIPE_READMODE_MESSAGE, None, None
+ )
+
+ try:
+ win32.ConnectNamedPipe(h1, win32.NULL)
+ except WindowsError, e:
+ if e.args[0] != win32.ERROR_PIPE_CONNECTED:
+ raise
+
+ c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
+ c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
+
+ return c1, c2
+
+#
+# Definitions for connections based on sockets
+#
+
+class SocketListener(object):
+ '''
+ Representation of a socket which is bound to an address and listening
+ '''
+ def __init__(self, address, family, backlog=1):
+ self._socket = socket.socket(getattr(socket, family))
+ self._socket.bind(address)
+ self._socket.listen(backlog)
+ address = self._socket.getsockname()
+ if type(address) is tuple:
+ address = (socket.getfqdn(address[0]),) + address[1:]
+ self._address = address
+ self._family = family
+ self._last_accepted = None
+
+ sub_debug('listener bound to address %r', self._address)
+
+ if family == 'AF_UNIX':
+ self._unlink = Finalize(
+ self, os.unlink, args=(self._address,), exitpriority=0
+ )
+ else:
+ self._unlink = None
+
+ def accept(self):
+ s, self._last_accepted = self._socket.accept()
+ fd = duplicate(s.fileno())
+ conn = _multiprocessing.Connection(fd)
+ s.close()
+ return conn
+
+ def close(self):
+ self._socket.close()
+ if self._unlink is not None:
+ self._unlink()
+
+
+def SocketClient(address):
+ '''
+ Return a connection object connected to the socket given by `address`
+ '''
+ family = address_type(address)
+ s = socket.socket( getattr(socket, family) )
+
+ while 1:
+ try:
+ s.connect(address)
+ except socket.error, e:
+ if e.args[0] != 10061: # 10061 => connection refused
+ debug('failed to connect to address %s', address)
+ raise
+ time.sleep(0.01)
+ else:
+ break
+ else:
+ raise
+
+ fd = duplicate(s.fileno())
+ conn = _multiprocessing.Connection(fd)
+ s.close()
+ return conn
+
+#
+# Definitions for connections based on named pipes
+#
+
+if sys.platform == 'win32':
+
+ class PipeListener(object):
+ '''
+ Representation of a named pipe
+ '''
+ def __init__(self, address, backlog=None):
+ self._address = address
+ handle = win32.CreateNamedPipe(
+ address, win32.PIPE_ACCESS_DUPLEX,
+ win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
+ win32.PIPE_WAIT,
+ win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
+ win32.NMPWAIT_WAIT_FOREVER, win32.NULL
+ )
+ self._handle_queue = [handle]
+ self._last_accepted = None
+
+ sub_debug('listener created with address=%r', self._address)
+
+ self.close = Finalize(
+ self, PipeListener._finalize_pipe_listener,
+ args=(self._handle_queue, self._address), exitpriority=0
+ )
+
+ def accept(self):
+ newhandle = win32.CreateNamedPipe(
+ self._address, win32.PIPE_ACCESS_DUPLEX,
+ win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
+ win32.PIPE_WAIT,
+ win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
+ win32.NMPWAIT_WAIT_FOREVER, win32.NULL
+ )
+ self._handle_queue.append(newhandle)
+ handle = self._handle_queue.pop(0)
+ try:
+ win32.ConnectNamedPipe(handle, win32.NULL)
+ except WindowsError, e:
+ if e.args[0] != win32.ERROR_PIPE_CONNECTED:
+ raise
+ return _multiprocessing.PipeConnection(handle)
+
+ @staticmethod
+ def _finalize_pipe_listener(queue, address):
+ sub_debug('closing listener with address=%r', address)
+ for handle in queue:
+ close(handle)
+
+ def PipeClient(address):
+ '''
+ Return a connection object connected to the pipe given by `address`
+ '''
+ while 1:
+ try:
+ win32.WaitNamedPipe(address, 1000)
+ h = win32.CreateFile(
+ address, win32.GENERIC_READ | win32.GENERIC_WRITE,
+ 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ )
+ except WindowsError, e:
+ if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
+ win32.ERROR_PIPE_BUSY):
+ raise
+ else:
+ break
+ else:
+ raise
+
+ win32.SetNamedPipeHandleState(
+ h, win32.PIPE_READMODE_MESSAGE, None, None
+ )
+ return _multiprocessing.PipeConnection(h)
+
+#
+# Authentication stuff
+#
+
+MESSAGE_LENGTH = 20
+
+CHALLENGE = '#CHALLENGE#'
+WELCOME = '#WELCOME#'
+FAILURE = '#FAILURE#'
+
+if sys.version_info >= (3, 0): # XXX can use bytes literals in 2.6/3.0
+ CHALLENGE = CHALLENGE.encode('ascii')
+ WELCOME = WELCOME.encode('ascii')
+ FAILURE = FAILURE.encode('ascii')
+
+def deliver_challenge(connection, authkey):
+ import hmac
+ assert isinstance(authkey, bytes)
+ message = os.urandom(MESSAGE_LENGTH)
+ connection.send_bytes(CHALLENGE + message)
+ digest = hmac.new(authkey, message).digest()
+ response = connection.recv_bytes(256) # reject large message
+ if response == digest:
+ connection.send_bytes(WELCOME)
+ else:
+ connection.send_bytes(FAILURE)
+ raise AuthenticationError('digest received was wrong')
+
+def answer_challenge(connection, authkey):
+ import hmac
+ assert isinstance(authkey, bytes)
+ message = connection.recv_bytes(256) # reject large message
+ assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
+ message = message[len(CHALLENGE):]
+ digest = hmac.new(authkey, message).digest()
+ connection.send_bytes(digest)
+ response = connection.recv_bytes(256) # reject large message
+ if response != WELCOME:
+ raise AuthenticationError('digest sent was rejected')
+
+#
+# Support for using xmlrpclib for serialization
+#
+
+class ConnectionWrapper(object):
+ def __init__(self, conn, dumps, loads):
+ self._conn = conn
+ self._dumps = dumps
+ self._loads = loads
+ for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
+ obj = getattr(conn, attr)
+ setattr(self, attr, obj)
+ def send(self, obj):
+ s = self._dumps(obj)
+ self._conn.send_bytes(s)
+ def recv(self):
+ s = self._conn.recv_bytes()
+ return self._loads(s)
+
+def _xml_dumps(obj):
+ return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
+
+def _xml_loads(s):
+ (obj,), method = xmlrpclib.loads(s.decode('utf8'))
+ return obj
+
+class XmlListener(Listener):
+ def accept(self):
+ global xmlrpclib
+ import xmlrpclib
+ obj = Listener.accept(self)
+ return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
+
+def XmlClient(*args, **kwds):
+ global xmlrpclib
+ import xmlrpclib
+ return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
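
A sketch (not part of this commit) of the `Listener`/`Client` pair defined above: when `authkey` is given, `accept()` runs `deliver_challenge()` then `answer_challenge()`, and `Client()` performs the mirror-image handshake. The address and key here are arbitrary examples, and the two functions are meant to run in separate processes:

    from multiprocessing.connection import Listener, Client

    ADDRESS = ('localhost', 6000)         # example AF_INET address
    AUTHKEY = 'secret'                    # a byte string under Python 2

    def serve():                          # server process
        listener = Listener(ADDRESS, authkey=AUTHKEY)
        conn = listener.accept()          # HMAC challenge/response runs here
        print conn.recv()                 # -> 'authenticated hello'
        conn.close()
        listener.close()

    def talk():                           # client process
        conn = Client(ADDRESS, authkey=AUTHKEY)
        conn.send('authenticated hello')
        conn.close()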
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index 6107f07..43e3e83 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -1,429 +1,429 @@
-#
-# Module for starting a process object using os.fork() or CreateProcess()
-#
-# multiprocessing/forking.py
-#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
-#
-
-import os
-import sys
-import signal
-
-from multiprocessing import util, process
-
-__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close']
-
-#
-# Check that the current thread is spawning a child process
-#
-
-def assert_spawning(self):
- if not Popen.thread_is_spawning():
- raise RuntimeError(
- '%s objects should only be shared between processes'
- ' through inheritance' % type(self).__name__
- )
-
-#
-# Unix
-#
-
-if sys.platform != 'win32':
- import time
-
- exit = os._exit
- duplicate = os.dup
- close = os.close
-
- #
- # We define a Popen class similar to the one from subprocess, but
- # whose constructor takes a process object as its argument.
- #
-
- class Popen(object):
-
- def __init__(self, process_obj):
- sys.stdout.flush()
- sys.stderr.flush()
- self.returncode = None
-
- self.pid = os.fork()
- if self.pid == 0:
- if 'random' in sys.modules:
- import random
- random.seed()
- code = process_obj._bootstrap()
- sys.stdout.flush()
- sys.stderr.flush()
- os._exit(code)
-
- def poll(self, flag=os.WNOHANG):
- if self.returncode is None:
- pid, sts = os.waitpid(self.pid, flag)
- if pid == self.pid:
- if os.WIFSIGNALED(sts):
- self.returncode = -os.WTERMSIG(sts)
- else:
- assert os.WIFEXITED(sts)
- self.returncode = os.WEXITSTATUS(sts)
- return self.returncode
-
- def wait(self, timeout=None):
- if timeout is None:
- return self.poll(0)
- deadline = time.time() + timeout
- delay = 0.0005
- while 1:
- res = self.poll()
- if res is not None:
- break
- remaining = deadline - time.time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, 0.05)
- time.sleep(delay)
- return res
-
- def terminate(self):
- if self.returncode is None:
- try:
- os.kill(self.pid, signal.SIGTERM)
- except OSError, e:
- if self.wait(timeout=0.1) is None:
- raise
-
- @staticmethod
- def thread_is_spawning():
- return False
-
-#
-# Windows
-#
-
-else:
- import thread
- import msvcrt
- import _subprocess
- import copy_reg
- import time
-
- from ._multiprocessing import win32, Connection, PipeConnection
- from .util import Finalize
-
- try:
- from cPickle import dump, load, HIGHEST_PROTOCOL
- except ImportError:
- from pickle import dump, load, HIGHEST_PROTOCOL
-
- #
- #
- #
-
- TERMINATE = 0x10000
- WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
-
- exit = win32.ExitProcess
- close = win32.CloseHandle
-
- #
- # _python_exe is the assumed path to the python executable.
- # People embedding Python want to modify it.
- #
-
- if sys.executable.lower().endswith('pythonservice.exe'):
- _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
- else:
- _python_exe = sys.executable
-
- def set_executable(exe):
- global _python_exe
- _python_exe = exe
-
- #
- #
- #
-
- def duplicate(handle, target_process=None, inheritable=False):
- if target_process is None:
- target_process = _subprocess.GetCurrentProcess()
- return _subprocess.DuplicateHandle(
- _subprocess.GetCurrentProcess(), handle, target_process,
- 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
- ).Detach()
-
- #
- # We define a Popen class similar to the one from subprocess, but
- # whose constructor takes a process object as its argument.
- #
-
- class Popen(object):
- '''
- Start a subprocess to run the code of a process object
- '''
- _tls = thread._local()
-
- def __init__(self, process_obj):
- # create pipe for communication with child
- rfd, wfd = os.pipe()
-
- # get handle for read end of the pipe and make it inheritable
- rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
- os.close(rfd)
-
- # start process
- cmd = get_command_line() + [rhandle]
- cmd = ' '.join('"%s"' % x for x in cmd)
- hp, ht, pid, tid = _subprocess.CreateProcess(
- _python_exe, cmd, None, None, 1, 0, None, None, None
- )
- ht.Close()
- close(rhandle)
-
- # set attributes of self
- self.pid = pid
- self.returncode = None
- self._handle = hp
-
- # send information to child
- prep_data = get_preparation_data(process_obj._name)
- to_child = os.fdopen(wfd, 'wb')
- Popen._tls.process_handle = int(hp)
- try:
- dump(prep_data, to_child, HIGHEST_PROTOCOL)
- dump(process_obj, to_child, HIGHEST_PROTOCOL)
- finally:
- del Popen._tls.process_handle
- to_child.close()
-
- @staticmethod
- def thread_is_spawning():
- return getattr(Popen._tls, 'process_handle', None) is not None
-
- @staticmethod
- def duplicate_for_child(handle):
- return duplicate(handle, Popen._tls.process_handle)
-
- def wait(self, timeout=None):
- if self.returncode is None:
- if timeout is None:
- msecs = _subprocess.INFINITE
- else:
- msecs = max(0, int(timeout * 1000 + 0.5))
-
- res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
- if res == _subprocess.WAIT_OBJECT_0:
- code = _subprocess.GetExitCodeProcess(self._handle)
- if code == TERMINATE:
- code = -signal.SIGTERM
- self.returncode = code
-
- return self.returncode
-
- def poll(self):
- return self.wait(timeout=0)
-
- def terminate(self):
- if self.returncode is None:
- try:
- _subprocess.TerminateProcess(int(self._handle), TERMINATE)
- except WindowsError:
- if self.wait(timeout=0.1) is None:
- raise
-
- #
- #
- #
-
- def is_forking(argv):
- '''
- Return whether the command line indicates we are forking
- '''
- if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
- assert len(argv) == 3
- return True
- else:
- return False
-
-
- def freeze_support():
- '''
- Run code for process object if this is not the main process
- '''
- if is_forking(sys.argv):
- main()
- sys.exit()
-
-
- def get_command_line():
- '''
- Returns the prefix of the command line used for spawning a child process
- '''
- if process.current_process()._identity==() and is_forking(sys.argv):
- raise RuntimeError('''
- Attempt to start a new process before the current process
- has finished its bootstrapping phase.
-
- This probably means that you are on Windows and you have
- forgotten to use the proper idiom in the main module:
-
- if __name__ == '__main__':
- freeze_support()
- ...
-
- The "freeze_support()" line can be omitted if the program
- is not going to be frozen to produce a Windows executable.''')
-
- if getattr(sys, 'frozen', False):
- return [sys.executable, '--multiprocessing-fork']
- else:
- prog = 'from multiprocessing.forking import main; main()'
- return [_python_exe, '-c', prog, '--multiprocessing-fork']
-
-
- def main():
- '''
- Run code specified by data received over pipe
- '''
- assert is_forking(sys.argv)
-
- handle = int(sys.argv[-1])
- fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
- from_parent = os.fdopen(fd, 'rb')
-
- process.current_process()._inheriting = True
- preparation_data = load(from_parent)
- prepare(preparation_data)
- self = load(from_parent)
- process.current_process()._inheriting = False
-
- from_parent.close()
-
- exitcode = self._bootstrap()
- exit(exitcode)
-
-
- def get_preparation_data(name):
- '''
- Return info about parent needed by child to unpickle process object
- '''
- from .util import _logger, _log_to_stderr
-
- d = dict(
- name=name,
- sys_path=sys.path,
- sys_argv=sys.argv,
- log_to_stderr=_log_to_stderr,
- orig_dir=process.ORIGINAL_DIR,
- authkey=process.current_process().get_authkey(),
- )
-
- if _logger is not None:
- d['log_level'] = _logger.getEffectiveLevel()
-
- if not WINEXE:
- main_path = getattr(sys.modules['__main__'], '__file__', None)
- if not main_path and sys.argv[0] not in ('', '-c'):
- main_path = sys.argv[0]
- if main_path is not None:
- if not os.path.isabs(main_path) and \
- process.ORIGINAL_DIR is not None:
- main_path = os.path.join(process.ORIGINAL_DIR, main_path)
- d['main_path'] = os.path.normpath(main_path)
-
- return d
-
- #
- # Make (Pipe)Connection picklable
- #
-
- def reduce_connection(conn):
- if not Popen.thread_is_spawning():
- raise RuntimeError(
- 'By default %s objects can only be shared between processes\n'
- 'using inheritance' % type(conn).__name__
- )
- return type(conn), (Popen.duplicate_for_child(conn.fileno()),
- conn.readable, conn.writable)
-
- copy_reg.pickle(Connection, reduce_connection)
- copy_reg.pickle(PipeConnection, reduce_connection)
-
-
-#
-# Prepare current process
-#
-
-old_main_modules = []
-
-def prepare(data):
- '''
- Try to get current process ready to unpickle process object
- '''
- old_main_modules.append(sys.modules['__main__'])
-
- if 'name' in data:
- process.current_process().set_name(data['name'])
-
- if 'authkey' in data:
- process.current_process()._authkey = data['authkey']
-
- if 'log_to_stderr' in data and data['log_to_stderr']:
- util.log_to_stderr()
-
- if 'log_level' in data:
- util.get_logger().setLevel(data['log_level'])
-
- if 'sys_path' in data:
- sys.path = data['sys_path']
-
- if 'sys_argv' in data:
- sys.argv = data['sys_argv']
-
- if 'dir' in data:
- os.chdir(data['dir'])
-
- if 'orig_dir' in data:
- process.ORIGINAL_DIR = data['orig_dir']
-
- if 'main_path' in data:
- main_path = data['main_path']
- main_name = os.path.splitext(os.path.basename(main_path))[0]
- if main_name == '__init__':
- main_name = os.path.basename(os.path.dirname(main_path))
-
- if main_name != 'ipython':
- import imp
-
- if main_path is None:
- dirs = None
- elif os.path.basename(main_path).startswith('__init__.py'):
- dirs = [os.path.dirname(os.path.dirname(main_path))]
- else:
- dirs = [os.path.dirname(main_path)]
-
- assert main_name not in sys.modules, main_name
- file, path_name, etc = imp.find_module(main_name, dirs)
- try:
- # We would like to do "imp.load_module('__main__', ...)"
- # here. However, that would cause 'if __name__ ==
- # "__main__"' clauses to be executed.
- main_module = imp.load_module(
- '__parents_main__', file, path_name, etc
- )
- finally:
- if file:
- file.close()
-
- sys.modules['__main__'] = main_module
- main_module.__name__ = '__main__'
-
- # Try to make the potentially picklable objects in
- # sys.modules['__main__'] realize they are in the main
- # module -- somewhat ugly.
- for obj in main_module.__dict__.values():
- try:
- if obj.__module__ == '__parents_main__':
- obj.__module__ = '__main__'
- except Exception:
- pass
+#
+# Module for starting a process object using os.fork() or CreateProcess()
+#
+# multiprocessing/forking.py
+#
+# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+#
+
+import os
+import sys
+import signal
+
+from multiprocessing import util, process
+
+__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close']
+
+#
+# Check that the current thread is spawning a child process
+#
+
+def assert_spawning(self):
+ if not Popen.thread_is_spawning():
+ raise RuntimeError(
+ '%s objects should only be shared between processes'
+ ' through inheritance' % type(self).__name__
+ )
+
+#
+# Unix
+#
+
+if sys.platform != 'win32':
+ import time
+
+ exit = os._exit
+ duplicate = os.dup
+ close = os.close
+
+ #
+ # We define a Popen class similar to the one from subprocess, but
+ # whose constructor takes a process object as its argument.
+ #
+
+ class Popen(object):
+
+ def __init__(self, process_obj):
+ sys.stdout.flush()
+ sys.stderr.flush()
+ self.returncode = None
+
+ self.pid = os.fork()
+ if self.pid == 0:
+ if 'random' in sys.modules:
+ import random
+ random.seed()
+ code = process_obj._bootstrap()
+ sys.stdout.flush()
+ sys.stderr.flush()
+ os._exit(code)
+
+ def poll(self, flag=os.WNOHANG):
+ if self.returncode is None:
+ pid, sts = os.waitpid(self.pid, flag)
+ if pid == self.pid:
+ if os.WIFSIGNALED(sts):
+ self.returncode = -os.WTERMSIG(sts)
+ else:
+ assert os.WIFEXITED(sts)
+ self.returncode = os.WEXITSTATUS(sts)
+ return self.returncode
+
+ def wait(self, timeout=None):
+ if timeout is None:
+ return self.poll(0)
+ deadline = time.time() + timeout
+ delay = 0.0005
+ while 1:
+ res = self.poll()
+ if res is not None:
+ break
+ remaining = deadline - time.time()
+ if remaining <= 0:
+ break
+ delay = min(delay * 2, remaining, 0.05)
+ time.sleep(delay)
+ return res
+
+ def terminate(self):
+ if self.returncode is None:
+ try:
+ os.kill(self.pid, signal.SIGTERM)
+ except OSError, e:
+ if self.wait(timeout=0.1) is None:
+ raise
+
+ @staticmethod
+ def thread_is_spawning():
+ return False
+
+#
+# Windows
+#
+
+else:
+ import thread
+ import msvcrt
+ import _subprocess
+ import copy_reg
+ import time
+
+ from ._multiprocessing import win32, Connection, PipeConnection
+ from .util import Finalize
+
+ try:
+ from cPickle import dump, load, HIGHEST_PROTOCOL
+ except ImportError:
+ from pickle import dump, load, HIGHEST_PROTOCOL
+
+ #
+ #
+ #
+
+ TERMINATE = 0x10000
+ WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+
+ exit = win32.ExitProcess
+ close = win32.CloseHandle
+
+ #
+ # _python_exe is the assumed path to the python executable.
+ # People embedding Python want to modify it.
+ #
+
+ if sys.executable.lower().endswith('pythonservice.exe'):
+ _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
+ else:
+ _python_exe = sys.executable
+
+ def set_executable(exe):
+ global _python_exe
+ _python_exe = exe
+
+ #
+ #
+ #
+
+ def duplicate(handle, target_process=None, inheritable=False):
+ if target_process is None:
+ target_process = _subprocess.GetCurrentProcess()
+ return _subprocess.DuplicateHandle(
+ _subprocess.GetCurrentProcess(), handle, target_process,
+ 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
+ ).Detach()
+
+ #
+ # We define a Popen class similar to the one from subprocess, but
+ # whose constructor takes a process object as its argument.
+ #
+
+ class Popen(object):
+ '''
+ Start a subprocess to run the code of a process object
+ '''
+ _tls = thread._local()
+
+ def __init__(self, process_obj):
+ # create pipe for communication with child
+ rfd, wfd = os.pipe()
+
+ # get handle for read end of the pipe and make it inheritable
+ rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
+ os.close(rfd)
+
+ # start process
+ cmd = get_command_line() + [rhandle]
+ cmd = ' '.join('"%s"' % x for x in cmd)
+ hp, ht, pid, tid = _subprocess.CreateProcess(
+ _python_exe, cmd, None, None, 1, 0, None, None, None
+ )
+ ht.Close()
+ close(rhandle)
+
+ # set attributes of self
+ self.pid = pid
+ self.returncode = None
+ self._handle = hp
+
+ # send information to child
+ prep_data = get_preparation_data(process_obj._name)
+ to_child = os.fdopen(wfd, 'wb')
+ Popen._tls.process_handle = int(hp)
+ try:
+ dump(prep_data, to_child, HIGHEST_PROTOCOL)
+ dump(process_obj, to_child, HIGHEST_PROTOCOL)
+ finally:
+ del Popen._tls.process_handle
+ to_child.close()
+
+ @staticmethod
+ def thread_is_spawning():
+ return getattr(Popen._tls, 'process_handle', None) is not None
+
+ @staticmethod
+ def duplicate_for_child(handle):
+ return duplicate(handle, Popen._tls.process_handle)
+
+ def wait(self, timeout=None):
+ if self.returncode is None:
+ if timeout is None:
+ msecs = _subprocess.INFINITE
+ else:
+ msecs = max(0, int(timeout * 1000 + 0.5))
+
+ res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
+ if res == _subprocess.WAIT_OBJECT_0:
+ code = _subprocess.GetExitCodeProcess(self._handle)
+ if code == TERMINATE:
+ code = -signal.SIGTERM
+ self.returncode = code
+
+ return self.returncode
+
+ def poll(self):
+ return self.wait(timeout=0)
+
+ def terminate(self):
+ if self.returncode is None:
+ try:
+ _subprocess.TerminateProcess(int(self._handle), TERMINATE)
+ except WindowsError:
+ if self.wait(timeout=0.1) is None:
+ raise
+
+ #
+ #
+ #
+
+ def is_forking(argv):
+ '''
+ Return whether the command line indicates we are forking
+ '''
+ if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
+ assert len(argv) == 3
+ return True
+ else:
+ return False
+
+
+ def freeze_support():
+ '''
+ Run code for process object if this is not the main process
+ '''
+ if is_forking(sys.argv):
+ main()
+ sys.exit()
+
+
+ def get_command_line():
+ '''
+ Returns the prefix of the command line used for spawning a child process
+ '''
+ if process.current_process()._identity==() and is_forking(sys.argv):
+ raise RuntimeError('''
+ Attempt to start a new process before the current process
+ has finished its bootstrapping phase.
+
+ This probably means that you are on Windows and you have
+ forgotten to use the proper idiom in the main module:
+
+ if __name__ == '__main__':
+ freeze_support()
+ ...
+
+ The "freeze_support()" line can be omitted if the program
+ is not going to be frozen to produce a Windows executable.''')
+
+ if getattr(sys, 'frozen', False):
+ return [sys.executable, '--multiprocessing-fork']
+ else:
+ prog = 'from multiprocessing.forking import main; main()'
+ return [_python_exe, '-c', prog, '--multiprocessing-fork']
+
+
+ def main():
+ '''
+ Run code specified by data received over pipe
+ '''
+ assert is_forking(sys.argv)
+
+ handle = int(sys.argv[-1])
+ fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
+ from_parent = os.fdopen(fd, 'rb')
+
+ process.current_process()._inheriting = True
+ preparation_data = load(from_parent)
+ prepare(preparation_data)
+ self = load(from_parent)
+ process.current_process()._inheriting = False
+
+ from_parent.close()
+
+ exitcode = self._bootstrap()
+ exit(exitcode)
+
+
+ def get_preparation_data(name):
+ '''
+ Return info about parent needed by child to unpickle process object
+ '''
+ from .util import _logger, _log_to_stderr
+
+ d = dict(
+ name=name,
+ sys_path=sys.path,
+ sys_argv=sys.argv,
+ log_to_stderr=_log_to_stderr,
+ orig_dir=process.ORIGINAL_DIR,
+ authkey=process.current_process().get_authkey(),
+ )
+
+ if _logger is not None:
+ d['log_level'] = _logger.getEffectiveLevel()
+
+ if not WINEXE:
+ main_path = getattr(sys.modules['__main__'], '__file__', None)
+ if not main_path and sys.argv[0] not in ('', '-c'):
+ main_path = sys.argv[0]
+ if main_path is not None:
+ if not os.path.isabs(main_path) and \
+ process.ORIGINAL_DIR is not None:
+ main_path = os.path.join(process.ORIGINAL_DIR, main_path)
+ d['main_path'] = os.path.normpath(main_path)
+
+ return d
+
+ #
+ # Make (Pipe)Connection picklable
+ #
+
+ def reduce_connection(conn):
+ if not Popen.thread_is_spawning():
+ raise RuntimeError(
+ 'By default %s objects can only be shared between processes\n'
+ 'using inheritance' % type(conn).__name__
+ )
+ return type(conn), (Popen.duplicate_for_child(conn.fileno()),
+ conn.readable, conn.writable)
+
+ copy_reg.pickle(Connection, reduce_connection)
+ copy_reg.pickle(PipeConnection, reduce_connection)
+
+
+#
+# Prepare current process
+#
+
+old_main_modules = []
+
+def prepare(data):
+ '''
+ Try to get current process ready to unpickle process object
+ '''
+ old_main_modules.append(sys.modules['__main__'])
+
+ if 'name' in data:
+ process.current_process().set_name(data['name'])
+
+ if 'authkey' in data:
+ process.current_process()._authkey = data['authkey']
+
+ if 'log_to_stderr' in data and data['log_to_stderr']:
+ util.log_to_stderr()
+
+ if 'log_level' in data:
+ util.get_logger().setLevel(data['log_level'])
+
+ if 'sys_path' in data:
+ sys.path = data['sys_path']
+
+ if 'sys_argv' in data:
+ sys.argv = data['sys_argv']
+
+ if 'dir' in data:
+ os.chdir(data['dir'])
+
+ if 'orig_dir' in data:
+ process.ORIGINAL_DIR = data['orig_dir']
+
+ if 'main_path' in data:
+ main_path = data['main_path']
+ main_name = os.path.splitext(os.path.basename(main_path))[0]
+ if main_name == '__init__':
+ main_name = os.path.basename(os.path.dirname(main_path))
+
+ if main_name != 'ipython':
+ import imp
+
+ if main_path is None:
+ dirs = None
+ elif os.path.basename(main_path).startswith('__init__.py'):
+ dirs = [os.path.dirname(os.path.dirname(main_path))]
+ else:
+ dirs = [os.path.dirname(main_path)]
+
+ assert main_name not in sys.modules, main_name
+ file, path_name, etc = imp.find_module(main_name, dirs)
+ try:
+ # We would like to do "imp.load_module('__main__', ...)"
+ # here. However, that would cause 'if __name__ ==
+ # "__main__"' clauses to be executed.
+ main_module = imp.load_module(
+ '__parents_main__', file, path_name, etc
+ )
+ finally:
+ if file:
+ file.close()
+
+ sys.modules['__main__'] = main_module
+ main_module.__name__ = '__main__'
+
+ # Try to make the potentially picklable objects in
+ # sys.modules['__main__'] realize they are in the main
+ # module -- somewhat ugly.
+ for obj in main_module.__dict__.values():
+ try:
+ if obj.__module__ == '__parents_main__':
+ obj.__module__ = '__main__'
+ except Exception:
+ pass
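The RuntimeError message in get_command_line() above describes the guard that Windows programs need around their main module. A minimal standalone sketch of a correctly guarded script, assuming the Py2-era API used in this patch (hypothetical file, not part of the diff):

from multiprocessing import Process, freeze_support

def worker():
    pass  # real work would go here

if __name__ == '__main__':
    freeze_support()  # no-op unless the program is frozen into a Windows exe
    p = Process(target=worker)
    p.start()
    p.join()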
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index f6b3404..7e596ca 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -1,201 +1,201 @@
-#
-# Module which supports allocation of memory from an mmap
-#
-# multiprocessing/heap.py
-#
-# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
-#
-
-import bisect
-import mmap
-import tempfile
-import os
-import sys
-import threading
-import itertools
-
-import _multiprocessing
-from multiprocessing.util import Finalize, info
-from multiprocessing.forking import assert_spawning
-
-__all__ = ['BufferWrapper']
-
-#
-# Inheirtable class which wraps an mmap, and from which blocks can be allocated
-#
-
-if sys.platform == 'win32':
-
- from ._multiprocessing import win32
-
- class Arena(object):
-
- _counter = itertools.count()
-
- def __init__(self, size):
- self.size = size
- self.name = 'pym-%d-%d' % (os.getpid(), Arena._counter.next())
- self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
- assert win32.GetLastError() == 0, 'tagname already in use'
- self._state = (self.size, self.name)
-
- def __getstate__(self):
- assert_spawning(self)
- return self._state
-
- def __setstate__(self, state):
- self.size, self.name = self._state = state
- self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
- assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS
-
-else:
-
- class Arena(object):
-
- def __init__(self, size):
- self.buffer = mmap.mmap(-1, size)
- self.size = size
- self.name = None
-
-#
-# Class allowing allocation of chunks of memory from arenas
-#
-
-class Heap(object):
-
- _alignment = 8
-
- def __init__(self, size=mmap.PAGESIZE):
- self._lastpid = os.getpid()
- self._lock = threading.Lock()
- self._size = size
- self._lengths = []
- self._len_to_seq = {}
- self._start_to_block = {}
- self._stop_to_block = {}
- self._allocated_blocks = set()
- self._arenas = []
-
- @staticmethod
- def _roundup(n, alignment):
- # alignment must be a power of 2
- mask = alignment - 1
- return (n + mask) & ~mask
-
- def _malloc(self, size):
- # returns a large enough block -- it might be much larger
- i = bisect.bisect_left(self._lengths, size)
- if i == len(self._lengths):
- length = self._roundup(max(self._size, size), mmap.PAGESIZE)
- self._size *= 2
- info('allocating a new mmap of length %d', length)
- arena = Arena(length)
- self._arenas.append(arena)
- return (arena, 0, length)
- else:
- length = self._lengths[i]
- seq = self._len_to_seq[length]
- block = seq.pop()
- if not seq:
- del self._len_to_seq[length], self._lengths[i]
-
- (arena, start, stop) = block
- del self._start_to_block[(arena, start)]
- del self._stop_to_block[(arena, stop)]
- return block
-
- def _free(self, block):
- # free location and try to merge with neighbours
- (arena, start, stop) = block
-
- try:
- prev_block = self._stop_to_block[(arena, start)]
- except KeyError:
- pass
- else:
- start, _ = self._absorb(prev_block)
-
- try:
- next_block = self._start_to_block[(arena, stop)]
- except KeyError:
- pass
- else:
- _, stop = self._absorb(next_block)
-
- block = (arena, start, stop)
- length = stop - start
-
- try:
- self._len_to_seq[length].append(block)
- except KeyError:
- self._len_to_seq[length] = [block]
- bisect.insort(self._lengths, length)
-
- self._start_to_block[(arena, start)] = block
- self._stop_to_block[(arena, stop)] = block
-
- def _absorb(self, block):
- # deregister this block so it can be merged with a neighbour
- (arena, start, stop) = block
- del self._start_to_block[(arena, start)]
- del self._stop_to_block[(arena, stop)]
-
- length = stop - start
- seq = self._len_to_seq[length]
- seq.remove(block)
- if not seq:
- del self._len_to_seq[length]
- self._lengths.remove(length)
-
- return start, stop
-
- def free(self, block):
- # free a block returned by malloc()
- assert os.getpid() == self._lastpid
- self._lock.acquire()
- try:
- self._allocated_blocks.remove(block)
- self._free(block)
- finally:
- self._lock.release()
-
- def malloc(self, size):
- # return a block of the right size (possibly rounded up)
- assert 0 <= size < sys.maxint
- if os.getpid() != self._lastpid:
- self.__init__() # reinitialize after fork
- self._lock.acquire()
- try:
- size = self._roundup(max(size,1), self._alignment)
- (arena, start, stop) = self._malloc(size)
- new_stop = start + size
- if new_stop < stop:
- self._free((arena, new_stop, stop))
- block = (arena, start, new_stop)
- self._allocated_blocks.add(block)
- return block
- finally:
- self._lock.release()
-
-#
-# Class representing a chunk of an mmap -- can be inherited
-#
-
-class BufferWrapper(object):
-
- _heap = Heap()
-
- def __init__(self, size):
- assert 0 <= size < sys.maxint
- block = BufferWrapper._heap.malloc(size)
- self._state = (block, size)
- Finalize(self, BufferWrapper._heap.free, args=(block,))
-
- def get_address(self):
- (arena, start, stop), size = self._state
- address, length = _multiprocessing.address_of_buffer(arena.buffer)
- assert size <= length
- return address + start
-
- def get_size(self):
- return self._state[1]
+#
+# Module which supports allocation of memory from an mmap
+#
+# multiprocessing/heap.py
+#
+# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
+#
+
+import bisect
+import mmap
+import tempfile
+import os
+import sys
+import threading
+import itertools
+
+import _multiprocessing
+from multiprocessing.util import Finalize, info
+from multiprocessing.forking import assert_spawning
+
+__all__ = ['BufferWrapper']
+
+#
+# Inheritable class which wraps an mmap, and from which blocks can be allocated
+#
+
+if sys.platform == 'win32':
+
+ from ._multiprocessing import win32
+
+ class Arena(object):
+
+ _counter = itertools.count()
+
+ def __init__(self, size):
+ self.size = size
+ self.name = 'pym-%d-%d' % (os.getpid(), Arena._counter.next())
+ self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
+ assert win32.GetLastError() == 0, 'tagname already in use'
+ self._state = (self.size, self.name)
+
+ def __getstate__(self):
+ assert_spawning(self)
+ return self._state
+
+ def __setstate__(self, state):
+ self.size, self.name = self._state = state
+ self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
+ assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS
+
+else:
+
+ class Arena(object):
+
+ def __init__(self, size):
+ self.buffer = mmap.mmap(-1, size)
+ self.size = size
+ self.name = None
+
+#
+# Class allowing allocation of chunks of memory from arenas
+#
+
+class Heap(object):
+
+ _alignment = 8
+
+ def __init__(self, size=mmap.PAGESIZE):
+ self._lastpid = os.getpid()
+ self._lock = threading.Lock()
+ self._size = size
+ self._lengths = []
+ self._len_to_seq = {}
+ self._start_to_block = {}
+ self._stop_to_block = {}
+ self._allocated_blocks = set()
+ self._arenas = []
+
+ @staticmethod
+ def _roundup(n, alignment):
+ # alignment must be a power of 2
+ mask = alignment - 1
+ return (n + mask) & ~mask
+
+ def _malloc(self, size):
+ # returns a large enough block -- it might be much larger
+ i = bisect.bisect_left(self._lengths, size)
+ if i == len(self._lengths):
+ length = self._roundup(max(self._size, size), mmap.PAGESIZE)
+ self._size *= 2
+ info('allocating a new mmap of length %d', length)
+ arena = Arena(length)
+ self._arenas.append(arena)
+ return (arena, 0, length)
+ else:
+ length = self._lengths[i]
+ seq = self._len_to_seq[length]
+ block = seq.pop()
+ if not seq:
+ del self._len_to_seq[length], self._lengths[i]
+
+ (arena, start, stop) = block
+ del self._start_to_block[(arena, start)]
+ del self._stop_to_block[(arena, stop)]
+ return block
+
+ def _free(self, block):
+ # free location and try to merge with neighbours
+ (arena, start, stop) = block
+
+ try:
+ prev_block = self._stop_to_block[(arena, start)]
+ except KeyError:
+ pass
+ else:
+ start, _ = self._absorb(prev_block)
+
+ try:
+ next_block = self._start_to_block[(arena, stop)]
+ except KeyError:
+ pass
+ else:
+ _, stop = self._absorb(next_block)
+
+ block = (arena, start, stop)
+ length = stop - start
+
+ try:
+ self._len_to_seq[length].append(block)
+ except KeyError:
+ self._len_to_seq[length] = [block]
+ bisect.insort(self._lengths, length)
+
+ self._start_to_block[(arena, start)] = block
+ self._stop_to_block[(arena, stop)] = block
+
+ def _absorb(self, block):
+ # deregister this block so it can be merged with a neighbour
+ (arena, start, stop) = block
+ del self._start_to_block[(arena, start)]
+ del self._stop_to_block[(arena, stop)]
+
+ length = stop - start
+ seq = self._len_to_seq[length]
+ seq.remove(block)
+ if not seq:
+ del self._len_to_seq[length]
+ self._lengths.remove(length)
+
+ return start, stop
+
+ def free(self, block):
+ # free a block returned by malloc()
+ assert os.getpid() == self._lastpid
+ self._lock.acquire()
+ try:
+ self._allocated_blocks.remove(block)
+ self._free(block)
+ finally:
+ self._lock.release()
+
+ def malloc(self, size):
+ # return a block of the right size (possibly rounded up)
+ assert 0 <= size < sys.maxint
+ if os.getpid() != self._lastpid:
+ self.__init__() # reinitialize after fork
+ self._lock.acquire()
+ try:
+ size = self._roundup(max(size,1), self._alignment)
+ (arena, start, stop) = self._malloc(size)
+ new_stop = start + size
+ if new_stop < stop:
+ self._free((arena, new_stop, stop))
+ block = (arena, start, new_stop)
+ self._allocated_blocks.add(block)
+ return block
+ finally:
+ self._lock.release()
+
+#
+# Class representing a chunk of an mmap -- can be inherited
+#
+
+class BufferWrapper(object):
+
+ _heap = Heap()
+
+ def __init__(self, size):
+ assert 0 <= size < sys.maxint
+ block = BufferWrapper._heap.malloc(size)
+ self._state = (block, size)
+ Finalize(self, BufferWrapper._heap.free, args=(block,))
+
+ def get_address(self):
+ (arena, start, stop), size = self._state
+ address, length = _multiprocessing.address_of_buffer(arena.buffer)
+ assert size <= length
+ return address + start
+
+ def get_size(self):
+ return self._state[1]
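Heap._roundup above relies on a standard power-of-two trick: adding alignment-1 and masking off the low bits rounds n up to the next multiple of alignment. A standalone sketch with worked values (illustration only, not part of the diff):

def roundup(n, alignment):
    # alignment must be a power of 2, so alignment-1 is a mask of the low bits
    mask = alignment - 1
    return (n + mask) & ~mask

assert roundup(0, 8) == 0
assert roundup(1, 8) == 8    # 1+7 = 8, low bits already clear
assert roundup(13, 8) == 16  # 13+7 = 20 = 0b10100; & ~0b111 -> 16
assert roundup(16, 8) == 16  # exact multiples are unchanged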
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 6c1d912..fb705cb 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -1,1092 +1,1092 @@
-#
-# Module providing the `SyncManager` class for dealing
-# with shared objects
-#
-# multiprocessing/managers.py
-#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
-#
-
-__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
-
-#
-# Imports
-#
-
-import os
-import sys
-import weakref
-import threading
-import array
-import copy_reg
-import Queue
-
-from traceback import format_exc
-from multiprocessing import Process, current_process, active_children, Pool, util, connection
-from multiprocessing.process import AuthenticationString
-from multiprocessing.forking import exit, Popen, assert_spawning
-from multiprocessing.util import Finalize, info
-
-try:
- from cPickle import PicklingError
-except ImportError:
- from pickle import PicklingError
-
-#
-#
-#
-
-try:
- bytes
-except NameError:
- bytes = str # XXX not needed in Py2.6 and Py3.0
-
-#
-# Register some things for pickling
-#
-
-def reduce_array(a):
- return array.array, (a.typecode, a.tostring())
-copy_reg.pickle(array.array, reduce_array)
-
-view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
-if view_types[0] is not list: # XXX only needed in Py3.0
- def rebuild_as_list(obj):
- return list, (list(obj),)
- for view_type in view_types:
- copy_reg.pickle(view_type, rebuild_as_list)
-
-#
-# Type for identifying shared objects
-#
-
-class Token(object):
- '''
- Type to uniquely identify a shared object
- '''
- __slots__ = ('typeid', 'address', 'id')
-
- def __init__(self, typeid, address, id):
- (self.typeid, self.address, self.id) = (typeid, address, id)
-
- def __getstate__(self):
- return (self.typeid, self.address, self.id)
-
- def __setstate__(self, state):
- (self.typeid, self.address, self.id) = state
-
- def __repr__(self):
- return 'Token(typeid=%r, address=%r, id=%r)' % \
- (self.typeid, self.address, self.id)
-
-#
-# Function for communication with a manager's server process
-#
-
-def dispatch(c, id, methodname, args=(), kwds={}):
- '''
- Send a message to manager using connection `c` and return response
- '''
- c.send((id, methodname, args, kwds))
- kind, result = c.recv()
- if kind == '#RETURN':
- return result
- raise convert_to_error(kind, result)
-
-def convert_to_error(kind, result):
- if kind == '#ERROR':
- return result
- elif kind == '#TRACEBACK':
- assert type(result) is str
- return RemoteError(result)
- elif kind == '#UNSERIALIZABLE':
- assert type(result) is str
- return RemoteError('Unserializable message: %s\n' % result)
- else:
- return ValueError('Unrecognized message type')
-
-class RemoteError(Exception):
- def __str__(self):
- return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
-
-#
-# Functions for finding the method names of an object
-#
-
-def all_methods(obj):
- '''
- Return a list of names of methods of `obj`
- '''
- temp = []
- for name in dir(obj):
- func = getattr(obj, name)
- if hasattr(func, '__call__'):
- temp.append(name)
- return temp
-
-def public_methods(obj):
- '''
- Return a list of names of methods of `obj` which do not start with '_'
- '''
- return [name for name in all_methods(obj) if name[0] != '_']
-
-#
-# Server which is run in a process controlled by a manager
-#
-
-class Server(object):
- '''
- Server class which runs in a process controlled by a manager object
- '''
- public = ['shutdown', 'create', 'accept_connection', 'get_methods',
- 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
-
- def __init__(self, registry, address, authkey, serializer):
- assert isinstance(authkey, bytes)
- self.registry = registry
- self.authkey = AuthenticationString(authkey)
- Listener, Client = listener_client[serializer]
-
- # do authentication later
- self.listener = Listener(address=address, backlog=5)
- self.address = self.listener.address
-
- self.id_to_obj = {0: (None, ())}
- self.id_to_refcount = {}
- self.mutex = threading.RLock()
- self.stop = 0
-
- def serve_forever(self):
- '''
- Run the server forever
- '''
- current_process()._manager_server = self
- try:
- try:
- while 1:
- try:
- c = self.listener.accept()
- except (OSError, IOError):
- continue
- t = threading.Thread(target=self.handle_request, args=(c,))
- t.set_daemon(True)
- t.start()
- except (KeyboardInterrupt, SystemExit):
- pass
- finally:
- self.stop = 999
- self.listener.close()
-
- def handle_request(self, c):
- '''
- Handle a new connection
- '''
- funcname = result = request = None
- try:
- connection.deliver_challenge(c, self.authkey)
- connection.answer_challenge(c, self.authkey)
- request = c.recv()
- ignore, funcname, args, kwds = request
- assert funcname in self.public, '%r unrecognized' % funcname
- func = getattr(self, funcname)
- except Exception:
- msg = ('#TRACEBACK', format_exc())
- else:
- try:
- result = func(c, *args, **kwds)
- except Exception:
- msg = ('#TRACEBACK', format_exc())
- else:
- msg = ('#RETURN', result)
- try:
- c.send(msg)
- except Exception, e:
- try:
- c.send(('#TRACEBACK', format_exc()))
- except Exception:
- pass
- util.info('Failure to send message: %r', msg)
- util.info(' ... request was %r', request)
- util.info(' ... exception was %r', e)
-
- c.close()
-
- def serve_client(self, conn):
- '''
- Handle requests from the proxies in a particular process/thread
- '''
- util.debug('starting server thread to service %r',
- threading.current_thread().get_name())
-
- recv = conn.recv
- send = conn.send
- id_to_obj = self.id_to_obj
-
- while not self.stop:
-
- try:
- methodname = obj = None
- request = recv()
- ident, methodname, args, kwds = request
- obj, exposed, gettypeid = id_to_obj[ident]
-
- if methodname not in exposed:
- raise AttributeError(
- 'method %r of %r object is not in exposed=%r' %
- (methodname, type(obj), exposed)
- )
-
- function = getattr(obj, methodname)
-
- try:
- res = function(*args, **kwds)
- except Exception, e:
- msg = ('#ERROR', e)
- else:
- typeid = gettypeid and gettypeid.get(methodname, None)
- if typeid:
- rident, rexposed = self.create(conn, typeid, res)
- token = Token(typeid, self.address, rident)
- msg = ('#PROXY', (rexposed, token))
- else:
- msg = ('#RETURN', res)
-
- except AttributeError:
- if methodname is None:
- msg = ('#TRACEBACK', format_exc())
- else:
- try:
- fallback_func = self.fallback_mapping[methodname]
- result = fallback_func(
- self, conn, ident, obj, *args, **kwds
- )
- msg = ('#RETURN', result)
- except Exception:
- msg = ('#TRACEBACK', format_exc())
-
- except EOFError:
- util.debug('got EOF -- exiting thread serving %r',
- threading.current_thread().get_name())
- sys.exit(0)
-
- except Exception:
- msg = ('#TRACEBACK', format_exc())
-
- try:
- try:
- send(msg)
- except Exception, e:
- send(('#UNSERIALIZABLE', repr(msg)))
- except Exception, e:
- util.info('exception in thread serving %r',
- threading.current_thread().get_name())
- util.info(' ... message was %r', msg)
- util.info(' ... exception was %r', e)
- conn.close()
- sys.exit(1)
-
- def fallback_getvalue(self, conn, ident, obj):
- return obj
-
- def fallback_str(self, conn, ident, obj):
- return str(obj)
-
- def fallback_repr(self, conn, ident, obj):
- return repr(obj)
-
- fallback_mapping = {
- '__str__':fallback_str,
- '__repr__':fallback_repr,
- '#GETVALUE':fallback_getvalue
- }
-
- def dummy(self, c):
- pass
-
- def debug_info(self, c):
- '''
- Return some info --- useful to spot problems with refcounting
- '''
- self.mutex.acquire()
- try:
- result = []
- keys = self.id_to_obj.keys()
- keys.sort()
- for ident in keys:
- if ident != 0:
- result.append(' %s: refcount=%s\n %s' %
- (ident, self.id_to_refcount[ident],
- str(self.id_to_obj[ident][0])[:75]))
- return '\n'.join(result)
- finally:
- self.mutex.release()
-
- def number_of_objects(self, c):
- '''
- Number of shared objects
- '''
- return len(self.id_to_obj) - 1 # don't count ident=0
-
- def shutdown(self, c):
- '''
- Shut down this process
- '''
- try:
- try:
- util.debug('manager received shutdown message')
- c.send(('#RETURN', None))
-
- if sys.stdout != sys.__stdout__:
- util.debug('resetting stdout, stderr')
- sys.stdout = sys.__stdout__
- sys.stderr = sys.__stderr__
-
- util._run_finalizers(0)
-
- for p in active_children():
- util.debug('terminating a child process of manager')
- p.terminate()
-
- for p in active_children():
- util.debug('joining a child process of manager')
- p.join()
-
- util._run_finalizers()
- util.info('manager exiting with exitcode 0')
- except:
- import traceback
- traceback.print_exc()
- finally:
- exit(0)
-
- def create(self, c, typeid, *args, **kwds):
- '''
- Create a new shared object and return its id
- '''
- self.mutex.acquire()
- try:
- callable, exposed, method_to_typeid, proxytype = \
- self.registry[typeid]
-
- if callable is None:
- assert len(args) == 1 and not kwds
- obj = args[0]
- else:
- obj = callable(*args, **kwds)
-
- if exposed is None:
- exposed = public_methods(obj)
- if method_to_typeid is not None:
- assert type(method_to_typeid) is dict
- exposed = list(exposed) + list(method_to_typeid)
-
- ident = '%x' % id(obj) # convert to string because xmlrpclib
- # only has 32 bit signed integers
- util.debug('%r callable returned object with id %r', typeid, ident)
-
- self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
- if ident not in self.id_to_refcount:
- self.id_to_refcount[ident] = None
- return ident, tuple(exposed)
- finally:
- self.mutex.release()
-
- def get_methods(self, c, token):
- '''
- Return the methods of the shared object indicated by token
- '''
- return tuple(self.id_to_obj[token.id][1])
-
- def accept_connection(self, c, name):
- '''
- Spawn a new thread to serve this connection
- '''
- threading.current_thread().set_name(name)
- c.send(('#RETURN', None))
- self.serve_client(c)
-
- def incref(self, c, ident):
- self.mutex.acquire()
- try:
- try:
- self.id_to_refcount[ident] += 1
- except TypeError:
- assert self.id_to_refcount[ident] is None
- self.id_to_refcount[ident] = 1
- finally:
- self.mutex.release()
-
- def decref(self, c, ident):
- self.mutex.acquire()
- try:
- assert self.id_to_refcount[ident] >= 1
- self.id_to_refcount[ident] -= 1
- if self.id_to_refcount[ident] == 0:
- del self.id_to_obj[ident], self.id_to_refcount[ident]
- util.debug('disposing of obj with id %r', ident)
- finally:
- self.mutex.release()
-
-#
-# Class to represent state of a manager
-#
-
-class State(object):
- __slots__ = ['value']
- INITIAL = 0
- STARTED = 1
- SHUTDOWN = 2
-
-#
-# Mapping from serializer name to Listener and Client types
-#
-
-listener_client = {
- 'pickle' : (connection.Listener, connection.Client),
- 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
- }
-
-#
-# Definition of BaseManager
-#
-
-class BaseManager(object):
- '''
- Base class for managers
- '''
- _registry = {}
- _Server = Server
-
- def __init__(self, address=None, authkey=None, serializer='pickle'):
- if authkey is None:
- authkey = current_process().get_authkey()
- self._address = address # XXX not final address if eg ('', 0)
- self._authkey = AuthenticationString(authkey)
- self._state = State()
- self._state.value = State.INITIAL
- self._serializer = serializer
- self._Listener, self._Client = listener_client[serializer]
-
- def __reduce__(self):
- return type(self).from_address, \
- (self._address, self._authkey, self._serializer)
-
- def get_server(self):
- '''
- Return server object with serve_forever() method and address attribute
- '''
- assert self._state.value == State.INITIAL
- return Server(self._registry, self._address,
- self._authkey, self._serializer)
-
- def connect(self):
- '''
- Connect manager object to the server process
- '''
- Listener, Client = listener_client[self._serializer]
- conn = Client(self._address, authkey=self._authkey)
- dispatch(conn, None, 'dummy')
- self._state.value = State.STARTED
-
- def start(self):
- '''
- Spawn a server process for this manager object
- '''
- assert self._state.value == State.INITIAL
-
- # pipe over which we will retrieve address of server
- reader, writer = connection.Pipe(duplex=False)
-
- # spawn process which runs a server
- self._process = Process(
- target=type(self)._run_server,
- args=(self._registry, self._address, self._authkey,
- self._serializer, writer),
- )
- ident = ':'.join(str(i) for i in self._process._identity)
- self._process.set_name(type(self).__name__ + '-' + ident)
- self._process.start()
-
- # get address of server
- writer.close()
- self._address = reader.recv()
- reader.close()
-
- # register a finalizer
- self._state.value = State.STARTED
- self.shutdown = util.Finalize(
- self, type(self)._finalize_manager,
- args=(self._process, self._address, self._authkey,
- self._state, self._Client),
- exitpriority=0
- )
-
- @classmethod
- def _run_server(cls, registry, address, authkey, serializer, writer):
- '''
- Create a server, report its address and run it
- '''
- # create server
- server = cls._Server(registry, address, authkey, serializer)
-
- # inform parent process of the server's address
- writer.send(server.address)
- writer.close()
-
- # run the manager
- util.info('manager serving at %r', server.address)
- server.serve_forever()
-
- def _create(self, typeid, *args, **kwds):
- '''
- Create a new shared object; return the token and exposed tuple
- '''
- assert self._state.value == State.STARTED, 'server not yet started'
- conn = self._Client(self._address, authkey=self._authkey)
- try:
- id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
- finally:
- conn.close()
- return Token(typeid, self._address, id), exposed
-
- def join(self, timeout=None):
- '''
- Join the manager process (if it has been spawned)
- '''
- self._process.join(timeout)
-
- def _debug_info(self):
- '''
- Return some info about the server's shared objects and connections
- '''
- conn = self._Client(self._address, authkey=self._authkey)
- try:
- return dispatch(conn, None, 'debug_info')
- finally:
- conn.close()
-
- def _number_of_objects(self):
- '''
- Return the number of shared objects
- '''
- conn = self._Client(self._address, authkey=self._authkey)
- try:
- return dispatch(conn, None, 'number_of_objects')
- finally:
- conn.close()
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.shutdown()
-
- @staticmethod
- def _finalize_manager(process, address, authkey, state, _Client):
- '''
- Shut down the manager process; will be registered as a finalizer
- '''
- if process.is_alive():
- util.info('sending shutdown message to manager')
- try:
- conn = _Client(address, authkey=authkey)
- try:
- dispatch(conn, None, 'shutdown')
- finally:
- conn.close()
- except Exception:
- pass
-
- process.join(timeout=0.2)
- if process.is_alive():
- util.info('manager still alive')
- if hasattr(process, 'terminate'):
- util.info('trying to `terminate()` manager process')
- process.terminate()
- process.join(timeout=0.1)
- if process.is_alive():
- util.info('manager still alive after terminate')
-
- state.value = State.SHUTDOWN
- try:
- del BaseProxy._address_to_local[address]
- except KeyError:
- pass
-
- address = property(lambda self: self._address)
-
- @classmethod
- def register(cls, typeid, callable=None, proxytype=None, exposed=None,
- method_to_typeid=None, create_method=True):
- '''
- Register a typeid with the manager type
- '''
- if '_registry' not in cls.__dict__:
- cls._registry = cls._registry.copy()
-
- if proxytype is None:
- proxytype = AutoProxy
-
- exposed = exposed or getattr(proxytype, '_exposed_', None)
-
- method_to_typeid = method_to_typeid or \
- getattr(proxytype, '_method_to_typeid_', None)
-
- if method_to_typeid:
- for key, value in method_to_typeid.items():
- assert type(key) is str, '%r is not a string' % key
- assert type(value) is str, '%r is not a string' % value
-
- cls._registry[typeid] = (
- callable, exposed, method_to_typeid, proxytype
- )
-
- if create_method:
- def temp(self, *args, **kwds):
- util.debug('requesting creation of a shared %r object', typeid)
- token, exp = self._create(typeid, *args, **kwds)
- proxy = proxytype(
- token, self._serializer, manager=self,
- authkey=self._authkey, exposed=exp
- )
- return proxy
- temp.__name__ = typeid
- setattr(cls, typeid, temp)
-
-#
-# Subclass of set which gets cleared after a fork
-#
-
-class ProcessLocalSet(set):
- def __init__(self):
- util.register_after_fork(self, lambda obj: obj.clear())
- def __reduce__(self):
- return type(self), ()
-
-#
-# Definition of BaseProxy
-#
-
-class BaseProxy(object):
- '''
- A base for proxies of shared objects
- '''
- _address_to_local = {}
- _mutex = util.ForkAwareThreadLock()
-
- def __init__(self, token, serializer, manager=None,
- authkey=None, exposed=None, incref=True):
- BaseProxy._mutex.acquire()
- try:
- tls_idset = BaseProxy._address_to_local.get(token.address, None)
- if tls_idset is None:
- tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
- BaseProxy._address_to_local[token.address] = tls_idset
- finally:
- BaseProxy._mutex.release()
-
- # self._tls is used to record the connection used by this
- # thread to communicate with the manager at token.address
- self._tls = tls_idset[0]
-
- # self._idset is used to record the identities of all shared
- # objects for which the current process owns references and
- # which are in the manager at token.address
- self._idset = tls_idset[1]
-
- self._token = token
- self._id = self._token.id
- self._manager = manager
- self._serializer = serializer
- self._Client = listener_client[serializer][1]
-
- if authkey is not None:
- self._authkey = AuthenticationString(authkey)
- elif self._manager is not None:
- self._authkey = self._manager._authkey
- else:
- self._authkey = current_process().get_authkey()
-
- if incref:
- self._incref()
-
- util.register_after_fork(self, BaseProxy._after_fork)
-
- def _connect(self):
- util.debug('making connection to manager')
- name = current_process().get_name()
- if threading.current_thread().get_name() != 'MainThread':
- name += '|' + threading.current_thread().get_name()
- conn = self._Client(self._token.address, authkey=self._authkey)
- dispatch(conn, None, 'accept_connection', (name,))
- self._tls.connection = conn
-
- def _callmethod(self, methodname, args=(), kwds={}):
- '''
- Try to call a method of the referent and return a copy of the result
- '''
- try:
- conn = self._tls.connection
- except AttributeError:
- util.debug('thread %r does not own a connection',
- threading.current_thread().get_name())
- self._connect()
- conn = self._tls.connection
-
- conn.send((self._id, methodname, args, kwds))
- kind, result = conn.recv()
-
- if kind == '#RETURN':
- return result
- elif kind == '#PROXY':
- exposed, token = result
- proxytype = self._manager._registry[token.typeid][-1]
- return proxytype(
- token, self._serializer, manager=self._manager,
- authkey=self._authkey, exposed=exposed
- )
- raise convert_to_error(kind, result)
-
- def _getvalue(self):
- '''
- Get a copy of the value of the referent
- '''
- return self._callmethod('#GETVALUE')
-
- def _incref(self):
- conn = self._Client(self._token.address, authkey=self._authkey)
- dispatch(conn, None, 'incref', (self._id,))
- util.debug('INCREF %r', self._token.id)
-
- self._idset.add(self._id)
-
- state = self._manager and self._manager._state
-
- self._close = util.Finalize(
- self, BaseProxy._decref,
- args=(self._token, self._authkey, state,
- self._tls, self._idset, self._Client),
- exitpriority=10
- )
-
- @staticmethod
- def _decref(token, authkey, state, tls, idset, _Client):
- idset.discard(token.id)
-
- # check whether manager is still alive
- if state is None or state.value == State.STARTED:
- # tell manager this process no longer cares about referent
- try:
- util.debug('DECREF %r', token.id)
- conn = _Client(token.address, authkey=authkey)
- dispatch(conn, None, 'decref', (token.id,))
- except Exception, e:
- util.debug('... decref failed %s', e)
-
- else:
- util.debug('DECREF %r -- manager already shutdown', token.id)
-
- # check whether we can close this thread's connection because
- # the process owns no more references to objects for this manager
- if not idset and hasattr(tls, 'connection'):
- util.debug('thread %r has no more proxies so closing conn',
- threading.current_thread().get_name())
- tls.connection.close()
- del tls.connection
-
- def _after_fork(self):
- self._manager = None
- try:
- self._incref()
- except Exception, e:
- # the proxy may just be for a manager which has shutdown
- util.info('incref failed: %s' % e)
-
- def __reduce__(self):
- kwds = {}
- if Popen.thread_is_spawning():
- kwds['authkey'] = self._authkey
-
- if getattr(self, '_isauto', False):
- kwds['exposed'] = self._exposed_
- return (RebuildProxy,
- (AutoProxy, self._token, self._serializer, kwds))
- else:
- return (RebuildProxy,
- (type(self), self._token, self._serializer, kwds))
-
- def __deepcopy__(self, memo):
- return self._getvalue()
-
- def __repr__(self):
- return '<%s object, typeid %r at %s>' % \
- (type(self).__name__, self._token.typeid, '0x%x' % id(self))
-
- def __str__(self):
- '''
- Return representation of the referent (or a fall-back if that fails)
- '''
- try:
- return self._callmethod('__repr__')
- except Exception:
- return repr(self)[:-1] + "; '__str__()' failed>"
-
-#
-# Function used for unpickling
-#
-
-def RebuildProxy(func, token, serializer, kwds):
- '''
- Function used for unpickling proxy objects.
-
- If possible the shared object is returned; otherwise a proxy for it.
- '''
- server = getattr(current_process(), '_manager_server', None)
-
- if server and server.address == token.address:
- return server.id_to_obj[token.id][0]
- else:
- incref = (
- kwds.pop('incref', True) and
- not getattr(current_process(), '_inheriting', False)
- )
- return func(token, serializer, incref=incref, **kwds)
-
-#
-# Functions to create proxies and proxy types
-#
-
-def MakeProxyType(name, exposed, _cache={}):
- '''
- Return a proxy type whose methods are given by `exposed`
- '''
- exposed = tuple(exposed)
- try:
- return _cache[(name, exposed)]
- except KeyError:
- pass
-
- dic = {}
-
- for meth in exposed:
- exec '''def %s(self, *args, **kwds):
- return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
-
- ProxyType = type(name, (BaseProxy,), dic)
- ProxyType._exposed_ = exposed
- _cache[(name, exposed)] = ProxyType
- return ProxyType
-
-
-def AutoProxy(token, serializer, manager=None, authkey=None,
- exposed=None, incref=True):
- '''
- Return an auto-proxy for `token`
- '''
- _Client = listener_client[serializer][1]
-
- if exposed is None:
- conn = _Client(token.address, authkey=authkey)
- try:
- exposed = dispatch(conn, None, 'get_methods', (token,))
- finally:
- conn.close()
-
- if authkey is None and manager is not None:
- authkey = manager._authkey
- if authkey is None:
- authkey = current_process().get_authkey()
-
- ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
- proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
- incref=incref)
- proxy._isauto = True
- return proxy
-
-#
-# Types/callables which we will register with SyncManager
-#
-
-class Namespace(object):
- def __init__(self, **kwds):
- self.__dict__.update(kwds)
- def __repr__(self):
- items = self.__dict__.items()
- temp = []
- for name, value in items:
- if not name.startswith('_'):
- temp.append('%s=%r' % (name, value))
- temp.sort()
- return 'Namespace(%s)' % str.join(', ', temp)
-
-class Value(object):
- def __init__(self, typecode, value, lock=True):
- self._typecode = typecode
- self._value = value
- def get(self):
- return self._value
- def set(self, value):
- self._value = value
- def __repr__(self):
- return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
- value = property(get, set)
-
-def Array(typecode, sequence, lock=True):
- return array.array(typecode, sequence)
-
-#
-# Proxy types used by SyncManager
-#
-
-class IteratorProxy(BaseProxy):
- # XXX remove methods for Py3.0 and Py2.6
- _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
- def __iter__(self):
- return self
- def __next__(self, *args):
- return self._callmethod('__next__', args)
- def next(self, *args):
- return self._callmethod('next', args)
- def send(self, *args):
- return self._callmethod('send', args)
- def throw(self, *args):
- return self._callmethod('throw', args)
- def close(self, *args):
- return self._callmethod('close', args)
-
-
-class AcquirerProxy(BaseProxy):
- _exposed_ = ('acquire', 'release')
- def acquire(self, blocking=True):
- return self._callmethod('acquire', (blocking,))
- def release(self):
- return self._callmethod('release')
- def __enter__(self):
- return self._callmethod('acquire')
- def __exit__(self, exc_type, exc_val, exc_tb):
- return self._callmethod('release')
-
-
-class ConditionProxy(AcquirerProxy):
- # XXX will Condition.notifyAll() name be available in Py3.0?
- _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
- def wait(self, timeout=None):
- return self._callmethod('wait', (timeout,))
- def notify(self):
- return self._callmethod('notify')
- def notify_all(self):
- return self._callmethod('notify_all')
-
-class EventProxy(BaseProxy):
- # XXX will Event.isSet name be available in Py3.0?
- _exposed_ = ('isSet', 'set', 'clear', 'wait')
- def is_set(self):
- return self._callmethod('isSet')
- def set(self):
- return self._callmethod('set')
- def clear(self):
- return self._callmethod('clear')
- def wait(self, timeout=None):
- return self._callmethod('wait', (timeout,))
-
-class NamespaceProxy(BaseProxy):
- _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
- def __getattr__(self, key):
- if key[0] == '_':
- return object.__getattribute__(self, key)
- callmethod = object.__getattribute__(self, '_callmethod')
- return callmethod('__getattribute__', (key,))
- def __setattr__(self, key, value):
- if key[0] == '_':
- return object.__setattr__(self, key, value)
- callmethod = object.__getattribute__(self, '_callmethod')
- return callmethod('__setattr__', (key, value))
- def __delattr__(self, key):
- if key[0] == '_':
- return object.__delattr__(self, key)
- callmethod = object.__getattribute__(self, '_callmethod')
- return callmethod('__delattr__', (key,))
-
-
-class ValueProxy(BaseProxy):
- _exposed_ = ('get', 'set')
- def get(self):
- return self._callmethod('get')
- def set(self, value):
- return self._callmethod('set', (value,))
- value = property(get, set)
-
-
-BaseListProxy = MakeProxyType('BaseListProxy', (
- '__add__', '__contains__', '__delitem__', '__delslice__',
- '__getitem__', '__getslice__', '__len__', '__mul__',
- '__reversed__', '__rmul__', '__setitem__', '__setslice__',
- 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
- 'reverse', 'sort', '__imul__'
- )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
-class ListProxy(BaseListProxy):
- def __iadd__(self, value):
- self._callmethod('extend', (value,))
- return self
- def __imul__(self, value):
- self._callmethod('__imul__', (value,))
- return self
-
-
-DictProxy = MakeProxyType('DictProxy', (
- '__contains__', '__delitem__', '__getitem__', '__len__',
- '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
- 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
- ))
-
-
-ArrayProxy = MakeProxyType('ArrayProxy', (
- '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
- )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
-
-
-PoolProxy = MakeProxyType('PoolProxy', (
- 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
- 'map', 'map_async', 'terminate'
- ))
-PoolProxy._method_to_typeid_ = {
- 'apply_async': 'AsyncResult',
- 'map_async': 'AsyncResult',
- 'imap': 'Iterator',
- 'imap_unordered': 'Iterator'
- }
-
-#
-# Definition of SyncManager
-#
-
-class SyncManager(BaseManager):
- '''
- Subclass of `BaseManager` which supports a number of shared object types.
-
- The types registered are those intended for the synchronization
- of threads, plus `dict`, `list` and `Namespace`.
-
- The `multiprocessing.Manager()` function creates started instances of
- this class.
- '''
-
-SyncManager.register('Queue', Queue.Queue)
-SyncManager.register('JoinableQueue', Queue.Queue)
-SyncManager.register('Event', threading.Event, EventProxy)
-SyncManager.register('Lock', threading.Lock, AcquirerProxy)
-SyncManager.register('RLock', threading.RLock, AcquirerProxy)
-SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
-SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
- AcquirerProxy)
-SyncManager.register('Condition', threading.Condition, ConditionProxy)
-SyncManager.register('Pool', Pool, PoolProxy)
-SyncManager.register('list', list, ListProxy)
-SyncManager.register('dict', dict, DictProxy)
-SyncManager.register('Value', Value, ValueProxy)
-SyncManager.register('Array', Array, ArrayProxy)
-SyncManager.register('Namespace', Namespace, NamespaceProxy)
-
-# types returned by methods of PoolProxy
-SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
-SyncManager.register('AsyncResult', create_method=False)
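The register() calls above are what give SyncManager its public surface: each typeid becomes a method on the manager that returns a proxy. A minimal usage sketch, assuming Manager() returns a started SyncManager as documented (hypothetical example, not part of the diff):

from multiprocessing import Manager

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()          # DictProxy for a dict living in the server process
    l = manager.list(range(5))  # ListProxy
    d['total'] = sum(l)         # proxy method calls travel through _callmethod
    print d['total']            # prints 10
    manager.shutdown()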
+#
+# Module providing the `SyncManager` class for dealing
+# with shared objects
+#
+# multiprocessing/managers.py
+#
+# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+#
+
+__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
+
+#
+# Imports
+#
+
+import os
+import sys
+import weakref
+import threading
+import array
+import copy_reg
+import Queue
+
+from traceback import format_exc
+from multiprocessing import Process, current_process, active_children, Pool, util, connection
+from multiprocessing.process import AuthenticationString
+from multiprocessing.forking import exit, Popen, assert_spawning
+from multiprocessing.util import Finalize, info
+
+try:
+ from cPickle import PicklingError
+except ImportError:
+ from pickle import PicklingError
+
+#
+#
+#
+
+try:
+ bytes
+except NameError:
+ bytes = str # XXX not needed in Py2.6 and Py3.0
+
+#
+# Register some things for pickling
+#
+
+def reduce_array(a):
+ return array.array, (a.typecode, a.tostring())
+copy_reg.pickle(array.array, reduce_array)
+
+view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
+if view_types[0] is not list: # XXX only needed in Py3.0
+ def rebuild_as_list(obj):
+ return list, (list(obj),)
+ for view_type in view_types:
+ copy_reg.pickle(view_type, rebuild_as_list)
+
+#
+# Type for identifying shared objects
+#
+
+class Token(object):
+ '''
+ Type to uniquely identify a shared object
+ '''
+ __slots__ = ('typeid', 'address', 'id')
+
+ def __init__(self, typeid, address, id):
+ (self.typeid, self.address, self.id) = (typeid, address, id)
+
+ def __getstate__(self):
+ return (self.typeid, self.address, self.id)
+
+ def __setstate__(self, state):
+ (self.typeid, self.address, self.id) = state
+
+ def __repr__(self):
+ return 'Token(typeid=%r, address=%r, id=%r)' % \
+ (self.typeid, self.address, self.id)
+
+#
+# Function for communication with a manager's server process
+#
+
+def dispatch(c, id, methodname, args=(), kwds={}):
+ '''
+ Send a message to manager using connection `c` and return response
+ '''
+ c.send((id, methodname, args, kwds))
+ kind, result = c.recv()
+ if kind == '#RETURN':
+ return result
+ raise convert_to_error(kind, result)
+
+def convert_to_error(kind, result):
+ if kind == '#ERROR':
+ return result
+ elif kind == '#TRACEBACK':
+ assert type(result) is str
+ return RemoteError(result)
+ elif kind == '#UNSERIALIZABLE':
+ assert type(result) is str
+ return RemoteError('Unserializable message: %s\n' % result)
+ else:
+ return ValueError('Unrecognized message type')
+
+class RemoteError(Exception):
+ def __str__(self):
+ return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
+
+#
+# Functions for finding the method names of an object
+#
+
+def all_methods(obj):
+ '''
+ Return a list of names of methods of `obj`
+ '''
+ temp = []
+ for name in dir(obj):
+ func = getattr(obj, name)
+ if hasattr(func, '__call__'):
+ temp.append(name)
+ return temp
+
+def public_methods(obj):
+ '''
+ Return a list of names of methods of `obj` which do not start with '_'
+ '''
+ return [name for name in all_methods(obj) if name[0] != '_']
+
+#
+# Server which is run in a process controlled by a manager
+#
+
+class Server(object):
+ '''
+ Server class which runs in a process controlled by a manager object
+ '''
+ public = ['shutdown', 'create', 'accept_connection', 'get_methods',
+ 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
+
+ def __init__(self, registry, address, authkey, serializer):
+ assert isinstance(authkey, bytes)
+ self.registry = registry
+ self.authkey = AuthenticationString(authkey)
+ Listener, Client = listener_client[serializer]
+
+ # do authentication later
+ self.listener = Listener(address=address, backlog=5)
+ self.address = self.listener.address
+
+ self.id_to_obj = {0: (None, ())}
+ self.id_to_refcount = {}
+ self.mutex = threading.RLock()
+ self.stop = 0
+
+ def serve_forever(self):
+ '''
+ Run the server forever
+ '''
+ current_process()._manager_server = self
+ try:
+ try:
+ while 1:
+ try:
+ c = self.listener.accept()
+ except (OSError, IOError):
+ continue
+ t = threading.Thread(target=self.handle_request, args=(c,))
+ t.set_daemon(True)
+ t.start()
+ except (KeyboardInterrupt, SystemExit):
+ pass
+ finally:
+ self.stop = 999
+ self.listener.close()
+
+ def handle_request(self, c):
+ '''
+ Handle a new connection
+ '''
+ funcname = result = request = None
+ try:
+ connection.deliver_challenge(c, self.authkey)
+ connection.answer_challenge(c, self.authkey)
+ request = c.recv()
+ ignore, funcname, args, kwds = request
+ assert funcname in self.public, '%r unrecognized' % funcname
+ func = getattr(self, funcname)
+ except Exception:
+ msg = ('#TRACEBACK', format_exc())
+ else:
+ try:
+ result = func(c, *args, **kwds)
+ except Exception:
+ msg = ('#TRACEBACK', format_exc())
+ else:
+ msg = ('#RETURN', result)
+ try:
+ c.send(msg)
+ except Exception, e:
+ try:
+ c.send(('#TRACEBACK', format_exc()))
+ except Exception:
+ pass
+ util.info('Failure to send message: %r', msg)
+ util.info(' ... request was %r', request)
+ util.info(' ... exception was %r', e)
+
+ c.close()
+
+ def serve_client(self, conn):
+ '''
+ Handle requests from the proxies in a particular process/thread
+ '''
+ util.debug('starting server thread to service %r',
+ threading.current_thread().get_name())
+
+ recv = conn.recv
+ send = conn.send
+ id_to_obj = self.id_to_obj
+
+ while not self.stop:
+
+ try:
+ methodname = obj = None
+ request = recv()
+ ident, methodname, args, kwds = request
+ obj, exposed, gettypeid = id_to_obj[ident]
+
+ if methodname not in exposed:
+ raise AttributeError(
+ 'method %r of %r object is not in exposed=%r' %
+ (methodname, type(obj), exposed)
+ )
+
+ function = getattr(obj, methodname)
+
+ try:
+ res = function(*args, **kwds)
+ except Exception, e:
+ msg = ('#ERROR', e)
+ else:
+ typeid = gettypeid and gettypeid.get(methodname, None)
+ if typeid:
+ rident, rexposed = self.create(conn, typeid, res)
+ token = Token(typeid, self.address, rident)
+ msg = ('#PROXY', (rexposed, token))
+ else:
+ msg = ('#RETURN', res)
+
+ except AttributeError:
+ if methodname is None:
+ msg = ('#TRACEBACK', format_exc())
+ else:
+ try:
+ fallback_func = self.fallback_mapping[methodname]
+ result = fallback_func(
+ self, conn, ident, obj, *args, **kwds
+ )
+ msg = ('#RETURN', result)
+ except Exception:
+ msg = ('#TRACEBACK', format_exc())
+
+ except EOFError:
+ util.debug('got EOF -- exiting thread serving %r',
+ threading.current_thread().get_name())
+ sys.exit(0)
+
+ except Exception:
+ msg = ('#TRACEBACK', format_exc())
+
+ try:
+ try:
+ send(msg)
+ except Exception, e:
+ send(('#UNSERIALIZABLE', repr(msg)))
+ except Exception, e:
+ util.info('exception in thread serving %r',
+ threading.current_thread().get_name())
+ util.info(' ... message was %r', msg)
+ util.info(' ... exception was %r', e)
+ conn.close()
+ sys.exit(1)
+
+ def fallback_getvalue(self, conn, ident, obj):
+ return obj
+
+ def fallback_str(self, conn, ident, obj):
+ return str(obj)
+
+ def fallback_repr(self, conn, ident, obj):
+ return repr(obj)
+
+ fallback_mapping = {
+ '__str__':fallback_str,
+ '__repr__':fallback_repr,
+ '#GETVALUE':fallback_getvalue
+ }
+
+ def dummy(self, c):
+ pass
+
+ def debug_info(self, c):
+ '''
+ Return some info --- useful to spot problems with refcounting
+ '''
+ self.mutex.acquire()
+ try:
+ result = []
+ keys = self.id_to_obj.keys()
+ keys.sort()
+ for ident in keys:
+ if ident != 0:
+ result.append(' %s: refcount=%s\n %s' %
+ (ident, self.id_to_refcount[ident],
+ str(self.id_to_obj[ident][0])[:75]))
+ return '\n'.join(result)
+ finally:
+ self.mutex.release()
+
+ def number_of_objects(self, c):
+ '''
+ Number of shared objects
+ '''
+ return len(self.id_to_obj) - 1 # don't count ident=0
+
+ def shutdown(self, c):
+ '''
+ Shut down this process
+ '''
+ try:
+ try:
+ util.debug('manager received shutdown message')
+ c.send(('#RETURN', None))
+
+ if sys.stdout != sys.__stdout__:
+ util.debug('resetting stdout, stderr')
+ sys.stdout = sys.__stdout__
+ sys.stderr = sys.__stderr__
+
+ util._run_finalizers(0)
+
+ for p in active_children():
+ util.debug('terminating a child process of manager')
+ p.terminate()
+
+ for p in active_children():
+ util.debug('joining a child process of manager')
+ p.join()
+
+ util._run_finalizers()
+ util.info('manager exiting with exitcode 0')
+ except:
+ import traceback
+ traceback.print_exc()
+ finally:
+ exit(0)
+
+ def create(self, c, typeid, *args, **kwds):
+ '''
+ Create a new shared object and return its id
+ '''
+ self.mutex.acquire()
+ try:
+ callable, exposed, method_to_typeid, proxytype = \
+ self.registry[typeid]
+
+ if callable is None:
+ assert len(args) == 1 and not kwds
+ obj = args[0]
+ else:
+ obj = callable(*args, **kwds)
+
+ if exposed is None:
+ exposed = public_methods(obj)
+ if method_to_typeid is not None:
+ assert type(method_to_typeid) is dict
+ exposed = list(exposed) + list(method_to_typeid)
+
+ ident = '%x' % id(obj) # convert to string because xmlrpclib
+ # only has 32 bit signed integers
+ util.debug('%r callable returned object with id %r', typeid, ident)
+
+ self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
+ if ident not in self.id_to_refcount:
+ self.id_to_refcount[ident] = None
+ return ident, tuple(exposed)
+ finally:
+ self.mutex.release()
+
+ def get_methods(self, c, token):
+ '''
+ Return the methods of the shared object indicated by token
+ '''
+ return tuple(self.id_to_obj[token.id][1])
+
+ def accept_connection(self, c, name):
+ '''
+ Spawn a new thread to serve this connection
+ '''
+ threading.current_thread().set_name(name)
+ c.send(('#RETURN', None))
+ self.serve_client(c)
+
+ def incref(self, c, ident):
+ self.mutex.acquire()
+ try:
+ try:
+ self.id_to_refcount[ident] += 1
+ except TypeError:
+ assert self.id_to_refcount[ident] is None
+ self.id_to_refcount[ident] = 1
+ finally:
+ self.mutex.release()
+
+ def decref(self, c, ident):
+ self.mutex.acquire()
+ try:
+ assert self.id_to_refcount[ident] >= 1
+ self.id_to_refcount[ident] -= 1
+ if self.id_to_refcount[ident] == 0:
+ del self.id_to_obj[ident], self.id_to_refcount[ident]
+ util.debug('disposing of obj with id %r', ident)
+ finally:
+ self.mutex.release()
+
+#
+# Class to represent state of a manager
+#
+
+class State(object):
+ __slots__ = ['value']
+ INITIAL = 0
+ STARTED = 1
+ SHUTDOWN = 2
+
+#
+# Mapping from serializer name to Listener and Client types
+#
+
+listener_client = {
+ 'pickle' : (connection.Listener, connection.Client),
+ 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
+ }
+
+#
+# Definition of BaseManager
+#
+
+class BaseManager(object):
+ '''
+ Base class for managers
+ '''
+ _registry = {}
+ _Server = Server
+
+ def __init__(self, address=None, authkey=None, serializer='pickle'):
+ if authkey is None:
+ authkey = current_process().get_authkey()
+ self._address = address # XXX not final address if eg ('', 0)
+ self._authkey = AuthenticationString(authkey)
+ self._state = State()
+ self._state.value = State.INITIAL
+ self._serializer = serializer
+ self._Listener, self._Client = listener_client[serializer]
+
+ def __reduce__(self):
+ return type(self).from_address, \
+ (self._address, self._authkey, self._serializer)
+
+ def get_server(self):
+ '''
+ Return server object with serve_forever() method and address attribute
+ '''
+ assert self._state.value == State.INITIAL
+ return Server(self._registry, self._address,
+ self._authkey, self._serializer)
+
+ def connect(self):
+ '''
+ Connect manager object to the server process
+ '''
+ Listener, Client = listener_client[self._serializer]
+ conn = Client(self._address, authkey=self._authkey)
+ dispatch(conn, None, 'dummy')
+ self._state.value = State.STARTED
+
+ def start(self):
+ '''
+ Spawn a server process for this manager object
+ '''
+ assert self._state.value == State.INITIAL
+
+ # pipe over which we will retrieve address of server
+ reader, writer = connection.Pipe(duplex=False)
+
+ # spawn process which runs a server
+ self._process = Process(
+ target=type(self)._run_server,
+ args=(self._registry, self._address, self._authkey,
+ self._serializer, writer),
+ )
+ ident = ':'.join(str(i) for i in self._process._identity)
+ self._process.set_name(type(self).__name__ + '-' + ident)
+ self._process.start()
+
+ # get address of server
+ writer.close()
+ self._address = reader.recv()
+ reader.close()
+
+ # register a finalizer
+ self._state.value = State.STARTED
+ self.shutdown = util.Finalize(
+ self, type(self)._finalize_manager,
+ args=(self._process, self._address, self._authkey,
+ self._state, self._Client),
+ exitpriority=0
+ )
+
+ @classmethod
+ def _run_server(cls, registry, address, authkey, serializer, writer):
+ '''
+ Create a server, report its address and run it
+ '''
+ # create server
+ server = cls._Server(registry, address, authkey, serializer)
+
+ # inform parent process of the server's address
+ writer.send(server.address)
+ writer.close()
+
+ # run the manager
+ util.info('manager serving at %r', server.address)
+ server.serve_forever()
+
+ def _create(self, typeid, *args, **kwds):
+ '''
+ Create a new shared object; return the token and exposed tuple
+ '''
+ assert self._state.value == State.STARTED, 'server not yet started'
+ conn = self._Client(self._address, authkey=self._authkey)
+ try:
+ id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
+ finally:
+ conn.close()
+ return Token(typeid, self._address, id), exposed
+
+ def join(self, timeout=None):
+ '''
+ Join the manager process (if it has been spawned)
+ '''
+ self._process.join(timeout)
+
+ def _debug_info(self):
+ '''
+ Return some info about the server's shared objects and connections
+ '''
+ conn = self._Client(self._address, authkey=self._authkey)
+ try:
+ return dispatch(conn, None, 'debug_info')
+ finally:
+ conn.close()
+
+ def _number_of_objects(self):
+ '''
+ Return the number of shared objects
+ '''
+ conn = self._Client(self._address, authkey=self._authkey)
+ try:
+ return dispatch(conn, None, 'number_of_objects')
+ finally:
+ conn.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.shutdown()
+
+ @staticmethod
+ def _finalize_manager(process, address, authkey, state, _Client):
+ '''
+ Shut down the manager process; will be registered as a finalizer
+ '''
+ if process.is_alive():
+ util.info('sending shutdown message to manager')
+ try:
+ conn = _Client(address, authkey=authkey)
+ try:
+ dispatch(conn, None, 'shutdown')
+ finally:
+ conn.close()
+ except Exception:
+ pass
+
+ process.join(timeout=0.2)
+ if process.is_alive():
+ util.info('manager still alive')
+ if hasattr(process, 'terminate'):
+ util.info('trying to `terminate()` manager process')
+ process.terminate()
+ process.join(timeout=0.1)
+ if process.is_alive():
+ util.info('manager still alive after terminate')
+
+ state.value = State.SHUTDOWN
+ try:
+ del BaseProxy._address_to_local[address]
+ except KeyError:
+ pass
+
+ address = property(lambda self: self._address)
+
+ @classmethod
+ def register(cls, typeid, callable=None, proxytype=None, exposed=None,
+ method_to_typeid=None, create_method=True):
+ '''
+ Register a typeid with the manager type
+ '''
+ if '_registry' not in cls.__dict__:
+ cls._registry = cls._registry.copy()
+
+ if proxytype is None:
+ proxytype = AutoProxy
+
+ exposed = exposed or getattr(proxytype, '_exposed_', None)
+
+ method_to_typeid = method_to_typeid or \
+ getattr(proxytype, '_method_to_typeid_', None)
+
+ if method_to_typeid:
+ for key, value in method_to_typeid.items():
+ assert type(key) is str, '%r is not a string' % key
+ assert type(value) is str, '%r is not a string' % value
+
+ cls._registry[typeid] = (
+ callable, exposed, method_to_typeid, proxytype
+ )
+
+ if create_method:
+ def temp(self, *args, **kwds):
+ util.debug('requesting creation of a shared %r object', typeid)
+ token, exp = self._create(typeid, *args, **kwds)
+ proxy = proxytype(
+ token, self._serializer, manager=self,
+ authkey=self._authkey, exposed=exp
+ )
+ return proxy
+ temp.__name__ = typeid
+ setattr(cls, typeid, temp)
+
+#
+# Subclass of set which gets cleared after a fork
+#
+
+class ProcessLocalSet(set):
+ def __init__(self):
+ util.register_after_fork(self, lambda obj: obj.clear())
+ def __reduce__(self):
+ return type(self), ()
+
+#
+# Definition of BaseProxy
+#
+
+class BaseProxy(object):
+ '''
+ A base for proxies of shared objects
+ '''
+ _address_to_local = {}
+ _mutex = util.ForkAwareThreadLock()
+
+ def __init__(self, token, serializer, manager=None,
+ authkey=None, exposed=None, incref=True):
+ BaseProxy._mutex.acquire()
+ try:
+ tls_idset = BaseProxy._address_to_local.get(token.address, None)
+ if tls_idset is None:
+ tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
+ BaseProxy._address_to_local[token.address] = tls_idset
+ finally:
+ BaseProxy._mutex.release()
+
+ # self._tls is used to record the connection used by this
+ # thread to communicate with the manager at token.address
+ self._tls = tls_idset[0]
+
+ # self._idset is used to record the identities of all shared
+ # objects for which the current process owns references and
+ # which are in the manager at token.address
+ self._idset = tls_idset[1]
+
+ self._token = token
+ self._id = self._token.id
+ self._manager = manager
+ self._serializer = serializer
+ self._Client = listener_client[serializer][1]
+
+ if authkey is not None:
+ self._authkey = AuthenticationString(authkey)
+ elif self._manager is not None:
+ self._authkey = self._manager._authkey
+ else:
+ self._authkey = current_process().get_authkey()
+
+ if incref:
+ self._incref()
+
+ util.register_after_fork(self, BaseProxy._after_fork)
+
+ def _connect(self):
+ util.debug('making connection to manager')
+ name = current_process().get_name()
+ if threading.current_thread().get_name() != 'MainThread':
+ name += '|' + threading.current_thread().get_name()
+ conn = self._Client(self._token.address, authkey=self._authkey)
+ dispatch(conn, None, 'accept_connection', (name,))
+ self._tls.connection = conn
+
+ def _callmethod(self, methodname, args=(), kwds={}):
+ '''
+        Try to call a method of the referent and return a copy of the result
+ '''
+ try:
+ conn = self._tls.connection
+ except AttributeError:
+ util.debug('thread %r does not own a connection',
+ threading.current_thread().get_name())
+ self._connect()
+ conn = self._tls.connection
+
+ conn.send((self._id, methodname, args, kwds))
+ kind, result = conn.recv()
+
+ if kind == '#RETURN':
+ return result
+ elif kind == '#PROXY':
+ exposed, token = result
+ proxytype = self._manager._registry[token.typeid][-1]
+ return proxytype(
+ token, self._serializer, manager=self._manager,
+ authkey=self._authkey, exposed=exposed
+ )
+ raise convert_to_error(kind, result)
+
+ def _getvalue(self):
+ '''
+ Get a copy of the value of the referent
+ '''
+ return self._callmethod('#GETVALUE')
+
+ def _incref(self):
+ conn = self._Client(self._token.address, authkey=self._authkey)
+ dispatch(conn, None, 'incref', (self._id,))
+ util.debug('INCREF %r', self._token.id)
+
+ self._idset.add(self._id)
+
+ state = self._manager and self._manager._state
+
+ self._close = util.Finalize(
+ self, BaseProxy._decref,
+ args=(self._token, self._authkey, state,
+ self._tls, self._idset, self._Client),
+ exitpriority=10
+ )
+
+ @staticmethod
+ def _decref(token, authkey, state, tls, idset, _Client):
+ idset.discard(token.id)
+
+ # check whether manager is still alive
+ if state is None or state.value == State.STARTED:
+ # tell manager this process no longer cares about referent
+ try:
+ util.debug('DECREF %r', token.id)
+ conn = _Client(token.address, authkey=authkey)
+ dispatch(conn, None, 'decref', (token.id,))
+ except Exception, e:
+ util.debug('... decref failed %s', e)
+
+ else:
+ util.debug('DECREF %r -- manager already shutdown', token.id)
+
+ # check whether we can close this thread's connection because
+ # the process owns no more references to objects for this manager
+ if not idset and hasattr(tls, 'connection'):
+ util.debug('thread %r has no more proxies so closing conn',
+ threading.current_thread().get_name())
+ tls.connection.close()
+ del tls.connection
+
+ def _after_fork(self):
+ self._manager = None
+ try:
+ self._incref()
+ except Exception, e:
+            # the proxy may just be for a manager which has shut down
+ util.info('incref failed: %s' % e)
+
+ def __reduce__(self):
+ kwds = {}
+ if Popen.thread_is_spawning():
+ kwds['authkey'] = self._authkey
+
+ if getattr(self, '_isauto', False):
+ kwds['exposed'] = self._exposed_
+ return (RebuildProxy,
+ (AutoProxy, self._token, self._serializer, kwds))
+ else:
+ return (RebuildProxy,
+ (type(self), self._token, self._serializer, kwds))
+
+ def __deepcopy__(self, memo):
+ return self._getvalue()
+
+ def __repr__(self):
+ return '<%s object, typeid %r at %s>' % \
+ (type(self).__name__, self._token.typeid, '0x%x' % id(self))
+
+ def __str__(self):
+ '''
+ Return representation of the referent (or a fall-back if that fails)
+ '''
+ try:
+ return self._callmethod('__repr__')
+ except Exception:
+ return repr(self)[:-1] + "; '__str__()' failed>"
+
+#
+# Function used for unpickling
+#
+
+def RebuildProxy(func, token, serializer, kwds):
+ '''
+ Function used for unpickling proxy objects.
+
+    If possible, the shared object itself is returned; otherwise a proxy for it.
+ '''
+ server = getattr(current_process(), '_manager_server', None)
+
+ if server and server.address == token.address:
+ return server.id_to_obj[token.id][0]
+ else:
+ incref = (
+ kwds.pop('incref', True) and
+ not getattr(current_process(), '_inheriting', False)
+ )
+ return func(token, serializer, incref=incref, **kwds)
+
+#
+# Functions to create proxies and proxy types
+#
+
+def MakeProxyType(name, exposed, _cache={}):
+ '''
+    Return a proxy type whose methods are given by `exposed`
+ '''
+ exposed = tuple(exposed)
+ try:
+ return _cache[(name, exposed)]
+ except KeyError:
+ pass
+
+ dic = {}
+
+ for meth in exposed:
+ exec '''def %s(self, *args, **kwds):
+ return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
+
+ ProxyType = type(name, (BaseProxy,), dic)
+ ProxyType._exposed_ = exposed
+ _cache[(name, exposed)] = ProxyType
+ return ProxyType
+
+
+def AutoProxy(token, serializer, manager=None, authkey=None,
+ exposed=None, incref=True):
+ '''
+ Return an auto-proxy for `token`
+ '''
+ _Client = listener_client[serializer][1]
+
+ if exposed is None:
+ conn = _Client(token.address, authkey=authkey)
+ try:
+ exposed = dispatch(conn, None, 'get_methods', (token,))
+ finally:
+ conn.close()
+
+ if authkey is None and manager is not None:
+ authkey = manager._authkey
+ if authkey is None:
+ authkey = current_process().get_authkey()
+
+ ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
+ proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
+ incref=incref)
+ proxy._isauto = True
+ return proxy
+
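+# Editor's sketch (not part of the original commit): MakeProxyType() turns
+# each name in `exposed` into a method that forwards to `_callmethod()`.
+# `FooProxy` here is a hypothetical example:
+#
+#     FooProxy = MakeProxyType('FooProxy', ('bar', 'baz'))
+#     # FooProxy.bar is generated as roughly:
+#     #     def bar(self, *args, **kwds):
+#     #         return self._callmethod('bar', args, kwds)
+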
+#
+# Types/callables which we will register with SyncManager
+#
+
+class Namespace(object):
+ def __init__(self, **kwds):
+ self.__dict__.update(kwds)
+ def __repr__(self):
+ items = self.__dict__.items()
+ temp = []
+ for name, value in items:
+ if not name.startswith('_'):
+ temp.append('%s=%r' % (name, value))
+ temp.sort()
+ return 'Namespace(%s)' % str.join(', ', temp)
+
+class Value(object):
+ def __init__(self, typecode, value, lock=True):
+ self._typecode = typecode
+ self._value = value
+ def get(self):
+ return self._value
+ def set(self, value):
+ self._value = value
+ def __repr__(self):
+ return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
+ value = property(get, set)
+
+def Array(typecode, sequence, lock=True):
+ return array.array(typecode, sequence)
+
+#
+# Proxy types used by SyncManager
+#
+
+class IteratorProxy(BaseProxy):
+ # XXX remove methods for Py3.0 and Py2.6
+ _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
+ def __iter__(self):
+ return self
+ def __next__(self, *args):
+ return self._callmethod('__next__', args)
+ def next(self, *args):
+ return self._callmethod('next', args)
+ def send(self, *args):
+ return self._callmethod('send', args)
+ def throw(self, *args):
+ return self._callmethod('throw', args)
+ def close(self, *args):
+ return self._callmethod('close', args)
+
+
+class AcquirerProxy(BaseProxy):
+ _exposed_ = ('acquire', 'release')
+ def acquire(self, blocking=True):
+ return self._callmethod('acquire', (blocking,))
+ def release(self):
+ return self._callmethod('release')
+ def __enter__(self):
+ return self._callmethod('acquire')
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return self._callmethod('release')
+
+
+class ConditionProxy(AcquirerProxy):
+    # XXX will Condition.notifyAll() name be available in Py3.0?
+ _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
+ def wait(self, timeout=None):
+ return self._callmethod('wait', (timeout,))
+ def notify(self):
+ return self._callmethod('notify')
+ def notify_all(self):
+ return self._callmethod('notify_all')
+
+class EventProxy(BaseProxy):
+ # XXX will Event.isSet name be available in Py3.0?
+ _exposed_ = ('isSet', 'set', 'clear', 'wait')
+ def is_set(self):
+ return self._callmethod('isSet')
+ def set(self):
+ return self._callmethod('set')
+ def clear(self):
+ return self._callmethod('clear')
+ def wait(self, timeout=None):
+ return self._callmethod('wait', (timeout,))
+
+class NamespaceProxy(BaseProxy):
+ _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
+ def __getattr__(self, key):
+ if key[0] == '_':
+ return object.__getattribute__(self, key)
+ callmethod = object.__getattribute__(self, '_callmethod')
+ return callmethod('__getattribute__', (key,))
+ def __setattr__(self, key, value):
+ if key[0] == '_':
+ return object.__setattr__(self, key, value)
+ callmethod = object.__getattribute__(self, '_callmethod')
+ return callmethod('__setattr__', (key, value))
+ def __delattr__(self, key):
+ if key[0] == '_':
+ return object.__delattr__(self, key)
+ callmethod = object.__getattribute__(self, '_callmethod')
+ return callmethod('__delattr__', (key,))
+
+
+class ValueProxy(BaseProxy):
+ _exposed_ = ('get', 'set')
+ def get(self):
+ return self._callmethod('get')
+ def set(self, value):
+ return self._callmethod('set', (value,))
+ value = property(get, set)
+
+
+BaseListProxy = MakeProxyType('BaseListProxy', (
+ '__add__', '__contains__', '__delitem__', '__delslice__',
+ '__getitem__', '__getslice__', '__len__', '__mul__',
+ '__reversed__', '__rmul__', '__setitem__', '__setslice__',
+ 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
+ 'reverse', 'sort', '__imul__'
+ )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
+class ListProxy(BaseListProxy):
+ def __iadd__(self, value):
+ self._callmethod('extend', (value,))
+ return self
+ def __imul__(self, value):
+ self._callmethod('__imul__', (value,))
+ return self
+
+
+DictProxy = MakeProxyType('DictProxy', (
+ '__contains__', '__delitem__', '__getitem__', '__len__',
+ '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
+ 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
+ ))
+
+
+ArrayProxy = MakeProxyType('ArrayProxy', (
+ '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
+ )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
+
+
+PoolProxy = MakeProxyType('PoolProxy', (
+ 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
+ 'map', 'map_async', 'terminate'
+ ))
+PoolProxy._method_to_typeid_ = {
+ 'apply_async': 'AsyncResult',
+ 'map_async': 'AsyncResult',
+ 'imap': 'Iterator',
+ 'imap_unordered': 'Iterator'
+ }
+
+#
+# Definition of SyncManager
+#
+
+class SyncManager(BaseManager):
+ '''
+ Subclass of `BaseManager` which supports a number of shared object types.
+
+ The types registered are those intended for the synchronization
+ of threads, plus `dict`, `list` and `Namespace`.
+
+ The `multiprocessing.Manager()` function creates started instances of
+ this class.
+ '''
+
+SyncManager.register('Queue', Queue.Queue)
+SyncManager.register('JoinableQueue', Queue.Queue)
+SyncManager.register('Event', threading.Event, EventProxy)
+SyncManager.register('Lock', threading.Lock, AcquirerProxy)
+SyncManager.register('RLock', threading.RLock, AcquirerProxy)
+SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
+SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
+ AcquirerProxy)
+SyncManager.register('Condition', threading.Condition, ConditionProxy)
+SyncManager.register('Pool', Pool, PoolProxy)
+SyncManager.register('list', list, ListProxy)
+SyncManager.register('dict', dict, DictProxy)
+SyncManager.register('Value', Value, ValueProxy)
+SyncManager.register('Array', Array, ArrayProxy)
+SyncManager.register('Namespace', Namespace, NamespaceProxy)
+
+# types returned by methods of PoolProxy
+SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
+SyncManager.register('AsyncResult', create_method=False)
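+
+# Editor's sketch (not part of the original commit): the registered types
+# above in use, assuming a started manager from `multiprocessing.Manager()`:
+#
+#     from multiprocessing import Manager
+#
+#     manager = Manager()              # a started SyncManager instance
+#     d = manager.dict()               # DictProxy
+#     l = manager.list(range(5))       # ListProxy
+#     lock = manager.Lock()            # AcquirerProxy
+#     ns = manager.Namespace()         # NamespaceProxy
+#     ns.x = 10
+#     l.append(99)                     # forwarded to the server process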
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 79f0a29..8aaec63 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -1,596 +1,596 @@
-#
-# Module providing the `Pool` class for managing a process pool
-#
-# multiprocessing/pool.py
-#
-# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
-#
-
-__all__ = ['Pool']
-
-#
-# Imports
-#
-
-import threading
-import Queue
-import itertools
-import collections
-import time
-
-from multiprocessing import Process, cpu_count, TimeoutError
-from multiprocessing.util import Finalize, debug
-
-#
-# Constants representing the state of a pool
-#
-
-RUN = 0
-CLOSE = 1
-TERMINATE = 2
-
-#
-# Miscellaneous
-#
-
-job_counter = itertools.count()
-
-def mapstar(args):
- return map(*args)
-
-#
-# Code run by worker processes
-#
-
-def worker(inqueue, outqueue, initializer=None, initargs=()):
- put = outqueue.put
- get = inqueue.get
- if hasattr(inqueue, '_writer'):
- inqueue._writer.close()
- outqueue._reader.close()
-
- if initializer is not None:
- initializer(*initargs)
-
- while 1:
- try:
- task = get()
- except (EOFError, IOError):
- debug('worker got EOFError or IOError -- exiting')
- break
-
- if task is None:
- debug('worker got sentinel -- exiting')
- break
-
- job, i, func, args, kwds = task
- try:
- result = (True, func(*args, **kwds))
- except Exception, e:
- result = (False, e)
- put((job, i, result))
-
-#
-# Class representing a process pool
-#
-
-class Pool(object):
- '''
- Class which supports an async version of the `apply()` builtin
- '''
- Process = Process
-
- def __init__(self, processes=None, initializer=None, initargs=()):
- self._setup_queues()
- self._taskqueue = Queue.Queue()
- self._cache = {}
- self._state = RUN
-
- if processes is None:
- try:
- processes = cpu_count()
- except NotImplementedError:
- processes = 1
-
- self._pool = []
- for i in range(processes):
- w = self.Process(
- target=worker,
- args=(self._inqueue, self._outqueue, initializer, initargs)
- )
- self._pool.append(w)
- w.set_name(w.get_name().replace('Process', 'PoolWorker'))
- w.set_daemon(True)
- w.start()
-
- self._task_handler = threading.Thread(
- target=Pool._handle_tasks,
- args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
- )
- self._task_handler.set_daemon(True)
- self._task_handler._state = RUN
- self._task_handler.start()
-
- self._result_handler = threading.Thread(
- target=Pool._handle_results,
- args=(self._outqueue, self._quick_get, self._cache)
- )
- self._result_handler.set_daemon(True)
- self._result_handler._state = RUN
- self._result_handler.start()
-
- self._terminate = Finalize(
- self, self._terminate_pool,
- args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
- self._task_handler, self._result_handler, self._cache),
- exitpriority=15
- )
-
- def _setup_queues(self):
- from .queues import SimpleQueue
- self._inqueue = SimpleQueue()
- self._outqueue = SimpleQueue()
- self._quick_put = self._inqueue._writer.send
- self._quick_get = self._outqueue._reader.recv
-
- def apply(self, func, args=(), kwds={}):
- '''
- Equivalent of `apply()` builtin
- '''
- assert self._state == RUN
- return self.apply_async(func, args, kwds).get()
-
- def map(self, func, iterable, chunksize=None):
- '''
- Equivalent of `map()` builtin
- '''
- assert self._state == RUN
- return self.map_async(func, iterable, chunksize).get()
-
- def imap(self, func, iterable, chunksize=1):
- '''
-        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
- '''
- assert self._state == RUN
- if chunksize == 1:
- result = IMapIterator(self._cache)
- self._taskqueue.put((((result._job, i, func, (x,), {})
- for i, x in enumerate(iterable)), result._set_length))
- return result
- else:
- assert chunksize > 1
- task_batches = Pool._get_tasks(func, iterable, chunksize)
- result = IMapIterator(self._cache)
- self._taskqueue.put((((result._job, i, mapstar, (x,), {})
- for i, x in enumerate(task_batches)), result._set_length))
- return (item for chunk in result for item in chunk)
-
- def imap_unordered(self, func, iterable, chunksize=1):
- '''
- Like `imap()` method but ordering of results is arbitrary
- '''
- assert self._state == RUN
- if chunksize == 1:
- result = IMapUnorderedIterator(self._cache)
- self._taskqueue.put((((result._job, i, func, (x,), {})
- for i, x in enumerate(iterable)), result._set_length))
- return result
- else:
- assert chunksize > 1
- task_batches = Pool._get_tasks(func, iterable, chunksize)
- result = IMapUnorderedIterator(self._cache)
- self._taskqueue.put((((result._job, i, mapstar, (x,), {})
- for i, x in enumerate(task_batches)), result._set_length))
- return (item for chunk in result for item in chunk)
-
- def apply_async(self, func, args=(), kwds={}, callback=None):
- '''
- Asynchronous equivalent of `apply()` builtin
- '''
- assert self._state == RUN
- result = ApplyResult(self._cache, callback)
- self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
- return result
-
- def map_async(self, func, iterable, chunksize=None, callback=None):
- '''
- Asynchronous equivalent of `map()` builtin
- '''
- assert self._state == RUN
- if not hasattr(iterable, '__len__'):
- iterable = list(iterable)
-
- if chunksize is None:
- chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
- if extra:
- chunksize += 1
-
- task_batches = Pool._get_tasks(func, iterable, chunksize)
- result = MapResult(self._cache, chunksize, len(iterable), callback)
- self._taskqueue.put((((result._job, i, mapstar, (x,), {})
- for i, x in enumerate(task_batches)), None))
- return result
-
- @staticmethod
- def _handle_tasks(taskqueue, put, outqueue, pool):
- thread = threading.current_thread()
-
- for taskseq, set_length in iter(taskqueue.get, None):
- i = -1
- for i, task in enumerate(taskseq):
- if thread._state:
- debug('task handler found thread._state != RUN')
- break
- try:
- put(task)
- except IOError:
- debug('could not put task on queue')
- break
- else:
- if set_length:
- debug('doing set_length()')
- set_length(i+1)
- continue
- break
- else:
- debug('task handler got sentinel')
-
-
- try:
- # tell result handler to finish when cache is empty
- debug('task handler sending sentinel to result handler')
- outqueue.put(None)
-
- # tell workers there is no more work
- debug('task handler sending sentinel to workers')
- for p in pool:
- put(None)
- except IOError:
- debug('task handler got IOError when sending sentinels')
-
- debug('task handler exiting')
-
- @staticmethod
- def _handle_results(outqueue, get, cache):
- thread = threading.current_thread()
-
- while 1:
- try:
- task = get()
- except (IOError, EOFError):
- debug('result handler got EOFError/IOError -- exiting')
- return
-
- if thread._state:
- assert thread._state == TERMINATE
- debug('result handler found thread._state=TERMINATE')
- break
-
- if task is None:
- debug('result handler got sentinel')
- break
-
- job, i, obj = task
- try:
- cache[job]._set(i, obj)
- except KeyError:
- pass
-
- while cache and thread._state != TERMINATE:
- try:
- task = get()
- except (IOError, EOFError):
- debug('result handler got EOFError/IOError -- exiting')
- return
-
- if task is None:
- debug('result handler ignoring extra sentinel')
- continue
- job, i, obj = task
- try:
- cache[job]._set(i, obj)
- except KeyError:
- pass
-
- if hasattr(outqueue, '_reader'):
- debug('ensuring that outqueue is not full')
- # If we don't make room available in outqueue then
- # attempts to add the sentinel (None) to outqueue may
- # block. There is guaranteed to be no more than 2 sentinels.
- try:
- for i in range(10):
- if not outqueue._reader.poll():
- break
- get()
- except (IOError, EOFError):
- pass
-
- debug('result handler exiting: len(cache)=%s, thread._state=%s',
- len(cache), thread._state)
-
- @staticmethod
- def _get_tasks(func, it, size):
- it = iter(it)
- while 1:
- x = tuple(itertools.islice(it, size))
- if not x:
- return
- yield (func, x)
-
- def __reduce__(self):
- raise NotImplementedError(
- 'pool objects cannot be passed between processes or pickled'
- )
-
- def close(self):
- debug('closing pool')
- if self._state == RUN:
- self._state = CLOSE
- self._taskqueue.put(None)
-
- def terminate(self):
- debug('terminating pool')
- self._state = TERMINATE
- self._terminate()
-
- def join(self):
- debug('joining pool')
- assert self._state in (CLOSE, TERMINATE)
- self._task_handler.join()
- self._result_handler.join()
- for p in self._pool:
- p.join()
-
- @staticmethod
- def _help_stuff_finish(inqueue, task_handler, size):
- # task_handler may be blocked trying to put items on inqueue
- debug('removing tasks from inqueue until task handler finished')
- inqueue._rlock.acquire()
- while task_handler.is_alive() and inqueue._reader.poll():
- inqueue._reader.recv()
- time.sleep(0)
-
- @classmethod
- def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
- task_handler, result_handler, cache):
- # this is guaranteed to only be called once
- debug('finalizing pool')
-
- task_handler._state = TERMINATE
- taskqueue.put(None) # sentinel
-
- debug('helping task handler/workers to finish')
- cls._help_stuff_finish(inqueue, task_handler, len(pool))
-
- assert result_handler.is_alive() or len(cache) == 0
-
- result_handler._state = TERMINATE
- outqueue.put(None) # sentinel
-
- if pool and hasattr(pool[0], 'terminate'):
- debug('terminating workers')
- for p in pool:
- p.terminate()
-
- debug('joining task handler')
- task_handler.join(1e100)
-
- debug('joining result handler')
- result_handler.join(1e100)
-
- if pool and hasattr(pool[0], 'terminate'):
- debug('joining pool workers')
- for p in pool:
- p.join()
-
-#
-# Class whose instances are returned by `Pool.apply_async()`
-#
-
-class ApplyResult(object):
-
- def __init__(self, cache, callback):
- self._cond = threading.Condition(threading.Lock())
- self._job = job_counter.next()
- self._cache = cache
- self._ready = False
- self._callback = callback
- cache[self._job] = self
-
- def ready(self):
- return self._ready
-
- def successful(self):
- assert self._ready
- return self._success
-
- def wait(self, timeout=None):
- self._cond.acquire()
- try:
- if not self._ready:
- self._cond.wait(timeout)
- finally:
- self._cond.release()
-
- def get(self, timeout=None):
- self.wait(timeout)
- if not self._ready:
- raise TimeoutError
- if self._success:
- return self._value
- else:
- raise self._value
-
- def _set(self, i, obj):
- self._success, self._value = obj
- if self._callback and self._success:
- self._callback(self._value)
- self._cond.acquire()
- try:
- self._ready = True
- self._cond.notify()
- finally:
- self._cond.release()
- del self._cache[self._job]
-
-#
-# Class whose instances are returned by `Pool.map_async()`
-#
-
-class MapResult(ApplyResult):
-
- def __init__(self, cache, chunksize, length, callback):
- ApplyResult.__init__(self, cache, callback)
- self._success = True
- self._value = [None] * length
- self._chunksize = chunksize
- if chunksize <= 0:
- self._number_left = 0
- self._ready = True
- else:
- self._number_left = length//chunksize + bool(length % chunksize)
-
- def _set(self, i, success_result):
- success, result = success_result
- if success:
- self._value[i*self._chunksize:(i+1)*self._chunksize] = result
- self._number_left -= 1
- if self._number_left == 0:
- if self._callback:
- self._callback(self._value)
- del self._cache[self._job]
- self._cond.acquire()
- try:
- self._ready = True
- self._cond.notify()
- finally:
- self._cond.release()
-
- else:
- self._success = False
- self._value = result
- del self._cache[self._job]
- self._cond.acquire()
- try:
- self._ready = True
- self._cond.notify()
- finally:
- self._cond.release()
-
-#
-# Class whose instances are returned by `Pool.imap()`
-#
-
-class IMapIterator(object):
-
- def __init__(self, cache):
- self._cond = threading.Condition(threading.Lock())
- self._job = job_counter.next()
- self._cache = cache
- self._items = collections.deque()
- self._index = 0
- self._length = None
- self._unsorted = {}
- cache[self._job] = self
-
- def __iter__(self):
- return self
-
- def next(self, timeout=None):
- self._cond.acquire()
- try:
- try:
- item = self._items.popleft()
- except IndexError:
- if self._index == self._length:
- raise StopIteration
- self._cond.wait(timeout)
- try:
- item = self._items.popleft()
- except IndexError:
- if self._index == self._length:
- raise StopIteration
- raise TimeoutError
- finally:
- self._cond.release()
-
- success, value = item
- if success:
- return value
- raise value
-
- __next__ = next # XXX
-
- def _set(self, i, obj):
- self._cond.acquire()
- try:
- if self._index == i:
- self._items.append(obj)
- self._index += 1
- while self._index in self._unsorted:
- obj = self._unsorted.pop(self._index)
- self._items.append(obj)
- self._index += 1
- self._cond.notify()
- else:
- self._unsorted[i] = obj
-
- if self._index == self._length:
- del self._cache[self._job]
- finally:
- self._cond.release()
-
- def _set_length(self, length):
- self._cond.acquire()
- try:
- self._length = length
- if self._index == self._length:
- self._cond.notify()
- del self._cache[self._job]
- finally:
- self._cond.release()
-
-#
-# Class whose instances are returned by `Pool.imap_unordered()`
-#
-
-class IMapUnorderedIterator(IMapIterator):
-
- def _set(self, i, obj):
- self._cond.acquire()
- try:
- self._items.append(obj)
- self._index += 1
- self._cond.notify()
- if self._index == self._length:
- del self._cache[self._job]
- finally:
- self._cond.release()
-
-#
-#
-#
-
-class ThreadPool(Pool):
-
- from .dummy import Process
-
- def __init__(self, processes=None, initializer=None, initargs=()):
- Pool.__init__(self, processes, initializer, initargs)
-
- def _setup_queues(self):
- self._inqueue = Queue.Queue()
- self._outqueue = Queue.Queue()
- self._quick_put = self._inqueue.put
- self._quick_get = self._outqueue.get
-
- @staticmethod
- def _help_stuff_finish(inqueue, task_handler, size):
- # put sentinels at head of inqueue to make workers finish
- inqueue.not_empty.acquire()
- try:
- inqueue.queue.clear()
- inqueue.queue.extend([None] * size)
- inqueue.not_empty.notify_all()
- finally:
- inqueue.not_empty.release()
+#
+# Module providing the `Pool` class for managing a process pool
+#
+# multiprocessing/pool.py
+#
+# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
+#
+
+__all__ = ['Pool']
+
+#
+# Imports
+#
+
+import threading
+import Queue
+import itertools
+import collections
+import time
+
+from multiprocessing import Process, cpu_count, TimeoutError
+from multiprocessing.util import Finalize, debug
+
+#
+# Constants representing the state of a pool
+#
+
+RUN = 0
+CLOSE = 1
+TERMINATE = 2
+
+#
+# Miscellaneous
+#
+
+job_counter = itertools.count()
+
+def mapstar(args):
+ return map(*args)
+
+#
+# Code run by worker processes
+#
+
+def worker(inqueue, outqueue, initializer=None, initargs=()):
+ put = outqueue.put
+ get = inqueue.get
+ if hasattr(inqueue, '_writer'):
+ inqueue._writer.close()
+ outqueue._reader.close()
+
+ if initializer is not None:
+ initializer(*initargs)
+
+ while 1:
+ try:
+ task = get()
+ except (EOFError, IOError):
+ debug('worker got EOFError or IOError -- exiting')
+ break
+
+ if task is None:
+ debug('worker got sentinel -- exiting')
+ break
+
+ job, i, func, args, kwds = task
+ try:
+ result = (True, func(*args, **kwds))
+ except Exception, e:
+ result = (False, e)
+ put((job, i, result))
+
+#
+# Class representing a process pool
+#
+
+class Pool(object):
+ '''
+ Class which supports an async version of the `apply()` builtin
+ '''
+ Process = Process
+
+ def __init__(self, processes=None, initializer=None, initargs=()):
+ self._setup_queues()
+ self._taskqueue = Queue.Queue()
+ self._cache = {}
+ self._state = RUN
+
+ if processes is None:
+ try:
+ processes = cpu_count()
+ except NotImplementedError:
+ processes = 1
+
+ self._pool = []
+ for i in range(processes):
+ w = self.Process(
+ target=worker,
+ args=(self._inqueue, self._outqueue, initializer, initargs)
+ )
+ self._pool.append(w)
+ w.set_name(w.get_name().replace('Process', 'PoolWorker'))
+ w.set_daemon(True)
+ w.start()
+
+ self._task_handler = threading.Thread(
+ target=Pool._handle_tasks,
+ args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
+ )
+ self._task_handler.set_daemon(True)
+ self._task_handler._state = RUN
+ self._task_handler.start()
+
+ self._result_handler = threading.Thread(
+ target=Pool._handle_results,
+ args=(self._outqueue, self._quick_get, self._cache)
+ )
+ self._result_handler.set_daemon(True)
+ self._result_handler._state = RUN
+ self._result_handler.start()
+
+ self._terminate = Finalize(
+ self, self._terminate_pool,
+ args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
+ self._task_handler, self._result_handler, self._cache),
+ exitpriority=15
+ )
+
+ def _setup_queues(self):
+ from .queues import SimpleQueue
+ self._inqueue = SimpleQueue()
+ self._outqueue = SimpleQueue()
+ self._quick_put = self._inqueue._writer.send
+ self._quick_get = self._outqueue._reader.recv
+
+ def apply(self, func, args=(), kwds={}):
+ '''
+ Equivalent of `apply()` builtin
+ '''
+ assert self._state == RUN
+ return self.apply_async(func, args, kwds).get()
+
+ def map(self, func, iterable, chunksize=None):
+ '''
+ Equivalent of `map()` builtin
+ '''
+ assert self._state == RUN
+ return self.map_async(func, iterable, chunksize).get()
+
+ def imap(self, func, iterable, chunksize=1):
+ '''
+        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
+ '''
+ assert self._state == RUN
+ if chunksize == 1:
+ result = IMapIterator(self._cache)
+ self._taskqueue.put((((result._job, i, func, (x,), {})
+ for i, x in enumerate(iterable)), result._set_length))
+ return result
+ else:
+ assert chunksize > 1
+ task_batches = Pool._get_tasks(func, iterable, chunksize)
+ result = IMapIterator(self._cache)
+ self._taskqueue.put((((result._job, i, mapstar, (x,), {})
+ for i, x in enumerate(task_batches)), result._set_length))
+ return (item for chunk in result for item in chunk)
+
+ def imap_unordered(self, func, iterable, chunksize=1):
+ '''
+ Like `imap()` method but ordering of results is arbitrary
+ '''
+ assert self._state == RUN
+ if chunksize == 1:
+ result = IMapUnorderedIterator(self._cache)
+ self._taskqueue.put((((result._job, i, func, (x,), {})
+ for i, x in enumerate(iterable)), result._set_length))
+ return result
+ else:
+ assert chunksize > 1
+ task_batches = Pool._get_tasks(func, iterable, chunksize)
+ result = IMapUnorderedIterator(self._cache)
+ self._taskqueue.put((((result._job, i, mapstar, (x,), {})
+ for i, x in enumerate(task_batches)), result._set_length))
+ return (item for chunk in result for item in chunk)
+
+ def apply_async(self, func, args=(), kwds={}, callback=None):
+ '''
+ Asynchronous equivalent of `apply()` builtin
+ '''
+ assert self._state == RUN
+ result = ApplyResult(self._cache, callback)
+ self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
+ return result
+
+ def map_async(self, func, iterable, chunksize=None, callback=None):
+ '''
+ Asynchronous equivalent of `map()` builtin
+ '''
+ assert self._state == RUN
+ if not hasattr(iterable, '__len__'):
+ iterable = list(iterable)
+
+ if chunksize is None:
+ chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
+ if extra:
+ chunksize += 1
+
+ task_batches = Pool._get_tasks(func, iterable, chunksize)
+ result = MapResult(self._cache, chunksize, len(iterable), callback)
+ self._taskqueue.put((((result._job, i, mapstar, (x,), {})
+ for i, x in enumerate(task_batches)), None))
+ return result
+
+ @staticmethod
+ def _handle_tasks(taskqueue, put, outqueue, pool):
+ thread = threading.current_thread()
+
+ for taskseq, set_length in iter(taskqueue.get, None):
+ i = -1
+ for i, task in enumerate(taskseq):
+ if thread._state:
+ debug('task handler found thread._state != RUN')
+ break
+ try:
+ put(task)
+ except IOError:
+ debug('could not put task on queue')
+ break
+ else:
+ if set_length:
+ debug('doing set_length()')
+ set_length(i+1)
+ continue
+ break
+ else:
+ debug('task handler got sentinel')
+
+
+ try:
+ # tell result handler to finish when cache is empty
+ debug('task handler sending sentinel to result handler')
+ outqueue.put(None)
+
+ # tell workers there is no more work
+ debug('task handler sending sentinel to workers')
+ for p in pool:
+ put(None)
+ except IOError:
+ debug('task handler got IOError when sending sentinels')
+
+ debug('task handler exiting')
+
+ @staticmethod
+ def _handle_results(outqueue, get, cache):
+ thread = threading.current_thread()
+
+ while 1:
+ try:
+ task = get()
+ except (IOError, EOFError):
+ debug('result handler got EOFError/IOError -- exiting')
+ return
+
+ if thread._state:
+ assert thread._state == TERMINATE
+ debug('result handler found thread._state=TERMINATE')
+ break
+
+ if task is None:
+ debug('result handler got sentinel')
+ break
+
+ job, i, obj = task
+ try:
+ cache[job]._set(i, obj)
+ except KeyError:
+ pass
+
+ while cache and thread._state != TERMINATE:
+ try:
+ task = get()
+ except (IOError, EOFError):
+ debug('result handler got EOFError/IOError -- exiting')
+ return
+
+ if task is None:
+ debug('result handler ignoring extra sentinel')
+ continue
+ job, i, obj = task
+ try:
+ cache[job]._set(i, obj)
+ except KeyError:
+ pass
+
+ if hasattr(outqueue, '_reader'):
+ debug('ensuring that outqueue is not full')
+ # If we don't make room available in outqueue then
+ # attempts to add the sentinel (None) to outqueue may
+ # block. There is guaranteed to be no more than 2 sentinels.
+ try:
+ for i in range(10):
+ if not outqueue._reader.poll():
+ break
+ get()
+ except (IOError, EOFError):
+ pass
+
+ debug('result handler exiting: len(cache)=%s, thread._state=%s',
+ len(cache), thread._state)
+
+ @staticmethod
+ def _get_tasks(func, it, size):
+ it = iter(it)
+ while 1:
+ x = tuple(itertools.islice(it, size))
+ if not x:
+ return
+ yield (func, x)
+
+ def __reduce__(self):
+ raise NotImplementedError(
+ 'pool objects cannot be passed between processes or pickled'
+ )
+
+ def close(self):
+ debug('closing pool')
+ if self._state == RUN:
+ self._state = CLOSE
+ self._taskqueue.put(None)
+
+ def terminate(self):
+ debug('terminating pool')
+ self._state = TERMINATE
+ self._terminate()
+
+ def join(self):
+ debug('joining pool')
+ assert self._state in (CLOSE, TERMINATE)
+ self._task_handler.join()
+ self._result_handler.join()
+ for p in self._pool:
+ p.join()
+
+ @staticmethod
+ def _help_stuff_finish(inqueue, task_handler, size):
+ # task_handler may be blocked trying to put items on inqueue
+ debug('removing tasks from inqueue until task handler finished')
+ inqueue._rlock.acquire()
+ while task_handler.is_alive() and inqueue._reader.poll():
+ inqueue._reader.recv()
+ time.sleep(0)
+
+ @classmethod
+ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
+ task_handler, result_handler, cache):
+ # this is guaranteed to only be called once
+ debug('finalizing pool')
+
+ task_handler._state = TERMINATE
+ taskqueue.put(None) # sentinel
+
+ debug('helping task handler/workers to finish')
+ cls._help_stuff_finish(inqueue, task_handler, len(pool))
+
+ assert result_handler.is_alive() or len(cache) == 0
+
+ result_handler._state = TERMINATE
+ outqueue.put(None) # sentinel
+
+ if pool and hasattr(pool[0], 'terminate'):
+ debug('terminating workers')
+ for p in pool:
+ p.terminate()
+
+ debug('joining task handler')
+ task_handler.join(1e100)
+
+ debug('joining result handler')
+ result_handler.join(1e100)
+
+ if pool and hasattr(pool[0], 'terminate'):
+ debug('joining pool workers')
+ for p in pool:
+ p.join()
+
+#
+# Class whose instances are returned by `Pool.apply_async()`
+#
+
+class ApplyResult(object):
+
+ def __init__(self, cache, callback):
+ self._cond = threading.Condition(threading.Lock())
+ self._job = job_counter.next()
+ self._cache = cache
+ self._ready = False
+ self._callback = callback
+ cache[self._job] = self
+
+ def ready(self):
+ return self._ready
+
+ def successful(self):
+ assert self._ready
+ return self._success
+
+ def wait(self, timeout=None):
+ self._cond.acquire()
+ try:
+ if not self._ready:
+ self._cond.wait(timeout)
+ finally:
+ self._cond.release()
+
+ def get(self, timeout=None):
+ self.wait(timeout)
+ if not self._ready:
+ raise TimeoutError
+ if self._success:
+ return self._value
+ else:
+ raise self._value
+
+ def _set(self, i, obj):
+ self._success, self._value = obj
+ if self._callback and self._success:
+ self._callback(self._value)
+ self._cond.acquire()
+ try:
+ self._ready = True
+ self._cond.notify()
+ finally:
+ self._cond.release()
+ del self._cache[self._job]
+
+#
+# Class whose instances are returned by `Pool.map_async()`
+#
+
+class MapResult(ApplyResult):
+
+ def __init__(self, cache, chunksize, length, callback):
+ ApplyResult.__init__(self, cache, callback)
+ self._success = True
+ self._value = [None] * length
+ self._chunksize = chunksize
+ if chunksize <= 0:
+ self._number_left = 0
+ self._ready = True
+ else:
+ self._number_left = length//chunksize + bool(length % chunksize)
+
+ def _set(self, i, success_result):
+ success, result = success_result
+ if success:
+ self._value[i*self._chunksize:(i+1)*self._chunksize] = result
+ self._number_left -= 1
+ if self._number_left == 0:
+ if self._callback:
+ self._callback(self._value)
+ del self._cache[self._job]
+ self._cond.acquire()
+ try:
+ self._ready = True
+ self._cond.notify()
+ finally:
+ self._cond.release()
+
+ else:
+ self._success = False
+ self._value = result
+ del self._cache[self._job]
+ self._cond.acquire()
+ try:
+ self._ready = True
+ self._cond.notify()
+ finally:
+ self._cond.release()
+
+#
+# Class whose instances are returned by `Pool.imap()`
+#
+
+class IMapIterator(object):
+
+ def __init__(self, cache):
+ self._cond = threading.Condition(threading.Lock())
+ self._job = job_counter.next()
+ self._cache = cache
+ self._items = collections.deque()
+ self._index = 0
+ self._length = None
+ self._unsorted = {}
+ cache[self._job] = self
+
+ def __iter__(self):
+ return self
+
+ def next(self, timeout=None):
+ self._cond.acquire()
+ try:
+ try:
+ item = self._items.popleft()
+ except IndexError:
+ if self._index == self._length:
+ raise StopIteration
+ self._cond.wait(timeout)
+ try:
+ item = self._items.popleft()
+ except IndexError:
+ if self._index == self._length:
+ raise StopIteration
+ raise TimeoutError
+ finally:
+ self._cond.release()
+
+ success, value = item
+ if success:
+ return value
+ raise value
+
+ __next__ = next # XXX
+
+ def _set(self, i, obj):
+ self._cond.acquire()
+ try:
+ if self._index == i:
+ self._items.append(obj)
+ self._index += 1
+ while self._index in self._unsorted:
+ obj = self._unsorted.pop(self._index)
+ self._items.append(obj)
+ self._index += 1
+ self._cond.notify()
+ else:
+ self._unsorted[i] = obj
+
+ if self._index == self._length:
+ del self._cache[self._job]
+ finally:
+ self._cond.release()
+
+ def _set_length(self, length):
+ self._cond.acquire()
+ try:
+ self._length = length
+ if self._index == self._length:
+ self._cond.notify()
+ del self._cache[self._job]
+ finally:
+ self._cond.release()
+
+#
+# Class whose instances are returned by `Pool.imap_unordered()`
+#
+
+class IMapUnorderedIterator(IMapIterator):
+
+ def _set(self, i, obj):
+ self._cond.acquire()
+ try:
+ self._items.append(obj)
+ self._index += 1
+ self._cond.notify()
+ if self._index == self._length:
+ del self._cache[self._job]
+ finally:
+ self._cond.release()
+
+#
+#
+#
+
+class ThreadPool(Pool):
+
+ from .dummy import Process
+
+ def __init__(self, processes=None, initializer=None, initargs=()):
+ Pool.__init__(self, processes, initializer, initargs)
+
+ def _setup_queues(self):
+ self._inqueue = Queue.Queue()
+ self._outqueue = Queue.Queue()
+ self._quick_put = self._inqueue.put
+ self._quick_get = self._outqueue.get
+
+ @staticmethod
+ def _help_stuff_finish(inqueue, task_handler, size):
+ # put sentinels at head of inqueue to make workers finish
+ inqueue.not_empty.acquire()
+ try:
+ inqueue.queue.clear()
+ inqueue.queue.extend([None] * size)
+ inqueue.not_empty.notify_all()
+ finally:
+ inqueue.not_empty.release()
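+
+# Editor's sketch (not part of the original commit): minimal `Pool` usage;
+# `square` is an invented example function.
+#
+#     from multiprocessing import Pool
+#
+#     def square(x):
+#         return x * x
+#
+#     if __name__ == '__main__':
+#         pool = Pool(processes=4)
+#         print pool.map(square, range(10))      # [0, 1, 4, ..., 81]
+#         res = pool.apply_async(square, (7,))   # does not block
+#         print res.get(timeout=1)               # 49
+#         pool.close()
+#         pool.join()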
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index 43d8297..e1189cf 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -1,302 +1,302 @@
-#
-# Module providing the `Process` class which emulates `threading.Thread`
-#
-# multiprocessing/process.py
-#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
-#
-
-__all__ = ['Process', 'current_process', 'active_children']
-
-#
-# Imports
-#
-
-import os
-import sys
-import signal
-import itertools
-
-#
-#
-#
-
-try:
- ORIGINAL_DIR = os.path.abspath(os.getcwd())
-except OSError:
- ORIGINAL_DIR = None
-
-try:
- bytes
-except NameError:
- bytes = str # XXX not needed in Py2.6 and Py3.0
-
-#
-# Public functions
-#
-
-def current_process():
- '''
- Return process object representing the current process
- '''
- return _current_process
-
-def active_children():
- '''
- Return list of process objects corresponding to live child processes
- '''
- _cleanup()
- return list(_current_process._children)
-
-#
-#
-#
-
-def _cleanup():
- # check for processes which have finished
- for p in list(_current_process._children):
- if p._popen.poll() is not None:
- _current_process._children.discard(p)
-
-#
-# The `Process` class
-#
-
-class Process(object):
- '''
- Process objects represent activity that is run in a separate process
-
-    The class is analogous to `threading.Thread`
- '''
- _Popen = None
-
- def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
- assert group is None, 'group argument must be None for now'
- count = _current_process._counter.next()
- self._identity = _current_process._identity + (count,)
- self._authkey = _current_process._authkey
- self._daemonic = _current_process._daemonic
- self._tempdir = _current_process._tempdir
- self._parent_pid = os.getpid()
- self._popen = None
- self._target = target
- self._args = tuple(args)
- self._kwargs = dict(kwargs)
- self._name = name or type(self).__name__ + '-' + \
- ':'.join(str(i) for i in self._identity)
-
- def run(self):
- '''
- Method to be run in sub-process; can be overridden in sub-class
- '''
- if self._target:
- self._target(*self._args, **self._kwargs)
-
- def start(self):
- '''
- Start child process
- '''
- assert self._popen is None, 'cannot start a process twice'
- assert self._parent_pid == os.getpid(), \
- 'can only start a process object created by current process'
- assert not _current_process._daemonic, \
- 'daemonic processes are not allowed to have children'
- _cleanup()
- if self._Popen is not None:
- Popen = self._Popen
- else:
- from .forking import Popen
- self._popen = Popen(self)
- _current_process._children.add(self)
-
- def terminate(self):
- '''
- Terminate process; sends SIGTERM signal or uses TerminateProcess()
- '''
- self._popen.terminate()
-
- def join(self, timeout=None):
- '''
- Wait until child process terminates
- '''
- assert self._parent_pid == os.getpid(), 'can only join a child process'
- assert self._popen is not None, 'can only join a started process'
- res = self._popen.wait(timeout)
- if res is not None:
- _current_process._children.discard(self)
-
- def is_alive(self):
- '''
- Return whether process is alive
- '''
- if self is _current_process:
- return True
- assert self._parent_pid == os.getpid(), 'can only test a child process'
- if self._popen is None:
- return False
- self._popen.poll()
- return self._popen.returncode is None
-
- def get_name(self):
- '''
- Return name of process
- '''
- return self._name
-
- def set_name(self, name):
- '''
- Set name of process
- '''
- assert isinstance(name, str), 'name must be a string'
- self._name = name
-
- def is_daemon(self):
- '''
- Return whether process is a daemon
- '''
- return self._daemonic
-
- def set_daemon(self, daemonic):
- '''
- Set whether process is a daemon
- '''
- assert self._popen is None, 'process has already started'
- self._daemonic = daemonic
-
- def get_authkey(self):
- '''
- Return authorization key of process
- '''
- return self._authkey
-
- def set_authkey(self, authkey):
- '''
- Set authorization key of process
- '''
- self._authkey = AuthenticationString(authkey)
-
- def get_exitcode(self):
- '''
- Return exit code of process or `None` if it has yet to stop
- '''
- if self._popen is None:
- return self._popen
- return self._popen.poll()
-
- def get_ident(self):
- '''
-        Return identifier (PID) of process or `None` if it has yet to start
- '''
- if self is _current_process:
- return os.getpid()
- else:
- return self._popen and self._popen.pid
-
- pid = property(get_ident)
-
- def __repr__(self):
- if self is _current_process:
- status = 'started'
- elif self._parent_pid != os.getpid():
- status = 'unknown'
- elif self._popen is None:
- status = 'initial'
- else:
- if self._popen.poll() is not None:
- status = self.get_exitcode()
- else:
- status = 'started'
-
- if type(status) is int:
- if status == 0:
- status = 'stopped'
- else:
- status = 'stopped[%s]' % _exitcode_to_name.get(status, status)
-
- return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
- status, self._daemonic and ' daemon' or '')
-
- ##
-
- def _bootstrap(self):
- from . import util
- global _current_process
-
- try:
- self._children = set()
- self._counter = itertools.count(1)
- try:
- os.close(sys.stdin.fileno())
- except (OSError, ValueError):
- pass
- _current_process = self
- util._finalizer_registry.clear()
- util._run_after_forkers()
- util.info('child process calling self.run()')
- try:
- self.run()
- exitcode = 0
- finally:
- util._exit_function()
- except SystemExit, e:
- if not e.args:
- exitcode = 1
- elif type(e.args[0]) is int:
- exitcode = e.args[0]
- else:
- sys.stderr.write(e.args[0] + '\n')
- sys.stderr.flush()
- exitcode = 1
- except:
- exitcode = 1
- import traceback
- sys.stderr.write('Process %s:\n' % self.get_name())
- sys.stderr.flush()
- traceback.print_exc()
-
- util.info('process exiting with exitcode %d' % exitcode)
- return exitcode
-
-#
-# We subclass bytes to avoid accidental transmission of auth keys over network
-#
-
-class AuthenticationString(bytes):
- def __reduce__(self):
- from .forking import Popen
- if not Popen.thread_is_spawning():
- raise TypeError(
- 'Pickling an AuthenticationString object is '
- 'disallowed for security reasons'
- )
- return AuthenticationString, (bytes(self),)
-
-#
-# Create object representing the main process
-#
-
-class _MainProcess(Process):
-
- def __init__(self):
- self._identity = ()
- self._daemonic = False
- self._name = 'MainProcess'
- self._parent_pid = None
- self._popen = None
- self._counter = itertools.count(1)
- self._children = set()
- self._authkey = AuthenticationString(os.urandom(32))
- self._tempdir = None
-
-_current_process = _MainProcess()
-del _MainProcess
-
-#
-# Give names to some return codes
-#
-
-_exitcode_to_name = {}
-
-for name, signum in signal.__dict__.items():
- if name[:3]=='SIG' and '_' not in name:
- _exitcode_to_name[-signum] = name
+#
+# Module providing the `Process` class which emulates `threading.Thread`
+#
+# multiprocessing/process.py
+#
+# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+#
+
+__all__ = ['Process', 'current_process', 'active_children']
+
+#
+# Imports
+#
+
+import os
+import sys
+import signal
+import itertools
+
+#
+#
+#
+
+try:
+ ORIGINAL_DIR = os.path.abspath(os.getcwd())
+except OSError:
+ ORIGINAL_DIR = None
+
+try:
+ bytes
+except NameError:
+ bytes = str # XXX not needed in Py2.6 and Py3.0
+
+#
+# Public functions
+#
+
+def current_process():
+ '''
+ Return process object representing the current process
+ '''
+ return _current_process
+
+def active_children():
+ '''
+ Return list of process objects corresponding to live child processes
+ '''
+ _cleanup()
+ return list(_current_process._children)
+
+#
+#
+#
+
+def _cleanup():
+ # check for processes which have finished
+ for p in list(_current_process._children):
+ if p._popen.poll() is not None:
+ _current_process._children.discard(p)
+
+#
+# The `Process` class
+#
+
+class Process(object):
+ '''
+ Process objects represent activity that is run in a separate process
+
+    The class is analogous to `threading.Thread`
+ '''
+ _Popen = None
+
+ def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
+ assert group is None, 'group argument must be None for now'
+ count = _current_process._counter.next()
+ self._identity = _current_process._identity + (count,)
+ self._authkey = _current_process._authkey
+ self._daemonic = _current_process._daemonic
+ self._tempdir = _current_process._tempdir
+ self._parent_pid = os.getpid()
+ self._popen = None
+ self._target = target
+ self._args = tuple(args)
+ self._kwargs = dict(kwargs)
+ self._name = name or type(self).__name__ + '-' + \
+ ':'.join(str(i) for i in self._identity)
+
+ def run(self):
+ '''
+ Method to be run in sub-process; can be overridden in sub-class
+ '''
+ if self._target:
+ self._target(*self._args, **self._kwargs)
+
+ def start(self):
+ '''
+ Start child process
+ '''
+ assert self._popen is None, 'cannot start a process twice'
+ assert self._parent_pid == os.getpid(), \
+ 'can only start a process object created by current process'
+ assert not _current_process._daemonic, \
+ 'daemonic processes are not allowed to have children'
+ _cleanup()
+ if self._Popen is not None:
+ Popen = self._Popen
+ else:
+ from .forking import Popen
+ self._popen = Popen(self)
+ _current_process._children.add(self)
+
+ def terminate(self):
+ '''
+ Terminate process; sends SIGTERM signal or uses TerminateProcess()
+ '''
+ self._popen.terminate()
+
+ def join(self, timeout=None):
+ '''
+ Wait until child process terminates
+ '''
+ assert self._parent_pid == os.getpid(), 'can only join a child process'
+ assert self._popen is not None, 'can only join a started process'
+ res = self._popen.wait(timeout)
+ if res is not None:
+ _current_process._children.discard(self)
+
+ def is_alive(self):
+ '''
+ Return whether process is alive
+ '''
+ if self is _current_process:
+ return True
+ assert self._parent_pid == os.getpid(), 'can only test a child process'
+ if self._popen is None:
+ return False
+ self._popen.poll()
+ return self._popen.returncode is None
+
+ def get_name(self):
+ '''
+ Return name of process
+ '''
+ return self._name
+
+ def set_name(self, name):
+ '''
+ Set name of process
+ '''
+ assert isinstance(name, str), 'name must be a string'
+ self._name = name
+
+ def is_daemon(self):
+ '''
+ Return whether process is a daemon
+ '''
+ return self._daemonic
+
+ def set_daemon(self, daemonic):
+ '''
+ Set whether process is a daemon
+ '''
+ assert self._popen is None, 'process has already started'
+ self._daemonic = daemonic
+
+ def get_authkey(self):
+ '''
+ Return authorization key of process
+ '''
+ return self._authkey
+
+ def set_authkey(self, authkey):
+ '''
+ Set authorization key of process
+ '''
+ self._authkey = AuthenticationString(authkey)
+
+ def get_exitcode(self):
+ '''
+ Return exit code of process or `None` if it has yet to stop
+ '''
+ if self._popen is None:
+ return self._popen
+ return self._popen.poll()
+
+ def get_ident(self):
+ '''
+        Return identifier (PID) of process or `None` if it has yet to start
+ '''
+ if self is _current_process:
+ return os.getpid()
+ else:
+ return self._popen and self._popen.pid
+
+ pid = property(get_ident)
+
+ def __repr__(self):
+ if self is _current_process:
+ status = 'started'
+ elif self._parent_pid != os.getpid():
+ status = 'unknown'
+ elif self._popen is None:
+ status = 'initial'
+ else:
+ if self._popen.poll() is not None:
+ status = self.get_exitcode()
+ else:
+ status = 'started'
+
+ if type(status) is int:
+ if status == 0:
+ status = 'stopped'
+ else:
+ status = 'stopped[%s]' % _exitcode_to_name.get(status, status)
+
+ return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
+ status, self._daemonic and ' daemon' or '')
+
+ ##
+
+ def _bootstrap(self):
+ from . import util
+ global _current_process
+
+ try:
+ self._children = set()
+ self._counter = itertools.count(1)
+ try:
+ os.close(sys.stdin.fileno())
+ except (OSError, ValueError):
+ pass
+ _current_process = self
+ util._finalizer_registry.clear()
+ util._run_after_forkers()
+ util.info('child process calling self.run()')
+ try:
+ self.run()
+ exitcode = 0
+ finally:
+ util._exit_function()
+ except SystemExit, e:
+ if not e.args:
+ exitcode = 1
+ elif type(e.args[0]) is int:
+ exitcode = e.args[0]
+ else:
+ sys.stderr.write(e.args[0] + '\n')
+ sys.stderr.flush()
+ exitcode = 1
+ except:
+ exitcode = 1
+ import traceback
+ sys.stderr.write('Process %s:\n' % self.get_name())
+ sys.stderr.flush()
+ traceback.print_exc()
+
+ util.info('process exiting with exitcode %d' % exitcode)
+ return exitcode
+
+#
+# We subclass bytes to avoid accidental transmission of auth keys over network
+#
+
+class AuthenticationString(bytes):
+ def __reduce__(self):
+ from .forking import Popen
+ if not Popen.thread_is_spawning():
+ raise TypeError(
+ 'Pickling an AuthenticationString object is '
+ 'disallowed for security reasons'
+ )
+ return AuthenticationString, (bytes(self),)
+
+#
+# Create object representing the main process
+#
+
+class _MainProcess(Process):
+
+ def __init__(self):
+ self._identity = ()
+ self._daemonic = False
+ self._name = 'MainProcess'
+ self._parent_pid = None
+ self._popen = None
+ self._counter = itertools.count(1)
+ self._children = set()
+ self._authkey = AuthenticationString(os.urandom(32))
+ self._tempdir = None
+
+_current_process = _MainProcess()
+del _MainProcess
+
+#
+# Give names to some return codes
+#
+
+_exitcode_to_name = {}
+
+for name, signum in signal.__dict__.items():
+ if name[:3]=='SIG' and '_' not in name:
+ _exitcode_to_name[-signum] = name
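+
+# Editor's sketch (not part of the original commit): minimal `Process` usage
+# with the accessor-style API above; `greet` is an invented function.
+#
+#     from multiprocessing import Process, current_process
+#
+#     def greet(who):
+#         print 'hello %s from %s' % (who, current_process().get_name())
+#
+#     if __name__ == '__main__':
+#         p = Process(target=greet, args=('world',))
+#         p.start()
+#         p.join()
+#         print 'exit code:', p.get_exitcode()   # 0 on success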
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index ea89090..7235a41 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -1,356 +1,356 @@
-#
-# Module implementing queues
-#
-# multiprocessing/queues.py
-#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
-#
-
-__all__ = ['Queue', 'SimpleQueue']
-
-import sys
-import os
-import threading
-import collections
-import time
-import atexit
-import weakref
-
-from Queue import Empty, Full
-import _multiprocessing
-from multiprocessing import Pipe
-from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
-from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning
-
-#
-# Queue type using a pipe, buffer and thread
-#
-
-class Queue(object):
-
- def __init__(self, maxsize=0):
- if maxsize <= 0:
- maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
- self._maxsize = maxsize
- self._reader, self._writer = Pipe(duplex=False)
- self._rlock = Lock()
- self._opid = os.getpid()
- if sys.platform == 'win32':
- self._wlock = None
- else:
- self._wlock = Lock()
- self._sem = BoundedSemaphore(maxsize)
-
- self._after_fork()
-
- if sys.platform != 'win32':
- register_after_fork(self, Queue._after_fork)
-
- def __getstate__(self):
- assert_spawning(self)
- return (self._maxsize, self._reader, self._writer,
- self._rlock, self._wlock, self._sem, self._opid)
-
- def __setstate__(self, state):
- (self._maxsize, self._reader, self._writer,
- self._rlock, self._wlock, self._sem, self._opid) = state
- self._after_fork()
-
- def _after_fork(self):
- debug('Queue._after_fork()')
- self._notempty = threading.Condition(threading.Lock())
- self._buffer = collections.deque()
- self._thread = None
- self._jointhread = None
- self._joincancelled = False
- self._closed = False
- self._close = None
- self._send = self._writer.send
- self._recv = self._reader.recv
- self._poll = self._reader.poll
-
- def put(self, obj, block=True, timeout=None):
- assert not self._closed
- if not self._sem.acquire(block, timeout):
- raise Full
-
- self._notempty.acquire()
- try:
- if self._thread is None:
- self._start_thread()
- self._buffer.append(obj)
- self._notempty.notify()
- finally:
- self._notempty.release()
-
- def get(self, block=True, timeout=None):
- if block and timeout is None:
- self._rlock.acquire()
- try:
- res = self._recv()
- self._sem.release()
- return res
- finally:
- self._rlock.release()
-
- else:
- if block:
- deadline = time.time() + timeout
- if not self._rlock.acquire(block, timeout):
- raise Empty
- try:
- if not self._poll(block and (deadline-time.time()) or 0.0):
- raise Empty
- res = self._recv()
- self._sem.release()
- return res
- finally:
- self._rlock.release()
-
- def qsize(self):
-        # Raises NotImplementedError on Mac OS X because of broken sem_getvalue()
- return self._maxsize - self._sem._semlock._get_value()
-
- def empty(self):
- return not self._poll()
-
- def full(self):
- return self._sem._semlock._is_zero()
-
- def get_nowait(self):
- return self.get(False)
-
- def put_nowait(self, obj):
- return self.put(obj, False)
-
- def close(self):
- self._closed = True
- self._reader.close()
- if self._close:
- self._close()
-
- def join_thread(self):
- debug('Queue.join_thread()')
- assert self._closed
- if self._jointhread:
- self._jointhread()
-
- def cancel_join_thread(self):
- debug('Queue.cancel_join_thread()')
- self._joincancelled = True
- try:
- self._jointhread.cancel()
- except AttributeError:
- pass
-
- def _start_thread(self):
- debug('Queue._start_thread()')
-
- # Start thread which transfers data from buffer to pipe
- self._buffer.clear()
- self._thread = threading.Thread(
- target=Queue._feed,
- args=(self._buffer, self._notempty, self._send,
- self._wlock, self._writer.close),
- name='QueueFeederThread'
- )
- self._thread.set_daemon(True)
-
- debug('doing self._thread.start()')
- self._thread.start()
- debug('... done self._thread.start()')
-
- # On process exit we will wait for data to be flushed to pipe.
- #
- # However, if this process created the queue then all
- # processes which use the queue will be descendants of this
- # process. Therefore waiting for the queue to be flushed
- # is pointless once all the child processes have been joined.
- created_by_this_process = (self._opid == os.getpid())
- if not self._joincancelled and not created_by_this_process:
- self._jointhread = Finalize(
- self._thread, Queue._finalize_join,
- [weakref.ref(self._thread)],
- exitpriority=-5
- )
-
-        # Send the sentinel to the feeder thread when this queue object is garbage collected
- self._close = Finalize(
- self, Queue._finalize_close,
- [self._buffer, self._notempty],
- exitpriority=10
- )
-
- @staticmethod
- def _finalize_join(twr):
- debug('joining queue thread')
- thread = twr()
- if thread is not None:
- thread.join()
- debug('... queue thread joined')
- else:
- debug('... queue thread already dead')
-
- @staticmethod
- def _finalize_close(buffer, notempty):
- debug('telling queue thread to quit')
- notempty.acquire()
- try:
- buffer.append(_sentinel)
- notempty.notify()
- finally:
- notempty.release()
-
- @staticmethod
- def _feed(buffer, notempty, send, writelock, close):
- debug('starting thread to feed data to pipe')
- from .util import is_exiting
-
- nacquire = notempty.acquire
- nrelease = notempty.release
- nwait = notempty.wait
- bpopleft = buffer.popleft
- sentinel = _sentinel
- if sys.platform != 'win32':
- wacquire = writelock.acquire
- wrelease = writelock.release
- else:
- wacquire = None
-
- try:
- while 1:
- nacquire()
- try:
- if not buffer:
- nwait()
- finally:
- nrelease()
- try:
- while 1:
- obj = bpopleft()
- if obj is sentinel:
- debug('feeder thread got sentinel -- exiting')
- close()
- return
-
- if wacquire is None:
- send(obj)
- else:
- wacquire()
- try:
- send(obj)
- finally:
- wrelease()
- except IndexError:
- pass
- except Exception, e:
-            # Since this runs in a daemon thread the resources it uses
-            # may become unusable while the process is cleaning up.
-            # We ignore errors which happen after the process has
-            # started to clean up.
- try:
- if is_exiting():
- info('error in queue thread: %s', e)
- else:
- import traceback
- traceback.print_exc()
- except Exception:
- pass
-
-_sentinel = object()
-
-#
-# A queue type which also supports join() and task_done() methods
-#
-# Note that if you do not call task_done() for each finished task then
-# eventually the counter's semaphore may overflow causing Bad Things
-# to happen.
-#
-
-class JoinableQueue(Queue):
-
- def __init__(self, maxsize=0):
- Queue.__init__(self, maxsize)
- self._unfinished_tasks = Semaphore(0)
- self._cond = Condition()
-
- def __getstate__(self):
- return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
-
- def __setstate__(self, state):
- Queue.__setstate__(self, state[:-2])
- self._cond, self._unfinished_tasks = state[-2:]
-
- def put(self, item, block=True, timeout=None):
- Queue.put(self, item, block, timeout)
- self._unfinished_tasks.release()
-
- def task_done(self):
- self._cond.acquire()
- try:
- if not self._unfinished_tasks.acquire(False):
- raise ValueError('task_done() called too many times')
- if self._unfinished_tasks._semlock._is_zero():
- self._cond.notify_all()
- finally:
- self._cond.release()
-
- def join(self):
- self._cond.acquire()
- try:
- if not self._unfinished_tasks._semlock._is_zero():
- self._cond.wait()
- finally:
- self._cond.release()
-
-#
-# Simplified Queue type -- really just a locked pipe
-#
-
-class SimpleQueue(object):
-
- def __init__(self):
- self._reader, self._writer = Pipe(duplex=False)
- self._rlock = Lock()
- if sys.platform == 'win32':
- self._wlock = None
- else:
- self._wlock = Lock()
- self._make_methods()
-
- def empty(self):
- return not self._reader.poll()
-
- def __getstate__(self):
- assert_spawning(self)
- return (self._reader, self._writer, self._rlock, self._wlock)
-
- def __setstate__(self, state):
- (self._reader, self._writer, self._rlock, self._wlock) = state
- self._make_methods()
-
- def _make_methods(self):
- recv = self._reader.recv
- racquire, rrelease = self._rlock.acquire, self._rlock.release
- def get():
- racquire()
- try:
- return recv()
- finally:
- rrelease()
- self.get = get
-
- if self._wlock is None:
- # writes to a message oriented win32 pipe are atomic
- self.put = self._writer.send
- else:
- send = self._writer.send
- wacquire, wrelease = self._wlock.acquire, self._wlock.release
- def put(obj):
- wacquire()
- try:
- return send(obj)
- finally:
- wrelease()
- self.put = put
+#
+# Module implementing queues
+#
+# multiprocessing/queues.py
+#
+# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+#
+
+__all__ = ['Queue', 'SimpleQueue']
+
+import sys
+import os
+import threading
+import collections
+import time
+import atexit
+import weakref
+
+from Queue import Empty, Full
+import _multiprocessing
+from multiprocessing import Pipe
+from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
+from multiprocessing.util import debug, info, Finalize, register_after_fork
+from multiprocessing.forking import assert_spawning
+
+#
+# Queue type using a pipe, buffer and thread
+#
+
+class Queue(object):
+
+ def __init__(self, maxsize=0):
+ if maxsize <= 0:
+ maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
+ self._maxsize = maxsize
+ self._reader, self._writer = Pipe(duplex=False)
+ self._rlock = Lock()
+ self._opid = os.getpid()
+ if sys.platform == 'win32':
+ self._wlock = None
+ else:
+ self._wlock = Lock()
+ self._sem = BoundedSemaphore(maxsize)
+
+ self._after_fork()
+
+ if sys.platform != 'win32':
+ register_after_fork(self, Queue._after_fork)
+
+ def __getstate__(self):
+ assert_spawning(self)
+ return (self._maxsize, self._reader, self._writer,
+ self._rlock, self._wlock, self._sem, self._opid)
+
+ def __setstate__(self, state):
+ (self._maxsize, self._reader, self._writer,
+ self._rlock, self._wlock, self._sem, self._opid) = state
+ self._after_fork()
+
+ def _after_fork(self):
+ debug('Queue._after_fork()')
+ self._notempty = threading.Condition(threading.Lock())
+ self._buffer = collections.deque()
+ self._thread = None
+ self._jointhread = None
+ self._joincancelled = False
+ self._closed = False
+ self._close = None
+ self._send = self._writer.send
+ self._recv = self._reader.recv
+ self._poll = self._reader.poll
+
+ def put(self, obj, block=True, timeout=None):
+ assert not self._closed
+ if not self._sem.acquire(block, timeout):
+ raise Full
+
+ self._notempty.acquire()
+ try:
+ if self._thread is None:
+ self._start_thread()
+ self._buffer.append(obj)
+ self._notempty.notify()
+ finally:
+ self._notempty.release()
+
+ def get(self, block=True, timeout=None):
+ if block and timeout is None:
+ self._rlock.acquire()
+ try:
+ res = self._recv()
+ self._sem.release()
+ return res
+ finally:
+ self._rlock.release()
+
+ else:
+ if block:
+ deadline = time.time() + timeout
+ if not self._rlock.acquire(block, timeout):
+ raise Empty
+ try:
+ if not self._poll(block and (deadline-time.time()) or 0.0):
+ raise Empty
+ res = self._recv()
+ self._sem.release()
+ return res
+ finally:
+ self._rlock.release()
+
+ def qsize(self):
+        # Raises NotImplementedError on Mac OS X because of broken sem_getvalue()
+ return self._maxsize - self._sem._semlock._get_value()
+
+ def empty(self):
+ return not self._poll()
+
+ def full(self):
+ return self._sem._semlock._is_zero()
+
+ def get_nowait(self):
+ return self.get(False)
+
+ def put_nowait(self, obj):
+ return self.put(obj, False)
+
+ def close(self):
+ self._closed = True
+ self._reader.close()
+ if self._close:
+ self._close()
+
+ def join_thread(self):
+ debug('Queue.join_thread()')
+ assert self._closed
+ if self._jointhread:
+ self._jointhread()
+
+ def cancel_join_thread(self):
+ debug('Queue.cancel_join_thread()')
+ self._joincancelled = True
+ try:
+ self._jointhread.cancel()
+ except AttributeError:
+ pass
+
+ def _start_thread(self):
+ debug('Queue._start_thread()')
+
+ # Start thread which transfers data from buffer to pipe
+ self._buffer.clear()
+ self._thread = threading.Thread(
+ target=Queue._feed,
+ args=(self._buffer, self._notempty, self._send,
+ self._wlock, self._writer.close),
+ name='QueueFeederThread'
+ )
+ self._thread.set_daemon(True)
+
+ debug('doing self._thread.start()')
+ self._thread.start()
+ debug('... done self._thread.start()')
+
+ # On process exit we will wait for data to be flushed to pipe.
+ #
+ # However, if this process created the queue then all
+ # processes which use the queue will be descendants of this
+ # process. Therefore waiting for the queue to be flushed
+ # is pointless once all the child processes have been joined.
+ created_by_this_process = (self._opid == os.getpid())
+ if not self._joincancelled and not created_by_this_process:
+ self._jointhread = Finalize(
+ self._thread, Queue._finalize_join,
+ [weakref.ref(self._thread)],
+ exitpriority=-5
+ )
+
+        # Send the sentinel to the feeder thread when this queue object is garbage collected
+ self._close = Finalize(
+ self, Queue._finalize_close,
+ [self._buffer, self._notempty],
+ exitpriority=10
+ )
+
+ @staticmethod
+ def _finalize_join(twr):
+ debug('joining queue thread')
+ thread = twr()
+ if thread is not None:
+ thread.join()
+ debug('... queue thread joined')
+ else:
+ debug('... queue thread already dead')
+
+ @staticmethod
+ def _finalize_close(buffer, notempty):
+ debug('telling queue thread to quit')
+ notempty.acquire()
+ try:
+ buffer.append(_sentinel)
+ notempty.notify()
+ finally:
+ notempty.release()
+
+ @staticmethod
+ def _feed(buffer, notempty, send, writelock, close):
+ debug('starting thread to feed data to pipe')
+ from .util import is_exiting
+
+ nacquire = notempty.acquire
+ nrelease = notempty.release
+ nwait = notempty.wait
+ bpopleft = buffer.popleft
+ sentinel = _sentinel
+ if sys.platform != 'win32':
+ wacquire = writelock.acquire
+ wrelease = writelock.release
+ else:
+ wacquire = None
+
+ try:
+ while 1:
+ nacquire()
+ try:
+ if not buffer:
+ nwait()
+ finally:
+ nrelease()
+ try:
+ while 1:
+ obj = bpopleft()
+ if obj is sentinel:
+ debug('feeder thread got sentinel -- exiting')
+ close()
+ return
+
+ if wacquire is None:
+ send(obj)
+ else:
+ wacquire()
+ try:
+ send(obj)
+ finally:
+ wrelease()
+ except IndexError:
+ pass
+ except Exception, e:
+            # Since this runs in a daemon thread the resources it uses
+            # may become unusable while the process is cleaning up.
+            # We ignore errors which happen after the process has
+            # started to clean up.
+ try:
+ if is_exiting():
+ info('error in queue thread: %s', e)
+ else:
+ import traceback
+ traceback.print_exc()
+ except Exception:
+ pass
+
+_sentinel = object()
+
+#
+# A queue type which also supports join() and task_done() methods
+#
+# Note that if you do not call task_done() for each finished task then
+# eventually the counter's semaphore may overflow causing Bad Things
+# to happen.
+#
+
+class JoinableQueue(Queue):
+
+ def __init__(self, maxsize=0):
+ Queue.__init__(self, maxsize)
+ self._unfinished_tasks = Semaphore(0)
+ self._cond = Condition()
+
+ def __getstate__(self):
+ return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
+
+ def __setstate__(self, state):
+ Queue.__setstate__(self, state[:-2])
+ self._cond, self._unfinished_tasks = state[-2:]
+
+ def put(self, item, block=True, timeout=None):
+ Queue.put(self, item, block, timeout)
+ self._unfinished_tasks.release()
+
+ def task_done(self):
+ self._cond.acquire()
+ try:
+ if not self._unfinished_tasks.acquire(False):
+ raise ValueError('task_done() called too many times')
+ if self._unfinished_tasks._semlock._is_zero():
+ self._cond.notify_all()
+ finally:
+ self._cond.release()
+
+ def join(self):
+ self._cond.acquire()
+ try:
+ if not self._unfinished_tasks._semlock._is_zero():
+ self._cond.wait()
+ finally:
+ self._cond.release()
+
+#
+# Simplified Queue type -- really just a locked pipe
+#
+
+class SimpleQueue(object):
+
+ def __init__(self):
+ self._reader, self._writer = Pipe(duplex=False)
+ self._rlock = Lock()
+ if sys.platform == 'win32':
+ self._wlock = None
+ else:
+ self._wlock = Lock()
+ self._make_methods()
+
+ def empty(self):
+ return not self._reader.poll()
+
+ def __getstate__(self):
+ assert_spawning(self)
+ return (self._reader, self._writer, self._rlock, self._wlock)
+
+ def __setstate__(self, state):
+ (self._reader, self._writer, self._rlock, self._wlock) = state
+ self._make_methods()
+
+ def _make_methods(self):
+ recv = self._reader.recv
+ racquire, rrelease = self._rlock.acquire, self._rlock.release
+ def get():
+ racquire()
+ try:
+ return recv()
+ finally:
+ rrelease()
+ self.get = get
+
+ if self._wlock is None:
+ # writes to a message oriented win32 pipe are atomic
+ self.put = self._writer.send
+ else:
+ send = self._writer.send
+ wacquire, wrelease = self._wlock.acquire, self._wlock.release
+ def put(obj):
+ wacquire()
+ try:
+ return send(obj)
+ finally:
+ wrelease()
+ self.put = put
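[Editor's note -- not part of the patch] JoinableQueue above adds task_done() and join() on top of Queue, backed by the _unfinished_tasks semaphore and a condition variable. A minimal Python 2.x usage sketch (hypothetical script; assumes a Unix fork so the queue is inherited by the child):

    from multiprocessing import Process
    from multiprocessing.queues import JoinableQueue

    def worker(q):
        while True:
            item = q.get()
            if item is None:        # sentinel: producer is finished
                q.task_done()
                break
            # ... process item here ...
            q.task_done()           # exactly one task_done() per get()

    if __name__ == '__main__':
        q = JoinableQueue()
        p = Process(target=worker, args=(q,))
        p.start()
        for i in range(10):
            q.put(i)
        q.put(None)
        q.join()                    # returns once every put() was task_done()'d
        p.join()

As the module's own comment warns, skipping task_done() calls would eventually overflow the underlying semaphore.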
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 17778ef..1f514b2 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -1,190 +1,190 @@
-#
-# Module to allow connection and socket objects to be transferred
-# between processes
-#
-# multiprocessing/reduction.py
-#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
-#
-
-__all__ = []
-
-import os
-import sys
-import socket
-import threading
-import copy_reg
-
-import _multiprocessing
-from multiprocessing import current_process
-from multiprocessing.forking import Popen, duplicate, close
-from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.connection import Client, Listener
-
-
-#
-#
-#
-
-if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
- raise ImportError('pickling of connections not supported')
-
-#
-# Platform specific definitions
-#
-
-if sys.platform == 'win32':
- import _subprocess
- from ._multiprocessing import win32
-
- def send_handle(conn, handle, destination_pid):
- process_handle = win32.OpenProcess(
- win32.PROCESS_ALL_ACCESS, False, destination_pid
- )
- try:
- new_handle = duplicate(handle, process_handle)
- conn.send(new_handle)
- finally:
- close(process_handle)
-
- def recv_handle(conn):
- return conn.recv()
-
-else:
- def send_handle(conn, handle, destination_pid):
- _multiprocessing.sendfd(conn.fileno(), handle)
-
- def recv_handle(conn):
- return _multiprocessing.recvfd(conn.fileno())
-
-#
-# Support for a per-process server thread which caches pickled handles
-#
-
-_cache = set()
-
-def _reset(obj):
- global _lock, _listener, _cache
- for h in _cache:
- close(h)
- _cache.clear()
- _lock = threading.Lock()
- _listener = None
-
-_reset(None)
-register_after_fork(_reset, _reset)
-
-def _get_listener():
- global _listener
-
- if _listener is None:
- _lock.acquire()
- try:
- if _listener is None:
- debug('starting listener and thread for sending handles')
- _listener = Listener(authkey=current_process().get_authkey())
- t = threading.Thread(target=_serve)
- t.set_daemon(True)
- t.start()
- finally:
- _lock.release()
-
- return _listener
-
-def _serve():
- from .util import is_exiting, sub_warning
-
- while 1:
- try:
- conn = _listener.accept()
- handle_wanted, destination_pid = conn.recv()
- _cache.remove(handle_wanted)
- send_handle(conn, handle_wanted, destination_pid)
- close(handle_wanted)
- conn.close()
- except:
- if not is_exiting():
- import traceback
- sub_warning(
- 'thread for sharing handles raised exception :\n' +
- '-'*79 + '\n' + traceback.format_exc() + '-'*79
- )
-
-#
-# Functions to be used for pickling/unpickling objects with handles
-#
-
-def reduce_handle(handle):
- if Popen.thread_is_spawning():
- return (None, Popen.duplicate_for_child(handle), True)
- dup_handle = duplicate(handle)
- _cache.add(dup_handle)
- sub_debug('reducing handle %d', handle)
- return (_get_listener().address, dup_handle, False)
-
-def rebuild_handle(pickled_data):
- address, handle, inherited = pickled_data
- if inherited:
- return handle
- sub_debug('rebuilding handle %d', handle)
- conn = Client(address, authkey=current_process().get_authkey())
- conn.send((handle, os.getpid()))
- new_handle = recv_handle(conn)
- conn.close()
- return new_handle
-
-#
-# Register `_multiprocessing.Connection` with `copy_reg`
-#
-
-def reduce_connection(conn):
- rh = reduce_handle(conn.fileno())
- return rebuild_connection, (rh, conn.readable, conn.writable)
-
-def rebuild_connection(reduced_handle, readable, writable):
- handle = rebuild_handle(reduced_handle)
- return _multiprocessing.Connection(
- handle, readable=readable, writable=writable
- )
-
-copy_reg.pickle(_multiprocessing.Connection, reduce_connection)
-
-#
-# Register `socket.socket` with `copy_reg`
-#
-
-def fromfd(fd, family, type_, proto=0):
- s = socket.fromfd(fd, family, type_, proto)
- if s.__class__ is not socket.socket:
- s = socket.socket(_sock=s)
- return s
-
-def reduce_socket(s):
- reduced_handle = reduce_handle(s.fileno())
- return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)
-
-def rebuild_socket(reduced_handle, family, type_, proto):
- fd = rebuild_handle(reduced_handle)
- _sock = fromfd(fd, family, type_, proto)
- close(fd)
- return _sock
-
-copy_reg.pickle(socket.socket, reduce_socket)
-
-#
-# Register `_multiprocessing.PipeConnection` with `copy_reg`
-#
-
-if sys.platform == 'win32':
-
- def reduce_pipe_connection(conn):
- rh = reduce_handle(conn.fileno())
- return rebuild_pipe_connection, (rh, conn.readable, conn.writable)
-
- def rebuild_pipe_connection(reduced_handle, readable, writable):
- handle = rebuild_handle(reduced_handle)
- return _multiprocessing.PipeConnection(
- handle, readable=readable, writable=writable
- )
-
- copy_reg.pickle(_multiprocessing.PipeConnection, reduce_pipe_connection)
+#
+# Module to allow connection and socket objects to be transferred
+# between processes
+#
+# multiprocessing/reduction.py
+#
+# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+#
+
+__all__ = []
+
+import os
+import sys
+import socket
+import threading
+import copy_reg
+
+import _multiprocessing
+from multiprocessing import current_process
+from multiprocessing.forking import Popen, duplicate, close
+from multiprocessing.util import register_after_fork, debug, sub_debug
+from multiprocessing.connection import Client, Listener
+
+
+#
+#
+#
+
+if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
+ raise ImportError('pickling of connections not supported')
+
+#
+# Platform specific definitions
+#
+
+if sys.platform == 'win32':
+ import _subprocess
+ from ._multiprocessing import win32
+
+ def send_handle(conn, handle, destination_pid):
+ process_handle = win32.OpenProcess(
+ win32.PROCESS_ALL_ACCESS, False, destination_pid
+ )
+ try:
+ new_handle = duplicate(handle, process_handle)
+ conn.send(new_handle)
+ finally:
+ close(process_handle)
+
+ def recv_handle(conn):
+ return conn.recv()
+
+else:
+ def send_handle(conn, handle, destination_pid):
+ _multiprocessing.sendfd(conn.fileno(), handle)
+
+ def recv_handle(conn):
+ return _multiprocessing.recvfd(conn.fileno())
+
+#
+# Support for a per-process server thread which caches pickled handles
+#
+
+_cache = set()
+
+def _reset(obj):
+ global _lock, _listener, _cache
+ for h in _cache:
+ close(h)
+ _cache.clear()
+ _lock = threading.Lock()
+ _listener = None
+
+_reset(None)
+register_after_fork(_reset, _reset)
+
+def _get_listener():
+ global _listener
+
+ if _listener is None:
+ _lock.acquire()
+ try:
+ if _listener is None:
+ debug('starting listener and thread for sending handles')
+ _listener = Listener(authkey=current_process().get_authkey())
+ t = threading.Thread(target=_serve)
+ t.set_daemon(True)
+ t.start()
+ finally:
+ _lock.release()
+
+ return _listener
+
+def _serve():
+ from .util import is_exiting, sub_warning
+
+ while 1:
+ try:
+ conn = _listener.accept()
+ handle_wanted, destination_pid = conn.recv()
+ _cache.remove(handle_wanted)
+ send_handle(conn, handle_wanted, destination_pid)
+ close(handle_wanted)
+ conn.close()
+ except:
+ if not is_exiting():
+ import traceback
+ sub_warning(
+ 'thread for sharing handles raised exception :\n' +
+ '-'*79 + '\n' + traceback.format_exc() + '-'*79
+ )
+
+#
+# Functions to be used for pickling/unpickling objects with handles
+#
+
+def reduce_handle(handle):
+ if Popen.thread_is_spawning():
+ return (None, Popen.duplicate_for_child(handle), True)
+ dup_handle = duplicate(handle)
+ _cache.add(dup_handle)
+ sub_debug('reducing handle %d', handle)
+ return (_get_listener().address, dup_handle, False)
+
+def rebuild_handle(pickled_data):
+ address, handle, inherited = pickled_data
+ if inherited:
+ return handle
+ sub_debug('rebuilding handle %d', handle)
+ conn = Client(address, authkey=current_process().get_authkey())
+ conn.send((handle, os.getpid()))
+ new_handle = recv_handle(conn)
+ conn.close()
+ return new_handle
+
+#
+# Register `_multiprocessing.Connection` with `copy_reg`
+#
+
+def reduce_connection(conn):
+ rh = reduce_handle(conn.fileno())
+ return rebuild_connection, (rh, conn.readable, conn.writable)
+
+def rebuild_connection(reduced_handle, readable, writable):
+ handle = rebuild_handle(reduced_handle)
+ return _multiprocessing.Connection(
+ handle, readable=readable, writable=writable
+ )
+
+copy_reg.pickle(_multiprocessing.Connection, reduce_connection)
+
+#
+# Register `socket.socket` with `copy_reg`
+#
+
+def fromfd(fd, family, type_, proto=0):
+ s = socket.fromfd(fd, family, type_, proto)
+ if s.__class__ is not socket.socket:
+ s = socket.socket(_sock=s)
+ return s
+
+def reduce_socket(s):
+ reduced_handle = reduce_handle(s.fileno())
+ return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)
+
+def rebuild_socket(reduced_handle, family, type_, proto):
+ fd = rebuild_handle(reduced_handle)
+ _sock = fromfd(fd, family, type_, proto)
+ close(fd)
+ return _sock
+
+copy_reg.pickle(socket.socket, reduce_socket)
+
+#
+# Register `_multiprocessing.PipeConnection` with `copy_reg`
+#
+
+if sys.platform == 'win32':
+
+ def reduce_pipe_connection(conn):
+ rh = reduce_handle(conn.fileno())
+ return rebuild_pipe_connection, (rh, conn.readable, conn.writable)
+
+ def rebuild_pipe_connection(reduced_handle, readable, writable):
+ handle = rebuild_handle(reduced_handle)
+ return _multiprocessing.PipeConnection(
+ handle, readable=readable, writable=writable
+ )
+
+ copy_reg.pickle(_multiprocessing.PipeConnection, reduce_pipe_connection)
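[Editor's note -- not part of the patch] The point of reduction.py is that merely importing it registers picklers with copy_reg, after which a live Connection can itself travel over another connection; unpickling contacts the per-process listener thread to fetch a duplicated handle. A sketch under stated assumptions (Unix with _multiprocessing.recvfd available; hypothetical script, not from the patch):

    import multiprocessing.reduction      # side effect: registers the picklers
    from multiprocessing import Pipe, Process

    def child(control):
        conn = control.recv()             # unpickling runs rebuild_handle()
        conn.send('sent over a transferred connection')

    if __name__ == '__main__':
        control_parent, control_child = Pipe()   # inherited at fork
        a, b = Pipe()                            # end 'b' will be transferred
        p = Process(target=child, args=(control_child,))
        p.start()
        control_parent.send(b)            # pickling runs reduce_connection()
        print a.recv()
        p.join()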
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index 808d312..c35c9e0 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -1,234 +1,234 @@
-#
-# Module which supports allocation of ctypes objects from shared memory
-#
-# multiprocessing/sharedctypes.py
-#
-# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
-#
-
-import sys
-import ctypes
-import weakref
-import copy_reg
-
-from multiprocessing import heap, RLock
-from multiprocessing.forking import assert_spawning
-
-__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
-
-#
-#
-#
-
-typecode_to_type = {
- 'c': ctypes.c_char, 'u': ctypes.c_wchar,
- 'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
- 'h': ctypes.c_short, 'H': ctypes.c_ushort,
- 'i': ctypes.c_int, 'I': ctypes.c_uint,
- 'l': ctypes.c_long, 'L': ctypes.c_ulong,
- 'f': ctypes.c_float, 'd': ctypes.c_double
- }
-
-#
-#
-#
-
-def _new_value(type_):
- size = ctypes.sizeof(type_)
- wrapper = heap.BufferWrapper(size)
- return rebuild_ctype(type_, wrapper, None)
-
-def RawValue(typecode_or_type, *args):
- '''
- Returns a ctypes object allocated from shared memory
- '''
- type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
- obj = _new_value(type_)
- ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
- obj.__init__(*args)
- return obj
-
-def RawArray(typecode_or_type, size_or_initializer):
- '''
- Returns a ctypes array allocated from shared memory
- '''
- type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
- if isinstance(size_or_initializer, int):
- type_ = type_ * size_or_initializer
- return _new_value(type_)
- else:
- type_ = type_ * len(size_or_initializer)
- result = _new_value(type_)
- result.__init__(*size_or_initializer)
- return result
-
-def Value(typecode_or_type, *args, **kwds):
- '''
- Return a synchronization wrapper for a Value
- '''
- lock = kwds.pop('lock', None)
- if kwds:
- raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
- obj = RawValue(typecode_or_type, *args)
- if lock is None:
- lock = RLock()
- assert hasattr(lock, 'acquire')
- return synchronized(obj, lock)
-
-def Array(typecode_or_type, size_or_initializer, **kwds):
- '''
- Return a synchronization wrapper for a RawArray
- '''
- lock = kwds.pop('lock', None)
- if kwds:
- raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
- obj = RawArray(typecode_or_type, size_or_initializer)
- if lock is None:
- lock = RLock()
- assert hasattr(lock, 'acquire')
- return synchronized(obj, lock)
-
-def copy(obj):
- new_obj = _new_value(type(obj))
- ctypes.pointer(new_obj)[0] = obj
- return new_obj
-
-def synchronized(obj, lock=None):
- assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
-
- if isinstance(obj, ctypes._SimpleCData):
- return Synchronized(obj, lock)
- elif isinstance(obj, ctypes.Array):
- if obj._type_ is ctypes.c_char:
- return SynchronizedString(obj, lock)
- return SynchronizedArray(obj, lock)
- else:
- cls = type(obj)
- try:
- scls = class_cache[cls]
- except KeyError:
- names = [field[0] for field in cls._fields_]
- d = dict((name, make_property(name)) for name in names)
- classname = 'Synchronized' + cls.__name__
- scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
- return scls(obj, lock)
-
-#
-# Functions for pickling/unpickling
-#
-
-def reduce_ctype(obj):
- assert_spawning(obj)
- if isinstance(obj, ctypes.Array):
- return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
- else:
- return rebuild_ctype, (type(obj), obj._wrapper, None)
-
-def rebuild_ctype(type_, wrapper, length):
- if length is not None:
- type_ = type_ * length
- if sys.platform == 'win32' and type_ not in copy_reg.dispatch_table:
- copy_reg.pickle(type_, reduce_ctype)
- obj = type_.from_address(wrapper.get_address())
- obj._wrapper = wrapper
- return obj
-
-#
-# Function to create properties
-#
-
-def make_property(name):
- try:
- return prop_cache[name]
- except KeyError:
- d = {}
- exec template % ((name,)*7) in d
- prop_cache[name] = d[name]
- return d[name]
-
-template = '''
-def get%s(self):
- self.acquire()
- try:
- return self._obj.%s
- finally:
- self.release()
-def set%s(self, value):
- self.acquire()
- try:
- self._obj.%s = value
- finally:
- self.release()
-%s = property(get%s, set%s)
-'''
-
-prop_cache = {}
-class_cache = weakref.WeakKeyDictionary()
-
-#
-# Synchronized wrappers
-#
-
-class SynchronizedBase(object):
-
- def __init__(self, obj, lock=None):
- self._obj = obj
- self._lock = lock or RLock()
- self.acquire = self._lock.acquire
- self.release = self._lock.release
-
- def __reduce__(self):
- assert_spawning(self)
- return synchronized, (self._obj, self._lock)
-
- def get_obj(self):
- return self._obj
-
- def get_lock(self):
- return self._lock
-
- def __repr__(self):
- return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
-
-
-class Synchronized(SynchronizedBase):
- value = make_property('value')
-
-
-class SynchronizedArray(SynchronizedBase):
-
- def __len__(self):
- return len(self._obj)
-
- def __getitem__(self, i):
- self.acquire()
- try:
- return self._obj[i]
- finally:
- self.release()
-
- def __setitem__(self, i, value):
- self.acquire()
- try:
- self._obj[i] = value
- finally:
- self.release()
-
- def __getslice__(self, start, stop):
- self.acquire()
- try:
- return self._obj[start:stop]
- finally:
- self.release()
-
- def __setslice__(self, start, stop, values):
- self.acquire()
- try:
- self._obj[start:stop] = values
- finally:
- self.release()
-
-
-class SynchronizedString(SynchronizedArray):
- value = make_property('value')
- raw = make_property('raw')
+#
+# Module which supports allocation of ctypes objects from shared memory
+#
+# multiprocessing/sharedctypes.py
+#
+# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
+#
+
+import sys
+import ctypes
+import weakref
+import copy_reg
+
+from multiprocessing import heap, RLock
+from multiprocessing.forking import assert_spawning
+
+__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
+
+#
+#
+#
+
+typecode_to_type = {
+ 'c': ctypes.c_char, 'u': ctypes.c_wchar,
+ 'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
+ 'h': ctypes.c_short, 'H': ctypes.c_ushort,
+ 'i': ctypes.c_int, 'I': ctypes.c_uint,
+ 'l': ctypes.c_long, 'L': ctypes.c_ulong,
+ 'f': ctypes.c_float, 'd': ctypes.c_double
+ }
+
+#
+#
+#
+
+def _new_value(type_):
+ size = ctypes.sizeof(type_)
+ wrapper = heap.BufferWrapper(size)
+ return rebuild_ctype(type_, wrapper, None)
+
+def RawValue(typecode_or_type, *args):
+ '''
+ Returns a ctypes object allocated from shared memory
+ '''
+ type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
+ obj = _new_value(type_)
+ ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
+ obj.__init__(*args)
+ return obj
+
+def RawArray(typecode_or_type, size_or_initializer):
+ '''
+ Returns a ctypes array allocated from shared memory
+ '''
+ type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
+ if isinstance(size_or_initializer, int):
+ type_ = type_ * size_or_initializer
+ return _new_value(type_)
+ else:
+ type_ = type_ * len(size_or_initializer)
+ result = _new_value(type_)
+ result.__init__(*size_or_initializer)
+ return result
+
+def Value(typecode_or_type, *args, **kwds):
+ '''
+ Return a synchronization wrapper for a Value
+ '''
+ lock = kwds.pop('lock', None)
+ if kwds:
+ raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
+ obj = RawValue(typecode_or_type, *args)
+ if lock is None:
+ lock = RLock()
+ assert hasattr(lock, 'acquire')
+ return synchronized(obj, lock)
+
+def Array(typecode_or_type, size_or_initializer, **kwds):
+ '''
+ Return a synchronization wrapper for a RawArray
+ '''
+ lock = kwds.pop('lock', None)
+ if kwds:
+ raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
+ obj = RawArray(typecode_or_type, size_or_initializer)
+ if lock is None:
+ lock = RLock()
+ assert hasattr(lock, 'acquire')
+ return synchronized(obj, lock)
+
+def copy(obj):
+ new_obj = _new_value(type(obj))
+ ctypes.pointer(new_obj)[0] = obj
+ return new_obj
+
+def synchronized(obj, lock=None):
+ assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
+
+ if isinstance(obj, ctypes._SimpleCData):
+ return Synchronized(obj, lock)
+ elif isinstance(obj, ctypes.Array):
+ if obj._type_ is ctypes.c_char:
+ return SynchronizedString(obj, lock)
+ return SynchronizedArray(obj, lock)
+ else:
+ cls = type(obj)
+ try:
+ scls = class_cache[cls]
+ except KeyError:
+ names = [field[0] for field in cls._fields_]
+ d = dict((name, make_property(name)) for name in names)
+ classname = 'Synchronized' + cls.__name__
+ scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
+ return scls(obj, lock)
+
+#
+# Functions for pickling/unpickling
+#
+
+def reduce_ctype(obj):
+ assert_spawning(obj)
+ if isinstance(obj, ctypes.Array):
+ return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
+ else:
+ return rebuild_ctype, (type(obj), obj._wrapper, None)
+
+def rebuild_ctype(type_, wrapper, length):
+ if length is not None:
+ type_ = type_ * length
+ if sys.platform == 'win32' and type_ not in copy_reg.dispatch_table:
+ copy_reg.pickle(type_, reduce_ctype)
+ obj = type_.from_address(wrapper.get_address())
+ obj._wrapper = wrapper
+ return obj
+
+#
+# Function to create properties
+#
+
+def make_property(name):
+ try:
+ return prop_cache[name]
+ except KeyError:
+ d = {}
+ exec template % ((name,)*7) in d
+ prop_cache[name] = d[name]
+ return d[name]
+
+template = '''
+def get%s(self):
+ self.acquire()
+ try:
+ return self._obj.%s
+ finally:
+ self.release()
+def set%s(self, value):
+ self.acquire()
+ try:
+ self._obj.%s = value
+ finally:
+ self.release()
+%s = property(get%s, set%s)
+'''
+
+prop_cache = {}
+class_cache = weakref.WeakKeyDictionary()
+
+#
+# Synchronized wrappers
+#
+
+class SynchronizedBase(object):
+
+ def __init__(self, obj, lock=None):
+ self._obj = obj
+ self._lock = lock or RLock()
+ self.acquire = self._lock.acquire
+ self.release = self._lock.release
+
+ def __reduce__(self):
+ assert_spawning(self)
+ return synchronized, (self._obj, self._lock)
+
+ def get_obj(self):
+ return self._obj
+
+ def get_lock(self):
+ return self._lock
+
+ def __repr__(self):
+ return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
+
+
+class Synchronized(SynchronizedBase):
+ value = make_property('value')
+
+
+class SynchronizedArray(SynchronizedBase):
+
+ def __len__(self):
+ return len(self._obj)
+
+ def __getitem__(self, i):
+ self.acquire()
+ try:
+ return self._obj[i]
+ finally:
+ self.release()
+
+ def __setitem__(self, i, value):
+ self.acquire()
+ try:
+ self._obj[i] = value
+ finally:
+ self.release()
+
+ def __getslice__(self, start, stop):
+ self.acquire()
+ try:
+ return self._obj[start:stop]
+ finally:
+ self.release()
+
+ def __setslice__(self, start, stop, values):
+ self.acquire()
+ try:
+ self._obj[start:stop] = values
+ finally:
+ self.release()
+
+
+class SynchronizedString(SynchronizedArray):
+ value = make_property('value')
+ raw = make_property('raw')
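[Editor's note -- not part of the patch] Value() and Array() above wrap raw shared-memory ctypes objects in a Synchronized* proxy whose attribute and item access take the associated lock. A minimal Python 2.x sketch (hypothetical script; note that a read-modify-write such as += still needs the lock held explicitly, because the get and the set each lock separately):

    from multiprocessing import Process
    from multiprocessing.sharedctypes import Value, Array

    def work(counter, data):
        counter.acquire()          # the wrapper re-exports its lock's methods
        try:
            counter.value += 1     # atomic only while we hold the lock
        finally:
            counter.release()
        data[0] = -data[0]         # a single item access locks internally

    if __name__ == '__main__':
        counter = Value('i', 0)            # shared C int, RLock-protected
        data = Array('d', [0.25, 0.5])     # shared double array
        p = Process(target=work, args=(counter, data))
        p.start()
        p.join()
        print counter.value, data[:]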
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 6a7189a..1ebd7b6 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -1,294 +1,294 @@
-#
-# Module implementing synchronization primitives
-#
-# multiprocessing/synchronize.py
-#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
-#
-
-__all__ = [
- 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
- ]
-
-import threading
-import os
-import sys
-
-from time import time as _time, sleep as _sleep
-
-import _multiprocessing
-from multiprocessing.process import current_process
-from multiprocessing.util import Finalize, register_after_fork, debug
-from multiprocessing.forking import assert_spawning, Popen
-
-#
-# Constants
-#
-
-RECURSIVE_MUTEX, SEMAPHORE = range(2)
-SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
-
-#
-# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
-#
-
-class SemLock(object):
-
- def __init__(self, kind, value, maxvalue):
- sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
- debug('created semlock with handle %s' % sl.handle)
- self._make_methods()
-
- if sys.platform != 'win32':
- def _after_fork(obj):
- obj._semlock._after_fork()
- register_after_fork(self, _after_fork)
-
- def _make_methods(self):
- self.acquire = self._semlock.acquire
- self.release = self._semlock.release
- self.__enter__ = self._semlock.__enter__
- self.__exit__ = self._semlock.__exit__
-
- def __getstate__(self):
- assert_spawning(self)
- sl = self._semlock
- return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
-
- def __setstate__(self, state):
- self._semlock = _multiprocessing.SemLock._rebuild(*state)
- debug('recreated blocker with handle %r' % state[0])
- self._make_methods()
-
-#
-# Semaphore
-#
-
-class Semaphore(SemLock):
-
- def __init__(self, value=1):
- SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
-
- def get_value(self):
- return self._semlock._get_value()
-
- def __repr__(self):
- try:
- value = self._semlock._get_value()
- except Exception:
- value = 'unknown'
- return '<Semaphore(value=%s)>' % value
-
-#
-# Bounded semaphore
-#
-
-class BoundedSemaphore(Semaphore):
-
- def __init__(self, value=1):
- SemLock.__init__(self, SEMAPHORE, value, value)
-
- def __repr__(self):
- try:
- value = self._semlock._get_value()
- except Exception:
- value = 'unknown'
- return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
- (value, self._semlock.maxvalue)
-
-#
-# Non-recursive lock
-#
-
-class Lock(SemLock):
-
- def __init__(self):
- SemLock.__init__(self, SEMAPHORE, 1, 1)
-
- def __repr__(self):
- try:
- if self._semlock._is_mine():
- name = current_process().get_name()
- if threading.current_thread().get_name() != 'MainThread':
- name += '|' + threading.current_thread().get_name()
- elif self._semlock._get_value() == 1:
- name = 'None'
- elif self._semlock._count() > 0:
- name = 'SomeOtherThread'
- else:
- name = 'SomeOtherProcess'
- except Exception:
- name = 'unknown'
- return '<Lock(owner=%s)>' % name
-
-#
-# Recursive lock
-#
-
-class RLock(SemLock):
-
- def __init__(self):
- SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
-
- def __repr__(self):
- try:
- if self._semlock._is_mine():
- name = current_process().get_name()
- if threading.current_thread().get_name() != 'MainThread':
- name += '|' + threading.current_thread().get_name()
- count = self._semlock._count()
- elif self._semlock._get_value() == 1:
- name, count = 'None', 0
- elif self._semlock._count() > 0:
- name, count = 'SomeOtherThread', 'nonzero'
- else:
- name, count = 'SomeOtherProcess', 'nonzero'
- except Exception:
- name, count = 'unknown', 'unknown'
- return '<RLock(%s, %s)>' % (name, count)
-
-#
-# Condition variable
-#
-
-class Condition(object):
-
- def __init__(self, lock=None):
- self._lock = lock or RLock()
- self._sleeping_count = Semaphore(0)
- self._woken_count = Semaphore(0)
- self._wait_semaphore = Semaphore(0)
- self._make_methods()
-
- def __getstate__(self):
- assert_spawning(self)
- return (self._lock, self._sleeping_count,
- self._woken_count, self._wait_semaphore)
-
- def __setstate__(self, state):
- (self._lock, self._sleeping_count,
- self._woken_count, self._wait_semaphore) = state
- self._make_methods()
-
- def _make_methods(self):
- self.acquire = self._lock.acquire
- self.release = self._lock.release
- self.__enter__ = self._lock.__enter__
- self.__exit__ = self._lock.__exit__
-
- def __repr__(self):
- try:
- num_waiters = (self._sleeping_count._semlock._get_value() -
- self._woken_count._semlock._get_value())
- except Exception:
-            num_waiters = 'unknown'
- return '<Condition(%s, %s)>' % (self._lock, num_waiters)
-
- def wait(self, timeout=None):
- assert self._lock._semlock._is_mine(), \
- 'must acquire() condition before using wait()'
-
- # indicate that this thread is going to sleep
- self._sleeping_count.release()
-
- # release lock
- count = self._lock._semlock._count()
- for i in xrange(count):
- self._lock.release()
-
- try:
- # wait for notification or timeout
- self._wait_semaphore.acquire(True, timeout)
- finally:
- # indicate that this thread has woken
- self._woken_count.release()
-
- # reacquire lock
- for i in xrange(count):
- self._lock.acquire()
-
- def notify(self):
- assert self._lock._semlock._is_mine(), 'lock is not owned'
- assert not self._wait_semaphore.acquire(False)
-
- # to take account of timeouts since last notify() we subtract
- # woken_count from sleeping_count and rezero woken_count
- while self._woken_count.acquire(False):
- res = self._sleeping_count.acquire(False)
- assert res
-
- if self._sleeping_count.acquire(False): # try grabbing a sleeper
- self._wait_semaphore.release() # wake up one sleeper
- self._woken_count.acquire() # wait for the sleeper to wake
-
- # rezero _wait_semaphore in case a timeout just happened
- self._wait_semaphore.acquire(False)
-
- def notify_all(self):
- assert self._lock._semlock._is_mine(), 'lock is not owned'
- assert not self._wait_semaphore.acquire(False)
-
- # to take account of timeouts since last notify*() we subtract
- # woken_count from sleeping_count and rezero woken_count
- while self._woken_count.acquire(False):
- res = self._sleeping_count.acquire(False)
- assert res
-
- sleepers = 0
- while self._sleeping_count.acquire(False):
- self._wait_semaphore.release() # wake up one sleeper
- sleepers += 1
-
- if sleepers:
- for i in xrange(sleepers):
- self._woken_count.acquire() # wait for a sleeper to wake
-
- # rezero wait_semaphore in case some timeouts just happened
- while self._wait_semaphore.acquire(False):
- pass
-
-#
-# Event
-#
-
-class Event(object):
-
- def __init__(self):
- self._cond = Condition(Lock())
- self._flag = Semaphore(0)
-
- def is_set(self):
- self._cond.acquire()
- try:
- if self._flag.acquire(False):
- self._flag.release()
- return True
- return False
- finally:
- self._cond.release()
-
- def set(self):
- self._cond.acquire()
- try:
- self._flag.acquire(False)
- self._flag.release()
- self._cond.notify_all()
- finally:
- self._cond.release()
-
- def clear(self):
- self._cond.acquire()
- try:
- self._flag.acquire(False)
- finally:
- self._cond.release()
-
- def wait(self, timeout=None):
- self._cond.acquire()
- try:
- if self._flag.acquire(False):
- self._flag.release()
- else:
- self._cond.wait(timeout)
- finally:
- self._cond.release()
+#
+# Module implementing synchronization primitives
+#
+# multiprocessing/synchronize.py
+#
+# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+#
+
+__all__ = [
+ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
+ ]
+
+import threading
+import os
+import sys
+
+from time import time as _time, sleep as _sleep
+
+import _multiprocessing
+from multiprocessing.process import current_process
+from multiprocessing.util import Finalize, register_after_fork, debug
+from multiprocessing.forking import assert_spawning, Popen
+
+#
+# Constants
+#
+
+RECURSIVE_MUTEX, SEMAPHORE = range(2)
+SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
+
+#
+# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
+#
+
+class SemLock(object):
+
+ def __init__(self, kind, value, maxvalue):
+ sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
+ debug('created semlock with handle %s' % sl.handle)
+ self._make_methods()
+
+ if sys.platform != 'win32':
+ def _after_fork(obj):
+ obj._semlock._after_fork()
+ register_after_fork(self, _after_fork)
+
+ def _make_methods(self):
+ self.acquire = self._semlock.acquire
+ self.release = self._semlock.release
+ self.__enter__ = self._semlock.__enter__
+ self.__exit__ = self._semlock.__exit__
+
+ def __getstate__(self):
+ assert_spawning(self)
+ sl = self._semlock
+ return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
+
+ def __setstate__(self, state):
+ self._semlock = _multiprocessing.SemLock._rebuild(*state)
+ debug('recreated blocker with handle %r' % state[0])
+ self._make_methods()
+
+#
+# Semaphore
+#
+
+class Semaphore(SemLock):
+
+ def __init__(self, value=1):
+ SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
+
+ def get_value(self):
+ return self._semlock._get_value()
+
+ def __repr__(self):
+ try:
+ value = self._semlock._get_value()
+ except Exception:
+ value = 'unknown'
+ return '<Semaphore(value=%s)>' % value
+
+#
+# Bounded semaphore
+#
+
+class BoundedSemaphore(Semaphore):
+
+ def __init__(self, value=1):
+ SemLock.__init__(self, SEMAPHORE, value, value)
+
+ def __repr__(self):
+ try:
+ value = self._semlock._get_value()
+ except Exception:
+ value = 'unknown'
+ return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
+ (value, self._semlock.maxvalue)
+
+#
+# Non-recursive lock
+#
+
+class Lock(SemLock):
+
+ def __init__(self):
+ SemLock.__init__(self, SEMAPHORE, 1, 1)
+
+ def __repr__(self):
+ try:
+ if self._semlock._is_mine():
+ name = current_process().get_name()
+ if threading.current_thread().get_name() != 'MainThread':
+ name += '|' + threading.current_thread().get_name()
+ elif self._semlock._get_value() == 1:
+ name = 'None'
+ elif self._semlock._count() > 0:
+ name = 'SomeOtherThread'
+ else:
+ name = 'SomeOtherProcess'
+ except Exception:
+ name = 'unknown'
+ return '<Lock(owner=%s)>' % name
+
+#
+# Recursive lock
+#
+
+class RLock(SemLock):
+
+ def __init__(self):
+ SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
+
+ def __repr__(self):
+ try:
+ if self._semlock._is_mine():
+ name = current_process().get_name()
+ if threading.current_thread().get_name() != 'MainThread':
+ name += '|' + threading.current_thread().get_name()
+ count = self._semlock._count()
+ elif self._semlock._get_value() == 1:
+ name, count = 'None', 0
+ elif self._semlock._count() > 0:
+ name, count = 'SomeOtherThread', 'nonzero'
+ else:
+ name, count = 'SomeOtherProcess', 'nonzero'
+ except Exception:
+ name, count = 'unknown', 'unknown'
+ return '<RLock(%s, %s)>' % (name, count)
+
+#
+# Condition variable
+#
+
+class Condition(object):
+
+ def __init__(self, lock=None):
+ self._lock = lock or RLock()
+ self._sleeping_count = Semaphore(0)
+ self._woken_count = Semaphore(0)
+ self._wait_semaphore = Semaphore(0)
+ self._make_methods()
+
+ def __getstate__(self):
+ assert_spawning(self)
+ return (self._lock, self._sleeping_count,
+ self._woken_count, self._wait_semaphore)
+
+ def __setstate__(self, state):
+ (self._lock, self._sleeping_count,
+ self._woken_count, self._wait_semaphore) = state
+ self._make_methods()
+
+ def _make_methods(self):
+ self.acquire = self._lock.acquire
+ self.release = self._lock.release
+ self.__enter__ = self._lock.__enter__
+ self.__exit__ = self._lock.__exit__
+
+ def __repr__(self):
+ try:
+ num_waiters = (self._sleeping_count._semlock._get_value() -
+ self._woken_count._semlock._get_value())
+ except Exception:
+            num_waiters = 'unknown'
+ return '<Condition(%s, %s)>' % (self._lock, num_waiters)
+
+ def wait(self, timeout=None):
+ assert self._lock._semlock._is_mine(), \
+ 'must acquire() condition before using wait()'
+
+ # indicate that this thread is going to sleep
+ self._sleeping_count.release()
+
+ # release lock
+ count = self._lock._semlock._count()
+ for i in xrange(count):
+ self._lock.release()
+
+ try:
+ # wait for notification or timeout
+ self._wait_semaphore.acquire(True, timeout)
+ finally:
+ # indicate that this thread has woken
+ self._woken_count.release()
+
+ # reacquire lock
+ for i in xrange(count):
+ self._lock.acquire()
+
+ def notify(self):
+ assert self._lock._semlock._is_mine(), 'lock is not owned'
+ assert not self._wait_semaphore.acquire(False)
+
+ # to take account of timeouts since last notify() we subtract
+ # woken_count from sleeping_count and rezero woken_count
+ while self._woken_count.acquire(False):
+ res = self._sleeping_count.acquire(False)
+ assert res
+
+ if self._sleeping_count.acquire(False): # try grabbing a sleeper
+ self._wait_semaphore.release() # wake up one sleeper
+ self._woken_count.acquire() # wait for the sleeper to wake
+
+ # rezero _wait_semaphore in case a timeout just happened
+ self._wait_semaphore.acquire(False)
+
+ def notify_all(self):
+ assert self._lock._semlock._is_mine(), 'lock is not owned'
+ assert not self._wait_semaphore.acquire(False)
+
+ # to take account of timeouts since last notify*() we subtract
+ # woken_count from sleeping_count and rezero woken_count
+ while self._woken_count.acquire(False):
+ res = self._sleeping_count.acquire(False)
+ assert res
+
+ sleepers = 0
+ while self._sleeping_count.acquire(False):
+ self._wait_semaphore.release() # wake up one sleeper
+ sleepers += 1
+
+ if sleepers:
+ for i in xrange(sleepers):
+ self._woken_count.acquire() # wait for a sleeper to wake
+
+ # rezero wait_semaphore in case some timeouts just happened
+ while self._wait_semaphore.acquire(False):
+ pass
+
+#
+# Event
+#
+
+class Event(object):
+
+ def __init__(self):
+ self._cond = Condition(Lock())
+ self._flag = Semaphore(0)
+
+ def is_set(self):
+ self._cond.acquire()
+ try:
+ if self._flag.acquire(False):
+ self._flag.release()
+ return True
+ return False
+ finally:
+ self._cond.release()
+
+ def set(self):
+ self._cond.acquire()
+ try:
+ self._flag.acquire(False)
+ self._flag.release()
+ self._cond.notify_all()
+ finally:
+ self._cond.release()
+
+ def clear(self):
+ self._cond.acquire()
+ try:
+ self._flag.acquire(False)
+ finally:
+ self._cond.release()
+
+ def wait(self, timeout=None):
+ self._cond.acquire()
+ try:
+ if self._flag.acquire(False):
+ self._flag.release()
+ else:
+ self._cond.wait(timeout)
+ finally:
+ self._cond.release()
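[Editor's note -- not part of the patch] Event above is built from a Condition plus a semaphore used as a flag, and set() wakes all waiters via notify_all(). A minimal Python 2.x sketch (hypothetical script; imports the class straight from the module shown in this patch):

    import time
    from multiprocessing import Process
    from multiprocessing.synchronize import Event

    def waiter(ev):
        ev.wait()                  # blocks until some process calls set()
        print 'woken by set()'

    if __name__ == '__main__':
        ev = Event()
        workers = [Process(target=waiter, args=(ev,)) for i in range(3)]
        for p in workers:
            p.start()
        time.sleep(0.5)
        ev.set()                   # notify_all() wakes every waiter at once
        for p in workers:
            p.join()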
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index 25ff8bd..c20a562 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -1,336 +1,336 @@
-#
-# Module providing various facilities to other parts of the package
-#
-# multiprocessing/util.py
-#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
-#
-
-import itertools
-import weakref
-import copy_reg
-import atexit
-import threading        # we want threading to install its
- # cleanup function before multiprocessing does
-
-from multiprocessing.process import current_process, active_children
-
-__all__ = [
- 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
- 'log_to_stderr', 'get_temp_dir', 'register_after_fork',
- 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal'
- ]
-
-#
-# Logging
-#
-
-NOTSET = 0
-SUBDEBUG = 5
-DEBUG = 10
-INFO = 20
-SUBWARNING = 25
-
-LOGGER_NAME = 'multiprocessing'
-DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
-
-_logger = None
-_log_to_stderr = False
-
-def sub_debug(msg, *args):
- if _logger:
- _logger.log(SUBDEBUG, msg, *args)
-
-def debug(msg, *args):
- if _logger:
- _logger.log(DEBUG, msg, *args)
-
-def info(msg, *args):
- if _logger:
- _logger.log(INFO, msg, *args)
-
-def sub_warning(msg, *args):
- if _logger:
- _logger.log(SUBWARNING, msg, *args)
-
-def get_logger():
- '''
- Returns logger used by multiprocessing
- '''
- global _logger
-
- if not _logger:
- import logging, atexit
-
-        # XXX multiprocessing should clean up before logging
- if hasattr(atexit, 'unregister'):
- atexit.unregister(_exit_function)
- atexit.register(_exit_function)
- else:
- atexit._exithandlers.remove((_exit_function, (), {}))
- atexit._exithandlers.append((_exit_function, (), {}))
-
- _check_logger_class()
- _logger = logging.getLogger(LOGGER_NAME)
-
- return _logger
-
-def _check_logger_class():
- '''
- Make sure process name is recorded when loggers are used
- '''
- # XXX This function is unnecessary once logging is patched
- import logging
- if hasattr(logging, 'multiprocessing'):
- return
-
- logging._acquireLock()
- try:
- OldLoggerClass = logging.getLoggerClass()
- if not getattr(OldLoggerClass, '_process_aware', False):
- class ProcessAwareLogger(OldLoggerClass):
- _process_aware = True
- def makeRecord(self, *args, **kwds):
- record = OldLoggerClass.makeRecord(self, *args, **kwds)
- record.processName = current_process()._name
- return record
- logging.setLoggerClass(ProcessAwareLogger)
- finally:
- logging._releaseLock()
-
-def log_to_stderr(level=None):
- '''
- Turn on logging and add a handler which prints to stderr
- '''
- global _log_to_stderr
- import logging
- logger = get_logger()
- formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
- handler = logging.StreamHandler()
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- if level is not None:
- logger.setLevel(level)
- _log_to_stderr = True
-
-#
-# Function returning a temp directory which will be removed on exit
-#
-
-def get_temp_dir():
- # get name of a temp directory which will be automatically cleaned up
- if current_process()._tempdir is None:
- import shutil, tempfile
- tempdir = tempfile.mkdtemp(prefix='pymp-')
- info('created temp directory %s', tempdir)
- Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
- current_process()._tempdir = tempdir
- return current_process()._tempdir
-
-#
-# Support for reinitialization of objects when bootstrapping a child process
-#
-
-_afterfork_registry = weakref.WeakValueDictionary()
-_afterfork_counter = itertools.count()
-
-def _run_after_forkers():
- items = list(_afterfork_registry.items())
- items.sort()
- for (index, ident, func), obj in items:
- try:
- func(obj)
- except Exception, e:
- info('after forker raised exception %s', e)
-
-def register_after_fork(obj, func):
- _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
-
-#
-# Finalization using weakrefs
-#
-
-_finalizer_registry = {}
-_finalizer_counter = itertools.count()
-
-
-class Finalize(object):
- '''
- Class which supports object finalization using weakrefs
- '''
- def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
- assert exitpriority is None or type(exitpriority) is int
-
- if obj is not None:
- self._weakref = weakref.ref(obj, self)
- else:
- assert exitpriority is not None
-
- self._callback = callback
- self._args = args
- self._kwargs = kwargs or {}
- self._key = (exitpriority, _finalizer_counter.next())
-
- _finalizer_registry[self._key] = self
-
- def __call__(self, wr=None):
- '''
- Run the callback unless it has already been called or cancelled
- '''
- try:
- del _finalizer_registry[self._key]
- except KeyError:
- sub_debug('finalizer no longer registered')
- else:
- sub_debug('finalizer calling %s with args %s and kwargs %s',
- self._callback, self._args, self._kwargs)
- res = self._callback(*self._args, **self._kwargs)
- self._weakref = self._callback = self._args = \
- self._kwargs = self._key = None
- return res
-
- def cancel(self):
- '''
- Cancel finalization of the object
- '''
- try:
- del _finalizer_registry[self._key]
- except KeyError:
- pass
- else:
- self._weakref = self._callback = self._args = \
- self._kwargs = self._key = None
-
- def still_active(self):
- '''
- Return whether this finalizer is still waiting to invoke callback
- '''
- return self._key in _finalizer_registry
-
- def __repr__(self):
- try:
- obj = self._weakref()
- except (AttributeError, TypeError):
- obj = None
-
- if obj is None:
- return '<Finalize object, dead>'
-
- x = '<Finalize object, callback=%s' % \
- getattr(self._callback, '__name__', self._callback)
- if self._args:
- x += ', args=' + str(self._args)
- if self._kwargs:
- x += ', kwargs=' + str(self._kwargs)
- if self._key[0] is not None:
-            x += ', exitpriority=' + str(self._key[0])
- return x + '>'
-
-
-def _run_finalizers(minpriority=None):
- '''
- Run all finalizers whose exit priority is not None and at least minpriority
-
- Finalizers with highest priority are called first; finalizers with
- the same priority will be called in reverse order of creation.
- '''
- if minpriority is None:
- f = lambda p : p[0][0] is not None
- else:
- f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
-
- items = [x for x in _finalizer_registry.items() if f(x)]
- items.sort(reverse=True)
-
- for key, finalizer in items:
- sub_debug('calling %s', finalizer)
- try:
- finalizer()
- except Exception:
- import traceback
- traceback.print_exc()
-
- if minpriority is None:
- _finalizer_registry.clear()
-
-#
-# Clean up on exit
-#
-
-def is_exiting():
- '''
- Returns true if the process is shutting down
- '''
- return _exiting or _exiting is None
-
-_exiting = False
-
-def _exit_function():
- global _exiting
-
- info('process shutting down')
- debug('running all "atexit" finalizers with priority >= 0')
- _run_finalizers(0)
-
- for p in active_children():
- if p._daemonic:
- info('calling terminate() for daemon %s', p.get_name())
- p._popen.terminate()
-
- for p in active_children():
- info('calling join() for process %s', p.get_name())
- p.join()
-
- debug('running the remaining "atexit" finalizers')
- _run_finalizers()
-
-atexit.register(_exit_function)
-
-#
-# Some fork aware types
-#
-
-class ForkAwareThreadLock(object):
- def __init__(self):
- self._lock = threading.Lock()
- self.acquire = self._lock.acquire
- self.release = self._lock.release
- register_after_fork(self, ForkAwareThreadLock.__init__)
-
-class ForkAwareLocal(threading.local):
- def __init__(self):
- register_after_fork(self, lambda obj : obj.__dict__.clear())
- def __reduce__(self):
- return type(self), ()
-
-#
-# Try making some callable types picklable
-#
-
-def _reduce_method(m):
- if m.im_self is None:
- return getattr, (m.im_class, m.im_func.func_name)
- else:
- return getattr, (m.im_self, m.im_func.func_name)
-copy_reg.pickle(type(Finalize.__init__), _reduce_method)
-
-def _reduce_method_descriptor(m):
- return getattr, (m.__objclass__, m.__name__)
-copy_reg.pickle(type(list.append), _reduce_method_descriptor)
-copy_reg.pickle(type(int.__add__), _reduce_method_descriptor)
-
-def _reduce_builtin_function_or_method(m):
- return getattr, (m.__self__, m.__name__)
-copy_reg.pickle(type(list().append), _reduce_builtin_function_or_method)
-copy_reg.pickle(type(int().__add__), _reduce_builtin_function_or_method)
-
-try:
- from functools import partial
-except ImportError:
- pass
-else:
- def _reduce_partial(p):
- return _rebuild_partial, (p.func, p.args, p.keywords or {})
- def _rebuild_partial(func, args, keywords):
- return partial(func, *args, **keywords)
- copy_reg.pickle(partial, _reduce_partial)
+#
+# Module providing various facilities to other parts of the package
+#
+# multiprocessing/util.py
+#
+# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+#
+
+import itertools
+import weakref
+import copy_reg
+import atexit
+import threading           # we want threading to install its
+                            # cleanup function before multiprocessing does
+
+from multiprocessing.process import current_process, active_children
+
+__all__ = [
+ 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
+ 'log_to_stderr', 'get_temp_dir', 'register_after_fork',
+ 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal'
+ ]
+
+#
+# Logging
+#
+
+NOTSET = 0
+SUBDEBUG = 5
+DEBUG = 10
+INFO = 20
+SUBWARNING = 25
+
+LOGGER_NAME = 'multiprocessing'
+DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
+
+_logger = None
+_log_to_stderr = False
+
+def sub_debug(msg, *args):
+ if _logger:
+ _logger.log(SUBDEBUG, msg, *args)
+
+def debug(msg, *args):
+ if _logger:
+ _logger.log(DEBUG, msg, *args)
+
+def info(msg, *args):
+ if _logger:
+ _logger.log(INFO, msg, *args)
+
+def sub_warning(msg, *args):
+ if _logger:
+ _logger.log(SUBWARNING, msg, *args)
+
+def get_logger():
+ '''
+ Returns logger used by multiprocessing
+ '''
+ global _logger
+
+ if not _logger:
+ import logging, atexit
+
+ # XXX multiprocessing should cleanup before logging
+ if hasattr(atexit, 'unregister'):
+ atexit.unregister(_exit_function)
+ atexit.register(_exit_function)
+ else:
+ atexit._exithandlers.remove((_exit_function, (), {}))
+ atexit._exithandlers.append((_exit_function, (), {}))
+
+ _check_logger_class()
+ _logger = logging.getLogger(LOGGER_NAME)
+
+ return _logger
+
+def _check_logger_class():
+ '''
+ Make sure process name is recorded when loggers are used
+ '''
+ # XXX This function is unnecessary once logging is patched
+ import logging
+ if hasattr(logging, 'multiprocessing'):
+ return
+
+ logging._acquireLock()
+ try:
+ OldLoggerClass = logging.getLoggerClass()
+ if not getattr(OldLoggerClass, '_process_aware', False):
+ class ProcessAwareLogger(OldLoggerClass):
+ _process_aware = True
+ def makeRecord(self, *args, **kwds):
+ record = OldLoggerClass.makeRecord(self, *args, **kwds)
+ record.processName = current_process()._name
+ return record
+ logging.setLoggerClass(ProcessAwareLogger)
+ finally:
+ logging._releaseLock()
+
+def log_to_stderr(level=None):
+ '''
+ Turn on logging and add a handler which prints to stderr
+ '''
+ global _log_to_stderr
+ import logging
+ logger = get_logger()
+ formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
+ handler = logging.StreamHandler()
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+ if level is not None:
+ logger.setLevel(level)
+ _log_to_stderr = True
+
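+# Illustrative usage (a sketch, not part of the module): SUBDEBUG (5) sits
+# below the standard DEBUG level and SUBWARNING (25) between INFO and
+# WARNING, so enabling SUBDEBUG on the stderr handler shows everything
+# this package logs:
+#
+#     >>> from multiprocessing.util import log_to_stderr, info, SUBDEBUG
+#     >>> log_to_stderr(SUBDEBUG)
+#     >>> info('hello')        # stderr: [INFO/MainProcess] hello
+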
+#
+# Function returning a temp directory which will be removed on exit
+#
+
+def get_temp_dir():
+ # get name of a temp directory which will be automatically cleaned up
+ if current_process()._tempdir is None:
+ import shutil, tempfile
+ tempdir = tempfile.mkdtemp(prefix='pymp-')
+ info('created temp directory %s', tempdir)
+ Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
+ current_process()._tempdir = tempdir
+ return current_process()._tempdir
+
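+# Illustrative usage (a sketch): every caller in a process gets the same
+# directory, and the Finalize registered above with exitpriority=-100
+# removes it very late during interpreter exit:
+#
+#     >>> d1 = get_temp_dir()
+#     >>> d1 == get_temp_dir()
+#     True
+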
+#
+# Support for reinitialization of objects when bootstrapping a child process
+#
+
+_afterfork_registry = weakref.WeakValueDictionary()
+_afterfork_counter = itertools.count()
+
+def _run_after_forkers():
+ items = list(_afterfork_registry.items())
+ items.sort()
+ for (index, ident, func), obj in items:
+ try:
+ func(obj)
+ except Exception, e:
+            info('after-fork handler raised exception %s', e)
+
+def register_after_fork(obj, func):
+ _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
+
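+# Illustrative usage (a sketch; `Resource` is a hypothetical class):
+# re-create per-process state in the child after os.fork() -- the
+# registered function is called with the registered object:
+#
+#     >>> class Resource(object):
+#     ...     def reset(self):
+#     ...         self.cache = {}
+#     >>> r = Resource(); r.reset()
+#     >>> register_after_fork(r, Resource.reset)
+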
+#
+# Finalization using weakrefs
+#
+
+_finalizer_registry = {}
+_finalizer_counter = itertools.count()
+
+
+class Finalize(object):
+ '''
+ Class which supports object finalization using weakrefs
+ '''
+ def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
+ assert exitpriority is None or type(exitpriority) is int
+
+ if obj is not None:
+ self._weakref = weakref.ref(obj, self)
+ else:
+ assert exitpriority is not None
+
+ self._callback = callback
+ self._args = args
+ self._kwargs = kwargs or {}
+ self._key = (exitpriority, _finalizer_counter.next())
+
+ _finalizer_registry[self._key] = self
+
+ def __call__(self, wr=None):
+ '''
+ Run the callback unless it has already been called or cancelled
+ '''
+ try:
+ del _finalizer_registry[self._key]
+ except KeyError:
+ sub_debug('finalizer no longer registered')
+ else:
+ sub_debug('finalizer calling %s with args %s and kwargs %s',
+ self._callback, self._args, self._kwargs)
+ res = self._callback(*self._args, **self._kwargs)
+ self._weakref = self._callback = self._args = \
+ self._kwargs = self._key = None
+ return res
+
+ def cancel(self):
+ '''
+ Cancel finalization of the object
+ '''
+ try:
+ del _finalizer_registry[self._key]
+ except KeyError:
+ pass
+ else:
+ self._weakref = self._callback = self._args = \
+ self._kwargs = self._key = None
+
+ def still_active(self):
+ '''
+ Return whether this finalizer is still waiting to invoke callback
+ '''
+ return self._key in _finalizer_registry
+
+ def __repr__(self):
+ try:
+ obj = self._weakref()
+ except (AttributeError, TypeError):
+ obj = None
+
+ if obj is None:
+ return '<Finalize object, dead>'
+
+ x = '<Finalize object, callback=%s' % \
+ getattr(self._callback, '__name__', self._callback)
+ if self._args:
+ x += ', args=' + str(self._args)
+ if self._kwargs:
+ x += ', kwargs=' + str(self._kwargs)
+ if self._key[0] is not None:
+        x += ', exitpriority=' + str(self._key[0])
+ return x + '>'
+
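+# Illustrative usage (a sketch): tie cleanup to the lifetime of an owning
+# object, or pass obj=None with an exitpriority to run at process exit:
+#
+#     >>> import os, tempfile
+#     >>> fd, name = tempfile.mkstemp()
+#     >>> class Owner(object):
+#     ...     pass
+#     >>> owner = Owner()
+#     >>> f1 = Finalize(owner, os.unlink, args=(name,))
+#     >>> f2 = Finalize(None, os.close, args=(fd,), exitpriority=0)
+#     >>> del owner            # f1 fires now, unlinking the file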
+
+def _run_finalizers(minpriority=None):
+ '''
+ Run all finalizers whose exit priority is not None and at least minpriority
+
+ Finalizers with highest priority are called first; finalizers with
+ the same priority will be called in reverse order of creation.
+ '''
+ if minpriority is None:
+ f = lambda p : p[0][0] is not None
+ else:
+ f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
+
+ items = [x for x in _finalizer_registry.items() if f(x)]
+ items.sort(reverse=True)
+
+ for key, finalizer in items:
+ sub_debug('calling %s', finalizer)
+ try:
+ finalizer()
+ except Exception:
+ import traceback
+ traceback.print_exc()
+
+ if minpriority is None:
+ _finalizer_registry.clear()
+
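+# Illustrative note (a sketch): among exit finalizers, higher exitpriority
+# runs first -- here 'first' is logged before 'second', and both run
+# before the exitpriority=-100 finalizer that removes the temp directory:
+#
+#     >>> a = Finalize(None, info, args=('second',), exitpriority=0)
+#     >>> b = Finalize(None, info, args=('first',), exitpriority=10)
+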
+#
+# Clean up on exit
+#
+
+def is_exiting():
+ '''
+ Returns true if the process is shutting down
+ '''
+ return _exiting or _exiting is None
+
+_exiting = False
+
+def _exit_function():
+    global _exiting
+    _exiting = True
+
+ info('process shutting down')
+ debug('running all "atexit" finalizers with priority >= 0')
+ _run_finalizers(0)
+
+ for p in active_children():
+ if p._daemonic:
+ info('calling terminate() for daemon %s', p.get_name())
+ p._popen.terminate()
+
+ for p in active_children():
+ info('calling join() for process %s', p.get_name())
+ p.join()
+
+ debug('running the remaining "atexit" finalizers')
+ _run_finalizers()
+
+atexit.register(_exit_function)
+
+#
+# Some fork aware types
+#
+
+class ForkAwareThreadLock(object):
+ def __init__(self):
+ self._lock = threading.Lock()
+ self.acquire = self._lock.acquire
+ self.release = self._lock.release
+ register_after_fork(self, ForkAwareThreadLock.__init__)
+
+class ForkAwareLocal(threading.local):
+ def __init__(self):
+ register_after_fork(self, lambda obj : obj.__dict__.clear())
+ def __reduce__(self):
+ return type(self), ()
+
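+# Illustrative note (a sketch): ForkAwareThreadLock re-runs __init__ in the
+# child after a fork, so the child always starts with a fresh, unheld lock
+# even if some parent thread held it at the moment of forking:
+#
+#     >>> lock = ForkAwareThreadLock()
+#     >>> lock.acquire()
+#     True
+#     >>> lock.release()
+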
+#
+# Try making some callable types picklable
+#
+
+def _reduce_method(m):
+ if m.im_self is None:
+ return getattr, (m.im_class, m.im_func.func_name)
+ else:
+ return getattr, (m.im_self, m.im_func.func_name)
+copy_reg.pickle(type(Finalize.__init__), _reduce_method)
+
+def _reduce_method_descriptor(m):
+ return getattr, (m.__objclass__, m.__name__)
+copy_reg.pickle(type(list.append), _reduce_method_descriptor)
+copy_reg.pickle(type(int.__add__), _reduce_method_descriptor)
+
+def _reduce_builtin_function_or_method(m):
+ return getattr, (m.__self__, m.__name__)
+copy_reg.pickle(type(list().append), _reduce_builtin_function_or_method)
+copy_reg.pickle(type(int().__add__), _reduce_builtin_function_or_method)
+
+try:
+ from functools import partial
+except ImportError:
+ pass
+else:
+ def _reduce_partial(p):
+ return _rebuild_partial, (p.func, p.args, p.keywords or {})
+ def _rebuild_partial(func, args, keywords):
+ return partial(func, *args, **keywords)
+ copy_reg.pickle(partial, _reduce_partial)
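+
+# Illustrative usage (a sketch): with the reducers registered above, bound
+# methods pickle by name; note the unpickled method is bound to a *new*
+# copy of the original __self__ object:
+#
+#     >>> import pickle
+#     >>> m = pickle.loads(pickle.dumps([].append))
+#     >>> m                    # doctest: +ELLIPSIS
+#     <built-in method append of list object at ...>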