diff options
author | Richard Oudkerk <shibturn@gmail.com> | 2013-08-14 14:35:41 (GMT) |
---|---|---|
committer | Richard Oudkerk <shibturn@gmail.com> | 2013-08-14 14:35:41 (GMT) |
commit | 84ed9a68bd9a13252b376b21a9167dabae254325 (patch) | |
tree | ec8daa39fcf64b658bddf52f56ae47c0bdc2b091 /Lib/multiprocessing/synchronize.py | |
parent | d06eeb4a2492b59d34ab69a2046dcae1f10ec593 (diff) | |
download | cpython-84ed9a68bd9a13252b376b21a9167dabae254325.zip cpython-84ed9a68bd9a13252b376b21a9167dabae254325.tar.gz cpython-84ed9a68bd9a13252b376b21a9167dabae254325.tar.bz2 |
Issue #8713: Support alternative start methods in multiprocessing on Unix.
See http://hg.python.org/sandbox/sbt#spawn
Diffstat (limited to 'Lib/multiprocessing/synchronize.py')
-rw-r--r-- | Lib/multiprocessing/synchronize.py | 73 |
1 files changed, 58 insertions, 15 deletions
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 0faca78..09736ef 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -11,20 +11,24 @@ __all__ = [ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' ] +import os import threading import sys - +import itertools +import tempfile import _multiprocessing -from multiprocessing.process import current_process -from multiprocessing.util import register_after_fork, debug -from multiprocessing.forking import assert_spawning, Popen + from time import time as _time +from . import popen +from . import process +from . import util + # Try to import the mp.synchronize module cleanly, if it fails # raise ImportError for platforms lacking a working sem_open implementation. # See issue 3770 try: - from _multiprocessing import SemLock + from _multiprocessing import SemLock, sem_unlink except (ImportError): raise ImportError("This platform lacks a functioning sem_open" + " implementation, therefore, the required" + @@ -44,15 +48,45 @@ SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX class SemLock(object): + _rand = tempfile._RandomNameSequence() + def __init__(self, kind, value, maxvalue): - sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue) - debug('created semlock with handle %s' % sl.handle) + unlink_immediately = (sys.platform == 'win32' or + popen.get_start_method() == 'fork') + for i in range(100): + try: + sl = self._semlock = _multiprocessing.SemLock( + kind, value, maxvalue, self._make_name(), + unlink_immediately) + except FileExistsError: + pass + else: + break + else: + raise FileExistsError('cannot find name for semaphore') + + util.debug('created semlock with handle %s' % sl.handle) self._make_methods() if sys.platform != 'win32': def _after_fork(obj): obj._semlock._after_fork() - register_after_fork(self, _after_fork) + util.register_after_fork(self, _after_fork) + + if self._semlock.name is not None: + # We only get here if we are on Unix with forking + # disabled. When the object is garbage collected or the + # process shuts down we unlink the semaphore name + from .semaphore_tracker import register + register(self._semlock.name) + util.Finalize(self, SemLock._cleanup, (self._semlock.name,), + exitpriority=0) + + @staticmethod + def _cleanup(name): + from .semaphore_tracker import unregister + sem_unlink(name) + unregister(name) def _make_methods(self): self.acquire = self._semlock.acquire @@ -65,15 +99,24 @@ class SemLock(object): return self._semlock.__exit__(*args) def __getstate__(self): - assert_spawning(self) + popen.assert_spawning(self) sl = self._semlock - return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue) + if sys.platform == 'win32': + h = popen.get_spawning_popen().duplicate_for_child(sl.handle) + else: + h = sl.handle + return (h, sl.kind, sl.maxvalue, sl.name) def __setstate__(self, state): self._semlock = _multiprocessing.SemLock._rebuild(*state) - debug('recreated blocker with handle %r' % state[0]) + util.debug('recreated blocker with handle %r' % state[0]) self._make_methods() + @staticmethod + def _make_name(): + return '/%s-%s' % (process.current_process()._config['semprefix'], + next(SemLock._rand)) + # # Semaphore # @@ -122,7 +165,7 @@ class Lock(SemLock): def __repr__(self): try: if self._semlock._is_mine(): - name = current_process().name + name = process.current_process().name if threading.current_thread().name != 'MainThread': name += '|' + threading.current_thread().name elif self._semlock._get_value() == 1: @@ -147,7 +190,7 @@ class RLock(SemLock): def __repr__(self): try: if self._semlock._is_mine(): - name = current_process().name + name = process.current_process().name if threading.current_thread().name != 'MainThread': name += '|' + threading.current_thread().name count = self._semlock._count() @@ -175,7 +218,7 @@ class Condition(object): self._make_methods() def __getstate__(self): - assert_spawning(self) + popen.assert_spawning(self) return (self._lock, self._sleeping_count, self._woken_count, self._wait_semaphore) @@ -342,7 +385,7 @@ class Barrier(threading.Barrier): def __init__(self, parties, action=None, timeout=None): import struct - from multiprocessing.heap import BufferWrapper + from .heap import BufferWrapper wrapper = BufferWrapper(struct.calcsize('i') * 2) cond = Condition() self.__setstate__((parties, action, timeout, cond, wrapper)) |