summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/pool.py
diff options
context:
space:
mode:
authorBenjamin Peterson <benjamin@python.org>2008-06-13 19:13:39 (GMT)
committerBenjamin Peterson <benjamin@python.org>2008-06-13 19:13:39 (GMT)
commitdfd79494ce78868c937dc91eddd630cbdcae5611 (patch)
tree497db9bd37079421b144f33635c6bdd4b7058db5 /Lib/multiprocessing/pool.py
parentc9798fc7094c8ddcd73cc73870bbe0a1d4b5b1b1 (diff)
downloadcpython-dfd79494ce78868c937dc91eddd630cbdcae5611.zip
cpython-dfd79494ce78868c937dc91eddd630cbdcae5611.tar.gz
cpython-dfd79494ce78868c937dc91eddd630cbdcae5611.tar.bz2
convert multiprocessing to unix line endings
Diffstat (limited to 'Lib/multiprocessing/pool.py')
-rw-r--r--Lib/multiprocessing/pool.py60
1 files changed, 30 insertions, 30 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 0255c86..79f0a29 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -58,18 +58,18 @@ def worker(inqueue, outqueue, initializer=None, initargs=()):
except (EOFError, IOError):
debug('worker got EOFError or IOError -- exiting')
break
-
+
if task is None:
debug('worker got sentinel -- exiting')
break
-
+
job, i, func, args, kwds = task
try:
result = (True, func(*args, **kwds))
except Exception, e:
result = (False, e)
put((job, i, result))
-
+
#
# Class representing a process pool
#
@@ -91,7 +91,7 @@ class Pool(object):
processes = cpu_count()
except NotImplementedError:
processes = 1
-
+
self._pool = []
for i in range(processes):
w = self.Process(
@@ -102,7 +102,7 @@ class Pool(object):
w.set_name(w.get_name().replace('Process', 'PoolWorker'))
w.set_daemon(True)
w.start()
-
+
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
@@ -132,7 +132,7 @@ class Pool(object):
self._outqueue = SimpleQueue()
self._quick_put = self._inqueue._writer.send
self._quick_get = self._outqueue._reader.recv
-
+
def apply(self, func, args=(), kwds={}):
'''
Equivalent of `apply()` builtin
@@ -182,7 +182,7 @@ class Pool(object):
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), result._set_length))
return (item for chunk in result for item in chunk)
-
+
def apply_async(self, func, args=(), kwds={}, callback=None):
'''
Asynchronous equivalent of `apply()` builtin
@@ -199,12 +199,12 @@ class Pool(object):
assert self._state == RUN
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
-
+
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
-
+
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
@@ -234,13 +234,13 @@ class Pool(object):
break
else:
debug('task handler got sentinel')
-
+
try:
# tell result handler to finish when cache is empty
debug('task handler sending sentinel to result handler')
outqueue.put(None)
-
+
# tell workers there is no more work
debug('task handler sending sentinel to workers')
for p in pool:
@@ -260,12 +260,12 @@ class Pool(object):
except (IOError, EOFError):
debug('result handler got EOFError/IOError -- exiting')
return
-
+
if thread._state:
assert thread._state == TERMINATE
debug('result handler found thread._state=TERMINATE')
break
-
+
if task is None:
debug('result handler got sentinel')
break
@@ -321,7 +321,7 @@ class Pool(object):
raise NotImplementedError(
'pool objects cannot be passed between processes or pickled'
)
-
+
def close(self):
debug('closing pool')
if self._state == RUN:
@@ -355,7 +355,7 @@ class Pool(object):
task_handler, result_handler, cache):
# this is guaranteed to only be called once
debug('finalizing pool')
-
+
task_handler._state = TERMINATE
taskqueue.put(None) # sentinel
@@ -363,7 +363,7 @@ class Pool(object):
cls._help_stuff_finish(inqueue, task_handler, len(pool))
assert result_handler.is_alive() or len(cache) == 0
-
+
result_handler._state = TERMINATE
outqueue.put(None) # sentinel
@@ -396,14 +396,14 @@ class ApplyResult(object):
self._ready = False
self._callback = callback
cache[self._job] = self
-
+
def ready(self):
return self._ready
-
+
def successful(self):
assert self._ready
return self._success
-
+
def wait(self, timeout=None):
self._cond.acquire()
try:
@@ -438,7 +438,7 @@ class ApplyResult(object):
#
class MapResult(ApplyResult):
-
+
def __init__(self, cache, chunksize, length, callback):
ApplyResult.__init__(self, cache, callback)
self._success = True
@@ -449,7 +449,7 @@ class MapResult(ApplyResult):
self._ready = True
else:
self._number_left = length//chunksize + bool(length % chunksize)
-
+
def _set(self, i, success_result):
success, result = success_result
if success:
@@ -492,10 +492,10 @@ class IMapIterator(object):
self._length = None
self._unsorted = {}
cache[self._job] = self
-
+
def __iter__(self):
return self
-
+
def next(self, timeout=None):
self._cond.acquire()
try:
@@ -520,7 +520,7 @@ class IMapIterator(object):
raise value
__next__ = next # XXX
-
+
def _set(self, i, obj):
self._cond.acquire()
try:
@@ -534,12 +534,12 @@ class IMapIterator(object):
self._cond.notify()
else:
self._unsorted[i] = obj
-
+
if self._index == self._length:
del self._cache[self._job]
finally:
self._cond.release()
-
+
def _set_length(self, length):
self._cond.acquire()
try:
@@ -572,18 +572,18 @@ class IMapUnorderedIterator(IMapIterator):
#
class ThreadPool(Pool):
-
+
from .dummy import Process
-
+
def __init__(self, processes=None, initializer=None, initargs=()):
Pool.__init__(self, processes, initializer, initargs)
-
+
def _setup_queues(self):
self._inqueue = Queue.Queue()
self._outqueue = Queue.Queue()
self._quick_put = self._inqueue.put
self._quick_get = self._outqueue.get
-
+
@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
# put sentinels at head of inqueue to make workers finish