summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBenjamin Peterson <benjamin@python.org>2008-09-01 17:10:46 (GMT)
committerBenjamin Peterson <benjamin@python.org>2008-09-01 17:10:46 (GMT)
commitf7feaec16c6d21cbb3c6c5c9b84288dd3095c005 (patch)
treedf1c7b1330e30a1a4f369863749cb0c586d9af2a
parent27cc8e1dd2846ff8df18b5d776192d4f99354a26 (diff)
downloadcpython-f7feaec16c6d21cbb3c6c5c9b84288dd3095c005.zip
cpython-f7feaec16c6d21cbb3c6c5c9b84288dd3095c005.tar.gz
cpython-f7feaec16c6d21cbb3c6c5c9b84288dd3095c005.tar.bz2
revert r66114 for Jesse
-rw-r--r--Lib/multiprocessing/__init__.py137
-rw-r--r--Lib/multiprocessing/managers.py21
-rw-r--r--Lib/multiprocessing/sharedctypes.py2
-rw-r--r--Lib/multiprocessing/synchronize.py23
-rw-r--r--Lib/multiprocessing/util.py2
5 files changed, 146 insertions, 39 deletions
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index c0fa1fc..f965056 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -97,6 +97,13 @@ def Manager():
m.start()
return m
+def Pipe(duplex=True):
+ '''
+ Returns two connection object connected by a pipe
+ '''
+ from multiprocessing.connection import Pipe
+ return Pipe(duplex)
+
def cpu_count():
'''
Returns the number of CPUs in the system
@@ -131,28 +138,134 @@ def freeze_support():
from multiprocessing.forking import freeze_support
freeze_support()
+def get_logger():
+ '''
+ Return package logger -- if it does not already exist then it is created
+ '''
+ from multiprocessing.util import get_logger
+ return get_logger()
+
+def log_to_stderr(level=None):
+ '''
+ Turn on logging and add a handler which prints to stderr
+ '''
+ from multiprocessing.util import log_to_stderr
+ return log_to_stderr(level)
+
def allow_connection_pickling():
'''
Install support for sending connections and sockets between processes
'''
from multiprocessing import reduction
-# Alias some names from submodules in the package namespace
-from multiprocessing.connection import Pipe
-from multiprocessing.util import (get_logger, log_to_stderr)
-
#
# Definitions depending on native semaphores
#
-# Alias some names from submodules in the package namespace
-from multiprocessing.synchronize import (Lock, RLock, Condition, Event,
- Semaphore, BoundedSemaphore)
-from multiprocessing.queues import (Queue, JoinableQueue)
-from multiprocessing.pool import Pool
-from multiprocessing.sharedctypes import (RawValue, Value,
- RawArray, Array)
+
+def Lock():
+ '''
+ Returns a non-recursive lock object
+ '''
+ from multiprocessing.synchronize import Lock
+ return Lock()
+
+def RLock():
+ '''
+ Returns a recursive lock object
+ '''
+ from multiprocessing.synchronize import RLock
+ return RLock()
+
+def Condition(lock=None):
+ '''
+ Returns a condition object
+ '''
+ from multiprocessing.synchronize import Condition
+ return Condition(lock)
+
+def Semaphore(value=1):
+ '''
+ Returns a semaphore object
+ '''
+ from multiprocessing.synchronize import Semaphore
+ return Semaphore(value)
+
+def BoundedSemaphore(value=1):
+ '''
+ Returns a bounded semaphore object
+ '''
+ from multiprocessing.synchronize import BoundedSemaphore
+ return BoundedSemaphore(value)
+
+def Event():
+ '''
+ Returns an event object
+ '''
+ from multiprocessing.synchronize import Event
+ return Event()
+
+def Queue(maxsize=0):
+ '''
+ Returns a queue object
+ '''
+ from multiprocessing.queues import Queue
+ return Queue(maxsize)
+
+def JoinableQueue(maxsize=0):
+ '''
+ Returns a queue object
+ '''
+ from multiprocessing.queues import JoinableQueue
+ return JoinableQueue(maxsize)
+
+def Pool(processes=None, initializer=None, initargs=()):
+ '''
+ Returns a process pool object
+ '''
+ from multiprocessing.pool import Pool
+ return Pool(processes, initializer, initargs)
+
+def RawValue(typecode_or_type, *args):
+ '''
+ Returns a shared object
+ '''
+ from multiprocessing.sharedctypes import RawValue
+ return RawValue(typecode_or_type, *args)
+
+def RawArray(typecode_or_type, size_or_initializer):
+ '''
+ Returns a shared array
+ '''
+ from multiprocessing.sharedctypes import RawArray
+ return RawArray(typecode_or_type, size_or_initializer)
+
+def Value(typecode_or_type, *args, **kwds):
+ '''
+ Returns a synchronized shared object
+ '''
+ from multiprocessing.sharedctypes import Value
+ return Value(typecode_or_type, *args, **kwds)
+
+def Array(typecode_or_type, size_or_initializer, **kwds):
+ '''
+ Returns a synchronized shared array
+ '''
+ from multiprocessing.sharedctypes import Array
+ return Array(typecode_or_type, size_or_initializer, **kwds)
+
+#
+#
+#
if sys.platform == 'win32':
- from multiprocessing.forking import set_executable
+
+ def set_executable(executable):
+ '''
+ Sets the path to a python.exe or pythonw.exe binary used to run
+ child processes on Windows instead of sys.executable.
+ Useful for people embedding Python.
+ '''
+ from multiprocessing.forking import set_executable
+ set_executable(executable)
__all__ += ['set_executable']
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 256e572..f32dab1 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -371,7 +371,13 @@ class Server(object):
self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
if ident not in self.id_to_refcount:
- self.id_to_refcount[ident] = None
+ self.id_to_refcount[ident] = 0
+ # increment the reference count immediately, to avoid
+ # this object being garbage collected before a Proxy
+ # object for it can be created. The caller of create()
+ # is responsible for doing a decref once the Proxy object
+ # has been created.
+ self.incref(c, ident)
return ident, tuple(exposed)
finally:
self.mutex.release()
@@ -393,11 +399,7 @@ class Server(object):
def incref(self, c, ident):
self.mutex.acquire()
try:
- try:
- self.id_to_refcount[ident] += 1
- except TypeError:
- assert self.id_to_refcount[ident] is None
- self.id_to_refcount[ident] = 1
+ self.id_to_refcount[ident] += 1
finally:
self.mutex.release()
@@ -634,6 +636,8 @@ class BaseManager(object):
token, self._serializer, manager=self,
authkey=self._authkey, exposed=exp
)
+ conn = self._Client(token.address, authkey=self._authkey)
+ dispatch(conn, None, 'decref', (token.id,))
return proxy
temp.__name__ = typeid
setattr(cls, typeid, temp)
@@ -726,10 +730,13 @@ class BaseProxy(object):
elif kind == '#PROXY':
exposed, token = result
proxytype = self._manager._registry[token.typeid][-1]
- return proxytype(
+ proxy = proxytype(
token, self._serializer, manager=self._manager,
authkey=self._authkey, exposed=exposed
)
+ conn = self._Client(token.address, authkey=self._authkey)
+ dispatch(conn, None, 'decref', (token.id,))
+ return proxy
raise convert_to_error(kind, result)
def _getvalue(self):
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index 5ca8fb8..0054ff1 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -63,7 +63,7 @@ def RawArray(typecode_or_type, size_or_initializer):
def Value(typecode_or_type, *args, **kwds):
'''
- Return a synchronization wrapper for a RawValue
+ Return a synchronization wrapper for a Value
'''
lock = kwds.pop('lock', None)
if kwds:
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 3e21dfe..428656a 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -65,9 +65,7 @@ class SemLock(object):
#
class Semaphore(SemLock):
- '''
- A semaphore object
- '''
+
def __init__(self, value=1):
SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
@@ -86,9 +84,7 @@ class Semaphore(SemLock):
#
class BoundedSemaphore(Semaphore):
- '''
- A bounded semaphore object
- '''
+
def __init__(self, value=1):
SemLock.__init__(self, SEMAPHORE, value, value)
@@ -105,9 +101,7 @@ class BoundedSemaphore(Semaphore):
#
class Lock(SemLock):
- '''
- A non-recursive lock object
- '''
+
def __init__(self):
SemLock.__init__(self, SEMAPHORE, 1, 1)
@@ -132,9 +126,7 @@ class Lock(SemLock):
#
class RLock(SemLock):
- '''
- A recursive lock object
- '''
+
def __init__(self):
SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
@@ -160,9 +152,6 @@ class RLock(SemLock):
#
class Condition(object):
- '''
- A condition object
- '''
def __init__(self, lock=None):
self._lock = lock or RLock()
@@ -263,9 +252,7 @@ class Condition(object):
#
class Event(object):
- '''
- An event object
- '''
+
def __init__(self):
self._cond = Condition(Lock())
self._flag = Semaphore(0)
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index c3e811c..7d53512 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -54,7 +54,7 @@ def sub_warning(msg, *args):
def get_logger():
'''
- Return package logger -- if it does not already exist then it is created
+ Returns logger used by multiprocessing
'''
global _logger