diff options
| author | Benjamin Peterson <benjamin@python.org> | 2008-06-13 19:13:39 (GMT) | 
|---|---|---|
| committer | Benjamin Peterson <benjamin@python.org> | 2008-06-13 19:13:39 (GMT) | 
| commit | dfd79494ce78868c937dc91eddd630cbdcae5611 (patch) | |
| tree | 497db9bd37079421b144f33635c6bdd4b7058db5 /Lib/test/test_multiprocessing.py | |
| parent | c9798fc7094c8ddcd73cc73870bbe0a1d4b5b1b1 (diff) | |
| download | cpython-dfd79494ce78868c937dc91eddd630cbdcae5611.zip cpython-dfd79494ce78868c937dc91eddd630cbdcae5611.tar.gz cpython-dfd79494ce78868c937dc91eddd630cbdcae5611.tar.bz2  | |
convert multiprocessing to unix line endings
Diffstat (limited to 'Lib/test/test_multiprocessing.py')
| -rw-r--r-- | Lib/test/test_multiprocessing.py | 3582 | 
1 files changed, 1791 insertions, 1791 deletions
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index d75fd20..4d3527c 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1,1791 +1,1791 @@ -#
 -# Unit tests for the multiprocessing package
 -#
 -
 -import unittest
 -import threading
 -import Queue
 -import time
 -import sys
 -import os
 -import gc
 -import signal
 -import array
 -import copy
 -import socket
 -import random
 -import logging
 -
 -import multiprocessing.dummy
 -import multiprocessing.connection
 -import multiprocessing.managers
 -import multiprocessing.heap
 -import multiprocessing.managers
 -import multiprocessing.pool
 -import _multiprocessing
 -
 -from multiprocessing import util
 -
 -#
 -#
 -#
 -
 -if sys.version_info >= (3, 0):
 -    def latin(s):
 -        return s.encode('latin')
 -else:
 -    latin = str
 -
 -try:
 -    bytes
 -except NameError:
 -    bytes = str
 -    def bytearray(seq):
 -        return array.array('c', seq)
 -
 -#
 -# Constants
 -#
 -
 -LOG_LEVEL = util.SUBWARNING
 -#LOG_LEVEL = logging.WARNING
 -
 -DELTA = 0.1
 -CHECK_TIMINGS = False     # making true makes tests take a lot longer
 -                          # and can sometimes cause some non-serious
 -                          # failures because some calls block a bit
 -                          # longer than expected
 -if CHECK_TIMINGS:
 -    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
 -else:
 -    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
 -
 -HAVE_GETVALUE = not getattr(_multiprocessing,
 -                            'HAVE_BROKEN_SEM_GETVALUE', False)
 -
 -#
 -# Creates a wrapper for a function which records the time it takes to finish
 -#
 -
 -class TimingWrapper(object):
 -
 -    def __init__(self, func):
 -        self.func = func
 -        self.elapsed = None
 -
 -    def __call__(self, *args, **kwds):
 -        t = time.time()
 -        try:
 -            return self.func(*args, **kwds)
 -        finally:
 -            self.elapsed = time.time() - t
 -        
 -#
 -# Base class for test cases
 -#
 -
 -class BaseTestCase(object):
 -    
 -    ALLOWED_TYPES = ('processes', 'manager', 'threads')
 -
 -    def assertTimingAlmostEqual(self, a, b):
 -        if CHECK_TIMINGS:
 -            self.assertAlmostEqual(a, b, 1)
 -
 -    def assertReturnsIfImplemented(self, value, func, *args):
 -        try:
 -            res = func(*args)
 -        except NotImplementedError:
 -            pass
 -        else:
 -            return self.assertEqual(value, res)
 -
 -#
 -# Return the value of a semaphore
 -#
 -
 -def get_value(self):
 -    try:
 -        return self.get_value()
 -    except AttributeError:
 -        try:
 -            return self._Semaphore__value
 -        except AttributeError:
 -            try:
 -                return self._value
 -            except AttributeError:
 -                raise NotImplementedError
 -
 -#
 -# Testcases
 -#
 -
 -class _TestProcess(BaseTestCase):
 -    
 -    ALLOWED_TYPES = ('processes', 'threads')
 -    
 -    def test_current(self):
 -        if self.TYPE == 'threads':
 -            return
 -
 -        current = self.current_process()
 -        authkey = current.get_authkey()
 -        
 -        self.assertTrue(current.is_alive())
 -        self.assertTrue(not current.is_daemon())        
 -        self.assertTrue(isinstance(authkey, bytes))
 -        self.assertTrue(len(authkey) > 0)
 -        self.assertEqual(current.get_ident(), os.getpid())
 -        self.assertEqual(current.get_exitcode(), None)
 -
 -    def _test(self, q, *args, **kwds):
 -        current = self.current_process()
 -        q.put(args)
 -        q.put(kwds)
 -        q.put(current.get_name())
 -        if self.TYPE != 'threads':
 -            q.put(bytes(current.get_authkey()))
 -            q.put(current.pid)
 -
 -    def test_process(self):
 -        q = self.Queue(1)
 -        e = self.Event()
 -        args = (q, 1, 2)
 -        kwargs = {'hello':23, 'bye':2.54}
 -        name = 'SomeProcess'
 -        p = self.Process(
 -            target=self._test, args=args, kwargs=kwargs, name=name
 -            )
 -        p.set_daemon(True)
 -        current = self.current_process()
 -
 -        if self.TYPE != 'threads':
 -            self.assertEquals(p.get_authkey(), current.get_authkey())
 -        self.assertEquals(p.is_alive(), False)
 -        self.assertEquals(p.is_daemon(), True)
 -        self.assertTrue(p not in self.active_children())
 -        self.assertTrue(type(self.active_children()) is list)
 -        self.assertEqual(p.get_exitcode(), None)
 -        
 -        p.start()
 -        
 -        self.assertEquals(p.get_exitcode(), None)
 -        self.assertEquals(p.is_alive(), True)
 -        self.assertTrue(p in self.active_children())
 -        
 -        self.assertEquals(q.get(), args[1:])
 -        self.assertEquals(q.get(), kwargs)
 -        self.assertEquals(q.get(), p.get_name())
 -        if self.TYPE != 'threads':
 -            self.assertEquals(q.get(), current.get_authkey())
 -            self.assertEquals(q.get(), p.pid)
 -
 -        p.join()
 -
 -        self.assertEquals(p.get_exitcode(), 0)
 -        self.assertEquals(p.is_alive(), False)
 -        self.assertTrue(p not in self.active_children())        
 -
 -    def _test_terminate(self):
 -        time.sleep(1000)
 -
 -    def test_terminate(self):
 -        if self.TYPE == 'threads':
 -            return
 -        
 -        p = self.Process(target=self._test_terminate)
 -        p.set_daemon(True)
 -        p.start()
 -
 -        self.assertEqual(p.is_alive(), True)
 -        self.assertTrue(p in self.active_children())
 -        self.assertEqual(p.get_exitcode(), None)
 -
 -        p.terminate()
 -
 -        join = TimingWrapper(p.join)
 -        self.assertEqual(join(), None)
 -        self.assertTimingAlmostEqual(join.elapsed, 0.0)
 -        
 -        self.assertEqual(p.is_alive(), False)
 -        self.assertTrue(p not in self.active_children())
 -
 -        p.join()
 -
 -        # XXX sometimes get p.get_exitcode() == 0 on Windows ...
 -        #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)
 -
 -    def test_cpu_count(self):
 -        try:
 -            cpus = multiprocessing.cpu_count()
 -        except NotImplementedError:
 -            cpus = 1
 -        self.assertTrue(type(cpus) is int)
 -        self.assertTrue(cpus >= 1)
 -
 -    def test_active_children(self):
 -        self.assertEqual(type(self.active_children()), list)
 -
 -        p = self.Process(target=time.sleep, args=(DELTA,))
 -        self.assertTrue(p not in self.active_children())
 -        
 -        p.start()
 -        self.assertTrue(p in self.active_children())
 -
 -        p.join()
 -        self.assertTrue(p not in self.active_children())
 -
 -    def _test_recursion(self, wconn, id):
 -        from multiprocessing import forking
 -        wconn.send(id)
 -        if len(id) < 2:
 -            for i in range(2):
 -                p = self.Process(
 -                    target=self._test_recursion, args=(wconn, id+[i])
 -                    )
 -                p.start()
 -                p.join()
 -
 -    def test_recursion(self):
 -        rconn, wconn = self.Pipe(duplex=False)
 -        self._test_recursion(wconn, [])
 -        
 -        time.sleep(DELTA)
 -        result = []
 -        while rconn.poll():
 -            result.append(rconn.recv())
 -            
 -        expected = [
 -            [],
 -              [0],
 -                [0, 0],
 -                [0, 1],
 -              [1],
 -                [1, 0],
 -                [1, 1]
 -            ]
 -        self.assertEqual(result, expected)
 -
 -#
 -#
 -#
 -
 -class _UpperCaser(multiprocessing.Process):
 -
 -    def __init__(self):
 -        multiprocessing.Process.__init__(self)
 -        self.child_conn, self.parent_conn = multiprocessing.Pipe()
 -
 -    def run(self):
 -        self.parent_conn.close()
 -        for s in iter(self.child_conn.recv, None):
 -            self.child_conn.send(s.upper())
 -        self.child_conn.close()
 -
 -    def submit(self, s):
 -        assert type(s) is str
 -        self.parent_conn.send(s)
 -        return self.parent_conn.recv()
 -
 -    def stop(self):
 -        self.parent_conn.send(None)
 -        self.parent_conn.close()
 -        self.child_conn.close()
 -
 -class _TestSubclassingProcess(BaseTestCase):
 -
 -    ALLOWED_TYPES = ('processes',)
 -
 -    def test_subclassing(self):
 -        uppercaser = _UpperCaser()
 -        uppercaser.start()
 -        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
 -        self.assertEqual(uppercaser.submit('world'), 'WORLD')
 -        uppercaser.stop()
 -        uppercaser.join()
 -        
 -#
 -#
 -#
 -
 -def queue_empty(q):
 -    if hasattr(q, 'empty'):
 -        return q.empty()
 -    else:
 -        return q.qsize() == 0
 -
 -def queue_full(q, maxsize):
 -    if hasattr(q, 'full'):
 -        return q.full()
 -    else:
 -        return q.qsize() == maxsize
 -
 -
 -class _TestQueue(BaseTestCase):
 -
 -
 -    def _test_put(self, queue, child_can_start, parent_can_continue):
 -        child_can_start.wait()
 -        for i in range(6):
 -            queue.get()
 -        parent_can_continue.set()
 -
 -    def test_put(self):
 -        MAXSIZE = 6
 -        queue = self.Queue(maxsize=MAXSIZE)
 -        child_can_start = self.Event()
 -        parent_can_continue = self.Event()
 -
 -        proc = self.Process(
 -            target=self._test_put,
 -            args=(queue, child_can_start, parent_can_continue)
 -            )
 -        proc.set_daemon(True)
 -        proc.start()
 -        
 -        self.assertEqual(queue_empty(queue), True)
 -        self.assertEqual(queue_full(queue, MAXSIZE), False)
 -
 -        queue.put(1)
 -        queue.put(2, True)
 -        queue.put(3, True, None)
 -        queue.put(4, False)
 -        queue.put(5, False, None)
 -        queue.put_nowait(6)
 -
 -        # the values may be in buffer but not yet in pipe so sleep a bit
 -        time.sleep(DELTA)     
 -
 -        self.assertEqual(queue_empty(queue), False)
 -        self.assertEqual(queue_full(queue, MAXSIZE), True)
 -
 -        put = TimingWrapper(queue.put)
 -        put_nowait = TimingWrapper(queue.put_nowait)
 -
 -        self.assertRaises(Queue.Full, put, 7, False)
 -        self.assertTimingAlmostEqual(put.elapsed, 0)
 -
 -        self.assertRaises(Queue.Full, put, 7, False, None)
 -        self.assertTimingAlmostEqual(put.elapsed, 0)
 -
 -        self.assertRaises(Queue.Full, put_nowait, 7)
 -        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
 -
 -        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
 -        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
 -
 -        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
 -        self.assertTimingAlmostEqual(put.elapsed, 0)
 -
 -        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
 -        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
 -
 -        child_can_start.set()
 -        parent_can_continue.wait()
 -
 -        self.assertEqual(queue_empty(queue), True)
 -        self.assertEqual(queue_full(queue, MAXSIZE), False)
 -
 -        proc.join()
 -
 -    def _test_get(self, queue, child_can_start, parent_can_continue):
 -        child_can_start.wait()
 -        queue.put(1)
 -        queue.put(2)
 -        queue.put(3)
 -        queue.put(4)
 -        queue.put(5)
 -        parent_can_continue.set()
 -        
 -    def test_get(self):
 -        queue = self.Queue()
 -        child_can_start = self.Event()
 -        parent_can_continue = self.Event()
 -        
 -        proc = self.Process(
 -            target=self._test_get,
 -            args=(queue, child_can_start, parent_can_continue)
 -            )
 -        proc.set_daemon(True)
 -        proc.start()
 -        
 -        self.assertEqual(queue_empty(queue), True)
 -        
 -        child_can_start.set()
 -        parent_can_continue.wait()
 -
 -        time.sleep(DELTA)
 -        self.assertEqual(queue_empty(queue), False)
 -
 -        self.assertEqual(queue.get(), 1)
 -        self.assertEqual(queue.get(True, None), 2)
 -        self.assertEqual(queue.get(True), 3)
 -        self.assertEqual(queue.get(timeout=1), 4)
 -        self.assertEqual(queue.get_nowait(), 5)
 -        
 -        self.assertEqual(queue_empty(queue), True)
 -
 -        get = TimingWrapper(queue.get)
 -        get_nowait = TimingWrapper(queue.get_nowait)
 -        
 -        self.assertRaises(Queue.Empty, get, False)
 -        self.assertTimingAlmostEqual(get.elapsed, 0)
 -
 -        self.assertRaises(Queue.Empty, get, False, None)
 -        self.assertTimingAlmostEqual(get.elapsed, 0)
 -
 -        self.assertRaises(Queue.Empty, get_nowait)
 -        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
 -
 -        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
 -        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
 -
 -        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
 -        self.assertTimingAlmostEqual(get.elapsed, 0)
 -
 -        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
 -        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
 -
 -        proc.join()
 -        
 -    def _test_fork(self, queue):
 -        for i in range(10, 20):
 -            queue.put(i)
 -        # note that at this point the items may only be buffered, so the
 -        # process cannot shutdown until the feeder thread has finished
 -        # pushing items onto the pipe.
 -
 -    def test_fork(self):
 -        # Old versions of Queue would fail to create a new feeder
 -        # thread for a forked process if the original process had its
 -        # own feeder thread.  This test checks that this no longer
 -        # happens.
 -
 -        queue = self.Queue()
 -
 -        # put items on queue so that main process starts a feeder thread
 -        for i in range(10):
 -            queue.put(i)
 -
 -        # wait to make sure thread starts before we fork a new process
 -        time.sleep(DELTA)
 -
 -        # fork process
 -        p = self.Process(target=self._test_fork, args=(queue,))
 -        p.start()
 -
 -        # check that all expected items are in the queue
 -        for i in range(20):
 -            self.assertEqual(queue.get(), i)
 -        self.assertRaises(Queue.Empty, queue.get, False)
 -
 -        p.join()
 -
 -    def test_qsize(self):
 -        q = self.Queue()
 -        try:
 -            self.assertEqual(q.qsize(), 0)
 -        except NotImplementedError:
 -            return
 -        q.put(1)
 -        self.assertEqual(q.qsize(), 1)
 -        q.put(5)
 -        self.assertEqual(q.qsize(), 2)
 -        q.get()
 -        self.assertEqual(q.qsize(), 1)
 -        q.get()
 -        self.assertEqual(q.qsize(), 0)
 -
 -    def _test_task_done(self, q):
 -        for obj in iter(q.get, None):
 -            time.sleep(DELTA)
 -            q.task_done()
 -
 -    def test_task_done(self):
 -        queue = self.JoinableQueue()
 -
 -        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
 -            return
 -
 -        workers = [self.Process(target=self._test_task_done, args=(queue,))
 -                   for i in xrange(4)]
 -        
 -        for p in workers:
 -            p.start()
 -
 -        for i in xrange(10):
 -            queue.put(i)
 -
 -        queue.join()
 -
 -        for p in workers:
 -            queue.put(None)
 -        
 -        for p in workers:
 -            p.join()
 -
 -#
 -#
 -#
 -
 -class _TestLock(BaseTestCase):
 -
 -    def test_lock(self):
 -        lock = self.Lock()
 -        self.assertEqual(lock.acquire(), True)
 -        self.assertEqual(lock.acquire(False), False)
 -        self.assertEqual(lock.release(), None)
 -        self.assertRaises((ValueError, threading.ThreadError), lock.release)
 -
 -    def test_rlock(self):
 -        lock = self.RLock()
 -        self.assertEqual(lock.acquire(), True)
 -        self.assertEqual(lock.acquire(), True)
 -        self.assertEqual(lock.acquire(), True)
 -        self.assertEqual(lock.release(), None)
 -        self.assertEqual(lock.release(), None)
 -        self.assertEqual(lock.release(), None)
 -        self.assertRaises((AssertionError, RuntimeError), lock.release)
 -        
 -        
 -class _TestSemaphore(BaseTestCase):
 -
 -    def _test_semaphore(self, sem):
 -        self.assertReturnsIfImplemented(2, get_value, sem)
 -        self.assertEqual(sem.acquire(), True)
 -        self.assertReturnsIfImplemented(1, get_value, sem)
 -        self.assertEqual(sem.acquire(), True)
 -        self.assertReturnsIfImplemented(0, get_value, sem)
 -        self.assertEqual(sem.acquire(False), False)
 -        self.assertReturnsIfImplemented(0, get_value, sem)
 -        self.assertEqual(sem.release(), None)
 -        self.assertReturnsIfImplemented(1, get_value, sem)
 -        self.assertEqual(sem.release(), None)
 -        self.assertReturnsIfImplemented(2, get_value, sem)
 -        
 -    def test_semaphore(self):
 -        sem = self.Semaphore(2)
 -        self._test_semaphore(sem)
 -        self.assertEqual(sem.release(), None)
 -        self.assertReturnsIfImplemented(3, get_value, sem)
 -        self.assertEqual(sem.release(), None)
 -        self.assertReturnsIfImplemented(4, get_value, sem)
 -
 -    def test_bounded_semaphore(self):
 -        sem = self.BoundedSemaphore(2)
 -        self._test_semaphore(sem)
 -        # Currently fails on OS/X
 -        #if HAVE_GETVALUE:
 -        #    self.assertRaises(ValueError, sem.release)
 -        #    self.assertReturnsIfImplemented(2, get_value, sem)
 -
 -    def test_timeout(self):
 -        if self.TYPE != 'processes':
 -            return
 -
 -        sem = self.Semaphore(0)
 -        acquire = TimingWrapper(sem.acquire)
 -
 -        self.assertEqual(acquire(False), False)
 -        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
 -
 -        self.assertEqual(acquire(False, None), False)
 -        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
 -
 -        self.assertEqual(acquire(False, TIMEOUT1), False)
 -        self.assertTimingAlmostEqual(acquire.elapsed, 0)
 -
 -        self.assertEqual(acquire(True, TIMEOUT2), False)
 -        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
 -
 -        self.assertEqual(acquire(timeout=TIMEOUT3), False)
 -        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
 -
 -
 -class _TestCondition(BaseTestCase):
 -    
 -    def f(self, cond, sleeping, woken, timeout=None):
 -        cond.acquire()
 -        sleeping.release()
 -        cond.wait(timeout)
 -        woken.release()
 -        cond.release()
 -    
 -    def check_invariant(self, cond):
 -        # this is only supposed to succeed when there are no sleepers
 -        if self.TYPE == 'processes':
 -            try:
 -                sleepers = (cond._sleeping_count.get_value() -
 -                            cond._woken_count.get_value())
 -                self.assertEqual(sleepers, 0)
 -                self.assertEqual(cond._wait_semaphore.get_value(), 0)
 -            except NotImplementedError:
 -                pass
 -            
 -    def test_notify(self):
 -        cond = self.Condition()
 -        sleeping = self.Semaphore(0)
 -        woken = self.Semaphore(0)
 -        
 -        p = self.Process(target=self.f, args=(cond, sleeping, woken))
 -        p.set_daemon(True)
 -        p.start()
 -
 -        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
 -        p.set_daemon(True)
 -        p.start()
 -        
 -        # wait for both children to start sleeping
 -        sleeping.acquire()
 -        sleeping.acquire()
 -        
 -        # check no process/thread has woken up
 -        time.sleep(DELTA)
 -        self.assertReturnsIfImplemented(0, get_value, woken)
 -
 -        # wake up one process/thread
 -        cond.acquire()
 -        cond.notify()
 -        cond.release()
 -        
 -        # check one process/thread has woken up
 -        time.sleep(DELTA)
 -        self.assertReturnsIfImplemented(1, get_value, woken)
 -
 -        # wake up another
 -        cond.acquire()
 -        cond.notify()
 -        cond.release()
 -        
 -        # check other has woken up
 -        time.sleep(DELTA)
 -        self.assertReturnsIfImplemented(2, get_value, woken)
 -        
 -        # check state is not mucked up
 -        self.check_invariant(cond)
 -        p.join()
 -        
 -    def test_notify_all(self):
 -        cond = self.Condition()
 -        sleeping = self.Semaphore(0)
 -        woken = self.Semaphore(0)
 -
 -        # start some threads/processes which will timeout
 -        for i in range(3):
 -            p = self.Process(target=self.f,
 -                             args=(cond, sleeping, woken, TIMEOUT1))
 -            p.set_daemon(True)
 -            p.start()
 -
 -            t = threading.Thread(target=self.f,
 -                                 args=(cond, sleeping, woken, TIMEOUT1))
 -            t.set_daemon(True)
 -            t.start()
 -
 -        # wait for them all to sleep
 -        for i in xrange(6):
 -            sleeping.acquire()
 -
 -        # check they have all timed out
 -        for i in xrange(6):
 -            woken.acquire()
 -        self.assertReturnsIfImplemented(0, get_value, woken)
 -
 -        # check state is not mucked up
 -        self.check_invariant(cond)
 -
 -        # start some more threads/processes
 -        for i in range(3):
 -            p = self.Process(target=self.f, args=(cond, sleeping, woken))
 -            p.set_daemon(True)
 -            p.start()
 -            
 -            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
 -            t.set_daemon(True)
 -            t.start()
 -            
 -        # wait for them to all sleep
 -        for i in xrange(6):
 -            sleeping.acquire()
 -            
 -        # check no process/thread has woken up
 -        time.sleep(DELTA)
 -        self.assertReturnsIfImplemented(0, get_value, woken)
 -
 -        # wake them all up
 -        cond.acquire()
 -        cond.notify_all()
 -        cond.release()
 -
 -        # check they have all woken
 -        time.sleep(DELTA)
 -        self.assertReturnsIfImplemented(6, get_value, woken)
 -
 -        # check state is not mucked up
 -        self.check_invariant(cond)
 -
 -    def test_timeout(self):
 -        cond = self.Condition()
 -        wait = TimingWrapper(cond.wait)
 -        cond.acquire()
 -        res = wait(TIMEOUT1)
 -        cond.release()
 -        self.assertEqual(res, None)
 -        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
 -
 -        
 -class _TestEvent(BaseTestCase):
 -
 -    def _test_event(self, event):
 -        time.sleep(TIMEOUT2)
 -        event.set()
 -
 -    def test_event(self):
 -        event = self.Event()
 -        wait = TimingWrapper(event.wait)
 -        
 -        # Removed temporaily, due to API shear, this does not 
 -        # work with threading._Event objects. is_set == isSet
 -        #self.assertEqual(event.is_set(), False)
 -        
 -        self.assertEqual(wait(0.0), None)
 -        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
 -        self.assertEqual(wait(TIMEOUT1), None)
 -        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
 -
 -        event.set()
 -
 -        # See note above on the API differences
 -        # self.assertEqual(event.is_set(), True)
 -        self.assertEqual(wait(), None)
 -        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
 -        self.assertEqual(wait(TIMEOUT1), None)
 -        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
 -        # self.assertEqual(event.is_set(), True)
 -
 -        event.clear()
 -
 -        #self.assertEqual(event.is_set(), False)
 -
 -        self.Process(target=self._test_event, args=(event,)).start()
 -        self.assertEqual(wait(), None)
 -
 -#
 -#
 -#
 -
 -class _TestValue(BaseTestCase):
 -
 -    codes_values = [
 -        ('i', 4343, 24234),
 -        ('d', 3.625, -4.25),
 -        ('h', -232, 234),
 -        ('c', latin('x'), latin('y'))
 -        ]
 -
 -    def _test(self, values):
 -        for sv, cv in zip(values, self.codes_values):
 -            sv.value = cv[2]
 -            
 -        
 -    def test_value(self, raw=False):
 -        if self.TYPE != 'processes':
 -            return
 -
 -        if raw:
 -            values = [self.RawValue(code, value)
 -                      for code, value, _ in self.codes_values]
 -        else:
 -            values = [self.Value(code, value)
 -                      for code, value, _ in self.codes_values]
 -            
 -        for sv, cv in zip(values, self.codes_values):
 -            self.assertEqual(sv.value, cv[1])
 -        
 -        proc = self.Process(target=self._test, args=(values,))
 -        proc.start()
 -        proc.join()
 -
 -        for sv, cv in zip(values, self.codes_values):
 -            self.assertEqual(sv.value, cv[2])
 -
 -    def test_rawvalue(self):
 -        self.test_value(raw=True)
 -
 -    def test_getobj_getlock(self):
 -        if self.TYPE != 'processes':
 -            return
 -
 -        val1 = self.Value('i', 5)
 -        lock1 = val1.get_lock()
 -        obj1 = val1.get_obj()
 -
 -        val2 = self.Value('i', 5, lock=None)
 -        lock2 = val2.get_lock()
 -        obj2 = val2.get_obj()
 -
 -        lock = self.Lock()
 -        val3 = self.Value('i', 5, lock=lock)
 -        lock3 = val3.get_lock()
 -        obj3 = val3.get_obj()
 -        self.assertEqual(lock, lock3)
 -        
 -        arr4 = self.RawValue('i', 5)
 -        self.assertFalse(hasattr(arr4, 'get_lock'))
 -        self.assertFalse(hasattr(arr4, 'get_obj'))
 -
 -
 -class _TestArray(BaseTestCase):
 -
 -    def f(self, seq):
 -        for i in range(1, len(seq)):
 -            seq[i] += seq[i-1]
 -
 -    def test_array(self, raw=False):
 -        if self.TYPE != 'processes':
 -            return
 -
 -        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
 -        if raw:
 -            arr = self.RawArray('i', seq)
 -        else:
 -            arr = self.Array('i', seq)
 -        
 -        self.assertEqual(len(arr), len(seq))
 -        self.assertEqual(arr[3], seq[3])
 -        self.assertEqual(list(arr[2:7]), list(seq[2:7]))
 -        
 -        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
 -        
 -        self.assertEqual(list(arr[:]), seq)
 -        
 -        self.f(seq)
 -        
 -        p = self.Process(target=self.f, args=(arr,))
 -        p.start()
 -        p.join()
 -        
 -        self.assertEqual(list(arr[:]), seq)
 -        
 -    def test_rawarray(self):
 -        self.test_array(raw=True)
 -        
 -    def test_getobj_getlock_obj(self):
 -        if self.TYPE != 'processes':
 -            return
 -
 -        arr1 = self.Array('i', range(10))
 -        lock1 = arr1.get_lock()
 -        obj1 = arr1.get_obj()
 -
 -        arr2 = self.Array('i', range(10), lock=None)
 -        lock2 = arr2.get_lock()
 -        obj2 = arr2.get_obj()
 -
 -        lock = self.Lock()
 -        arr3 = self.Array('i', range(10), lock=lock)
 -        lock3 = arr3.get_lock()
 -        obj3 = arr3.get_obj()
 -        self.assertEqual(lock, lock3)
 -        
 -        arr4 = self.RawArray('i', range(10))
 -        self.assertFalse(hasattr(arr4, 'get_lock'))
 -        self.assertFalse(hasattr(arr4, 'get_obj'))
 -
 -#
 -#
 -#
 -
 -class _TestContainers(BaseTestCase):
 -
 -    ALLOWED_TYPES = ('manager',)
 -
 -    def test_list(self):
 -        a = self.list(range(10))
 -        self.assertEqual(a[:], range(10))
 -        
 -        b = self.list()
 -        self.assertEqual(b[:], [])
 -        
 -        b.extend(range(5))
 -        self.assertEqual(b[:], range(5))
 -        
 -        self.assertEqual(b[2], 2)
 -        self.assertEqual(b[2:10], [2,3,4])
 -
 -        b *= 2
 -        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
 -
 -        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
 -
 -        self.assertEqual(a[:], range(10))
 -
 -        d = [a, b]
 -        e = self.list(d)
 -        self.assertEqual(
 -            e[:],
 -            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
 -            )
 -        
 -        f = self.list([a])
 -        a.append('hello')
 -        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
 -
 -    def test_dict(self):
 -        d = self.dict()
 -        indices = range(65, 70)
 -        for i in indices:
 -            d[i] = chr(i)
 -        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
 -        self.assertEqual(sorted(d.keys()), indices)
 -        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
 -        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
 -        
 -    def test_namespace(self):
 -        n = self.Namespace()
 -        n.name = 'Bob'
 -        n.job = 'Builder'
 -        n._hidden = 'hidden'
 -        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
 -        del n.job
 -        self.assertEqual(str(n), "Namespace(name='Bob')")
 -        self.assertTrue(hasattr(n, 'name'))
 -        self.assertTrue(not hasattr(n, 'job'))
 -
 -#
 -#
 -#
 -
 -def sqr(x, wait=0.0):
 -    time.sleep(wait)
 -    return x*x
 -
 -class _TestPool(BaseTestCase):
 -
 -    def test_apply(self):
 -        papply = self.pool.apply
 -        self.assertEqual(papply(sqr, (5,)), sqr(5))
 -        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
 -
 -    def test_map(self):
 -        pmap = self.pool.map
 -        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
 -        self.assertEqual(pmap(sqr, range(100), chunksize=20),
 -                         map(sqr, range(100)))
 -        
 -    def test_async(self):
 -        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
 -        get = TimingWrapper(res.get)
 -        self.assertEqual(get(), 49)
 -        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
 -
 -    def test_async_timeout(self):
 -        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
 -        get = TimingWrapper(res.get)
 -        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
 -        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
 -
 -    def test_imap(self):
 -        it = self.pool.imap(sqr, range(10))
 -        self.assertEqual(list(it), map(sqr, range(10)))
 -
 -        it = self.pool.imap(sqr, range(10))
 -        for i in range(10):
 -            self.assertEqual(it.next(), i*i)
 -        self.assertRaises(StopIteration, it.next)
 -
 -        it = self.pool.imap(sqr, range(1000), chunksize=100)
 -        for i in range(1000):
 -            self.assertEqual(it.next(), i*i)
 -        self.assertRaises(StopIteration, it.next)
 -
 -    def test_imap_unordered(self):
 -        it = self.pool.imap_unordered(sqr, range(1000))
 -        self.assertEqual(sorted(it), map(sqr, range(1000)))
 -
 -        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
 -        self.assertEqual(sorted(it), map(sqr, range(1000)))
 -
 -    def test_make_pool(self):
 -        p = multiprocessing.Pool(3)
 -        self.assertEqual(3, len(p._pool))
 -        p.close()
 -        p.join()
 -
 -    def test_terminate(self):
 -        if self.TYPE == 'manager':
 -            # On Unix a forked process increfs each shared object to
 -            # which its parent process held a reference.  If the
 -            # forked process gets terminated then there is likely to
 -            # be a reference leak.  So to prevent
 -            # _TestZZZNumberOfObjects from failing we skip this test
 -            # when using a manager.
 -            return
 -
 -        result = self.pool.map_async(
 -            time.sleep, [0.1 for i in range(10000)], chunksize=1
 -            )
 -        self.pool.terminate()
 -        join = TimingWrapper(self.pool.join)
 -        join()
 -        self.assertTrue(join.elapsed < 0.2)
 -
 -#
 -# Test that manager has expected number of shared objects left
 -#
 -
 -class _TestZZZNumberOfObjects(BaseTestCase):
 -    # Because test cases are sorted alphabetically, this one will get
 -    # run after all the other tests for the manager.  It tests that
 -    # there have been no "reference leaks" for the manager's shared
 -    # objects.  Note the comment in _TestPool.test_terminate().
 -    ALLOWED_TYPES = ('manager',)
 -
 -    def test_number_of_objects(self):
 -        EXPECTED_NUMBER = 1                # the pool object is still alive
 -        multiprocessing.active_children()  # discard dead process objs
 -        gc.collect()                       # do garbage collection
 -        refs = self.manager._number_of_objects()
 -        if refs != EXPECTED_NUMBER:
 -            print self.manager._debugInfo()
 -
 -        self.assertEqual(refs, EXPECTED_NUMBER)
 -
 -#
 -# Test of creating a customized manager class
 -#
 -
 -from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
 -    
 -class FooBar(object):
 -    def f(self):
 -        return 'f()'
 -    def g(self):
 -        raise ValueError
 -    def _h(self):
 -        return '_h()'
 -    
 -def baz():
 -    for i in xrange(10):
 -        yield i*i
 -
 -class IteratorProxy(BaseProxy):
 -    _exposed_ = ('next', '__next__')
 -    def __iter__(self):
 -        return self
 -    def next(self):
 -        return self._callmethod('next')
 -    def __next__(self):
 -        return self._callmethod('__next__')
 -
 -class MyManager(BaseManager):
 -    pass
 -
 -MyManager.register('Foo', callable=FooBar)
 -MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
 -MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
 -
 -
 -class _TestMyManager(BaseTestCase):
 -    
 -    ALLOWED_TYPES = ('manager',)
 -
 -    def test_mymanager(self):
 -        manager = MyManager()
 -        manager.start()
 -        
 -        foo = manager.Foo()
 -        bar = manager.Bar()
 -        baz = manager.baz()
 -        
 -        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
 -        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
 -        
 -        self.assertEqual(foo_methods, ['f', 'g'])
 -        self.assertEqual(bar_methods, ['f', '_h'])
 -        
 -        self.assertEqual(foo.f(), 'f()')
 -        self.assertRaises(ValueError, foo.g)
 -        self.assertEqual(foo._callmethod('f'), 'f()')
 -        self.assertRaises(RemoteError, foo._callmethod, '_h')
 -        
 -        self.assertEqual(bar.f(), 'f()')
 -        self.assertEqual(bar._h(), '_h()')
 -        self.assertEqual(bar._callmethod('f'), 'f()')
 -        self.assertEqual(bar._callmethod('_h'), '_h()')
 -        
 -        self.assertEqual(list(baz), [i*i for i in range(10)])
 -        
 -        manager.shutdown()
 -        
 -#
 -# Test of connecting to a remote server and using xmlrpclib for serialization
 -#
 -
 -_queue = Queue.Queue()
 -def get_queue():
 -    return _queue
 -
 -class QueueManager(BaseManager):
 -    '''manager class used by server process'''
 -QueueManager.register('get_queue', callable=get_queue)
 -
 -class QueueManager2(BaseManager):
 -    '''manager class which specifies the same interface as QueueManager'''
 -QueueManager2.register('get_queue')
 -
 -
 -SERIALIZER = 'xmlrpclib'
 -
 -class _TestRemoteManager(BaseTestCase):
 -
 -    ALLOWED_TYPES = ('manager',)
 -    
 -    def _putter(self, address, authkey):
 -        manager = QueueManager2(
 -            address=address, authkey=authkey, serializer=SERIALIZER
 -            )
 -        manager.connect()
 -        queue = manager.get_queue()
 -        queue.put(('hello world', None, True, 2.25))
 -
 -    def test_remote(self):
 -        authkey = os.urandom(32)
 -
 -        manager = QueueManager(
 -            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
 -            )
 -        manager.start()
 -
 -        p = self.Process(target=self._putter, args=(manager.address, authkey))
 -        p.start()
 -        
 -        manager2 = QueueManager2(
 -            address=manager.address, authkey=authkey, serializer=SERIALIZER
 -            )
 -        manager2.connect()
 -        queue = manager2.get_queue()
 -        
 -        # Note that xmlrpclib will deserialize object as a list not a tuple
 -        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
 -
 -        # Because we are using xmlrpclib for serialization instead of
 -        # pickle this will cause a serialization error.
 -        self.assertRaises(Exception, queue.put, time.sleep)
 -
 -        # Make queue finalizer run before the server is stopped
 -        del queue
 -        manager.shutdown()
 -
 -#
 -#
 -#
 -
 -SENTINEL = latin('')
 -
 -class _TestConnection(BaseTestCase):
 -
 -    ALLOWED_TYPES = ('processes', 'threads')
 -
 -    def _echo(self, conn):
 -        for msg in iter(conn.recv_bytes, SENTINEL):
 -            conn.send_bytes(msg)
 -        conn.close()
 -
 -    def test_connection(self):
 -        conn, child_conn = self.Pipe()
 -        
 -        p = self.Process(target=self._echo, args=(child_conn,))
 -        p.set_daemon(True)
 -        p.start()
 -
 -        seq = [1, 2.25, None]
 -        msg = latin('hello world')
 -        longmsg = msg * 10
 -        arr = array.array('i', range(4))
 -
 -        if self.TYPE == 'processes':
 -            self.assertEqual(type(conn.fileno()), int)
 -
 -        self.assertEqual(conn.send(seq), None)
 -        self.assertEqual(conn.recv(), seq)
 -
 -        self.assertEqual(conn.send_bytes(msg), None)
 -        self.assertEqual(conn.recv_bytes(), msg)
 -
 -        if self.TYPE == 'processes':
 -            buffer = array.array('i', [0]*10)
 -            expected = list(arr) + [0] * (10 - len(arr))
 -            self.assertEqual(conn.send_bytes(arr), None)
 -            self.assertEqual(conn.recv_bytes_into(buffer),
 -                             len(arr) * buffer.itemsize)
 -            self.assertEqual(list(buffer), expected)
 -
 -            buffer = array.array('i', [0]*10)
 -            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
 -            self.assertEqual(conn.send_bytes(arr), None)
 -            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
 -                             len(arr) * buffer.itemsize)
 -            self.assertEqual(list(buffer), expected)
 -
 -            buffer = bytearray(latin(' ' * 40))
 -            self.assertEqual(conn.send_bytes(longmsg), None)
 -            try:
 -                res = conn.recv_bytes_into(buffer)
 -            except multiprocessing.BufferTooShort, e:
 -                self.assertEqual(e.args, (longmsg,))
 -            else:
 -                self.fail('expected BufferTooShort, got %s' % res)
 -
 -        poll = TimingWrapper(conn.poll)
 -
 -        self.assertEqual(poll(), False)
 -        self.assertTimingAlmostEqual(poll.elapsed, 0)
 -
 -        self.assertEqual(poll(TIMEOUT1), False)
 -        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
 -
 -        conn.send(None)
 -
 -        self.assertEqual(poll(TIMEOUT1), True)
 -        self.assertTimingAlmostEqual(poll.elapsed, 0)
 -        
 -        self.assertEqual(conn.recv(), None)
 -
 -        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
 -        conn.send_bytes(really_big_msg)
 -        self.assertEqual(conn.recv_bytes(), really_big_msg)
 -        
 -        conn.send_bytes(SENTINEL)                          # tell child to quit
 -        child_conn.close()
 -
 -        if self.TYPE == 'processes':
 -            self.assertEqual(conn.readable, True)
 -            self.assertEqual(conn.writable, True)
 -            self.assertRaises(EOFError, conn.recv)
 -            self.assertRaises(EOFError, conn.recv_bytes)
 -
 -        p.join()
 -        
 -    def test_duplex_false(self):
 -        reader, writer = self.Pipe(duplex=False)
 -        self.assertEqual(writer.send(1), None)
 -        self.assertEqual(reader.recv(), 1)
 -        if self.TYPE == 'processes':
 -            self.assertEqual(reader.readable, True)
 -            self.assertEqual(reader.writable, False)
 -            self.assertEqual(writer.readable, False)
 -            self.assertEqual(writer.writable, True)
 -            self.assertRaises(IOError, reader.send, 2)
 -            self.assertRaises(IOError, writer.recv)
 -            self.assertRaises(IOError, writer.poll)
 -
 -    def test_spawn_close(self):
 -        # We test that a pipe connection can be closed by parent
 -        # process immediately after child is spawned.  On Windows this
 -        # would have sometimes failed on old versions because
 -        # child_conn would be closed before the child got a chance to
 -        # duplicate it.
 -        conn, child_conn = self.Pipe()
 -        
 -        p = self.Process(target=self._echo, args=(child_conn,))
 -        p.start()
 -        child_conn.close()    # this might complete before child initializes
 -
 -        msg = latin('hello')
 -        conn.send_bytes(msg)
 -        self.assertEqual(conn.recv_bytes(), msg)
 -
 -        conn.send_bytes(SENTINEL)
 -        conn.close()
 -        p.join()
 -
 -    def test_sendbytes(self):
 -        if self.TYPE != 'processes':
 -            return
 -
 -        msg = latin('abcdefghijklmnopqrstuvwxyz')
 -        a, b = self.Pipe()
 -        
 -        a.send_bytes(msg)
 -        self.assertEqual(b.recv_bytes(), msg)
 -
 -        a.send_bytes(msg, 5)
 -        self.assertEqual(b.recv_bytes(), msg[5:])
 -
 -        a.send_bytes(msg, 7, 8)
 -        self.assertEqual(b.recv_bytes(), msg[7:7+8])
 -
 -        a.send_bytes(msg, 26)
 -        self.assertEqual(b.recv_bytes(), latin(''))
 -
 -        a.send_bytes(msg, 26, 0)
 -        self.assertEqual(b.recv_bytes(), latin(''))
 -
 -        self.assertRaises(ValueError, a.send_bytes, msg, 27)
 -        
 -        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
 -        
 -        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
 -
 -        self.assertRaises(ValueError, a.send_bytes, msg, -1)
 -
 -        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
 -        
 -
 -class _TestListenerClient(BaseTestCase):
 -
 -    ALLOWED_TYPES = ('processes', 'threads')
 -
 -    def _test(self, address):
 -        conn = self.connection.Client(address)
 -        conn.send('hello')
 -        conn.close()
 -
 -    def test_listener_client(self):        
 -        for family in self.connection.families:
 -            l = self.connection.Listener(family=family)
 -            p = self.Process(target=self._test, args=(l.address,))
 -            p.set_daemon(True)
 -            p.start()
 -            conn = l.accept()
 -            self.assertEqual(conn.recv(), 'hello')
 -            p.join()
 -            l.close()
 -
 -#
 -# Test of sending connection and socket objects between processes
 -#
 -
 -class _TestPicklingConnections(BaseTestCase):
 -
 -    ALLOWED_TYPES = ('processes',)
 -
 -    def _listener(self, conn, families):
 -        for fam in families:
 -            l = self.connection.Listener(family=fam)
 -            conn.send(l.address)
 -            new_conn = l.accept()
 -            conn.send(new_conn)
 -
 -        if self.TYPE == 'processes':
 -            l = socket.socket()
 -            l.bind(('localhost', 0))
 -            conn.send(l.getsockname())
 -            l.listen(1)
 -            new_conn, addr = l.accept()
 -            conn.send(new_conn)
 -        
 -        conn.recv()
 -
 -    def _remote(self, conn):
 -        for (address, msg) in iter(conn.recv, None):
 -            client = self.connection.Client(address)
 -            client.send(msg.upper())
 -            client.close()
 -
 -        if self.TYPE == 'processes':
 -            address, msg = conn.recv()
 -            client = socket.socket()
 -            client.connect(address)
 -            client.sendall(msg.upper())
 -            client.close()
 -
 -        conn.close()
 -
 -    def test_pickling(self):
 -        try:
 -            multiprocessing.allow_connection_pickling()
 -        except ImportError:
 -            return
 -        
 -        families = self.connection.families
 -
 -        lconn, lconn0 = self.Pipe()
 -        lp = self.Process(target=self._listener, args=(lconn0, families))
 -        lp.start()
 -        lconn0.close()
 -
 -        rconn, rconn0 = self.Pipe()
 -        rp = self.Process(target=self._remote, args=(rconn0,))
 -        rp.start()
 -        rconn0.close()
 -
 -        for fam in families:
 -            msg = ('This connection uses family %s' % fam).encode('ascii')
 -            address = lconn.recv()
 -            rconn.send((address, msg))
 -            new_conn = lconn.recv()
 -            self.assertEqual(new_conn.recv(), msg.upper())
 -            
 -        rconn.send(None)
 -
 -        if self.TYPE == 'processes':
 -            msg = latin('This connection uses a normal socket')
 -            address = lconn.recv()
 -            rconn.send((address, msg))
 -            if hasattr(socket, 'fromfd'):
 -                new_conn = lconn.recv()
 -                self.assertEqual(new_conn.recv(100), msg.upper())
 -            else:
 -                # XXX On Windows with Py2.6 need to backport fromfd()
 -                discard = lconn.recv_bytes()
 -                
 -        lconn.send(None)
 -        
 -        rconn.close()
 -        lconn.close()
 -        
 -        lp.join()
 -        rp.join()
 -
 -#
 -#
 -#
 -
 -class _TestHeap(BaseTestCase):
 -
 -    ALLOWED_TYPES = ('processes',)
 -
 -    def test_heap(self):
 -        iterations = 5000
 -        maxblocks = 50
 -        blocks = []
 -
 -        # create and destroy lots of blocks of different sizes
 -        for i in xrange(iterations):
 -            size = int(random.lognormvariate(0, 1) * 1000)
 -            b = multiprocessing.heap.BufferWrapper(size)
 -            blocks.append(b)
 -            if len(blocks) > maxblocks:
 -                i = random.randrange(maxblocks)
 -                del blocks[i]
 -
 -        # get the heap object
 -        heap = multiprocessing.heap.BufferWrapper._heap
 -
 -        # verify the state of the heap
 -        all = []
 -        occupied = 0
 -        for L in heap._len_to_seq.values():
 -            for arena, start, stop in L:
 -                all.append((heap._arenas.index(arena), start, stop,
 -                            stop-start, 'free'))
 -        for arena, start, stop in heap._allocated_blocks:
 -            all.append((heap._arenas.index(arena), start, stop,
 -                        stop-start, 'occupied'))
 -            occupied += (stop-start)
 -
 -        all.sort()
 -
 -        for i in range(len(all)-1):
 -            (arena, start, stop) = all[i][:3]
 -            (narena, nstart, nstop) = all[i+1][:3]
 -            self.assertTrue((arena != narena and nstart == 0) or
 -                            (stop == nstart))
 -            
 -#
 -#
 -#
 -
 -try:
 -    from ctypes import Structure, Value, copy, c_int, c_double
 -except ImportError:
 -    Structure = object
 -    c_int = c_double = None
 -
 -class _Foo(Structure):
 -    _fields_ = [
 -        ('x', c_int),
 -        ('y', c_double)
 -        ]
 -
 -class _TestSharedCTypes(BaseTestCase):
 -
 -    ALLOWED_TYPES = ('processes',)
 -
 -    def _double(self, x, y, foo, arr, string):
 -        x.value *= 2
 -        y.value *= 2
 -        foo.x *= 2
 -        foo.y *= 2
 -        string.value *= 2
 -        for i in range(len(arr)):
 -            arr[i] *= 2
 -
 -    def test_sharedctypes(self, lock=False):
 -        if c_int is None:
 -            return
 -        
 -        x = Value('i', 7, lock=lock)
 -        y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
 -        foo = Value(_Foo, 3, 2, lock=lock)
 -        arr = Array('d', range(10), lock=lock)
 -        string = Array('c', 20, lock=lock)
 -        string.value = 'hello'
 -
 -        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
 -        p.start()
 -        p.join()
 -
 -        self.assertEqual(x.value, 14)
 -        self.assertAlmostEqual(y.value, 2.0/3.0)
 -        self.assertEqual(foo.x, 6)
 -        self.assertAlmostEqual(foo.y, 4.0)
 -        for i in range(10):
 -            self.assertAlmostEqual(arr[i], i*2)
 -        self.assertEqual(string.value, latin('hellohello'))
 -
 -    def test_synchronize(self):
 -        self.test_sharedctypes(lock=True)
 -
 -    def test_copy(self):
 -        if c_int is None:
 -            return
 -
 -        foo = _Foo(2, 5.0)
 -        bar = copy(foo)
 -        foo.x = 0
 -        foo.y = 0
 -        self.assertEqual(bar.x, 2)
 -        self.assertAlmostEqual(bar.y, 5.0)
 -
 -#
 -#
 -#
 -
 -class _TestFinalize(BaseTestCase):
 -
 -    ALLOWED_TYPES = ('processes',)
 -
 -    def _test_finalize(self, conn):
 -        class Foo(object):
 -            pass
 -
 -        a = Foo()
 -        util.Finalize(a, conn.send, args=('a',))
 -        del a           # triggers callback for a
 -
 -        b = Foo()
 -        close_b = util.Finalize(b, conn.send, args=('b',))    
 -        close_b()       # triggers callback for b
 -        close_b()       # does nothing because callback has already been called
 -        del b           # does nothing because callback has already been called
 -
 -        c = Foo()
 -        util.Finalize(c, conn.send, args=('c',))
 -
 -        d10 = Foo()
 -        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
 -
 -        d01 = Foo()
 -        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
 -        d02 = Foo()
 -        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
 -        d03 = Foo()
 -        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
 -
 -        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
 -
 -        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
 -
 -        # call mutliprocessing's cleanup function then exit process without
 -        # garbage collecting locals
 -        util._exit_function()
 -        conn.close()
 -        os._exit(0)
 -
 -    def test_finalize(self):
 -        conn, child_conn = self.Pipe()
 -        
 -        p = self.Process(target=self._test_finalize, args=(child_conn,))
 -        p.start()
 -        p.join()
 -
 -        result = [obj for obj in iter(conn.recv, 'STOP')]
 -        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
 -
 -#
 -# Test that from ... import * works for each module
 -#
 -
 -class _TestImportStar(BaseTestCase):
 -
 -    ALLOWED_TYPES = ('processes',)
 -
 -    def test_import(self):
 -        modules = (
 -            'multiprocessing', 'multiprocessing.connection',
 -            'multiprocessing.heap', 'multiprocessing.managers',
 -            'multiprocessing.pool', 'multiprocessing.process',
 -            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
 -            'multiprocessing.synchronize', 'multiprocessing.util'
 -            )
 -        
 -        for name in modules:
 -            __import__(name)
 -            mod = sys.modules[name]
 -            
 -            for attr in getattr(mod, '__all__', ()):
 -                self.assertTrue(
 -                    hasattr(mod, attr),
 -                    '%r does not have attribute %r' % (mod, attr)
 -                    )
 -
 -#
 -# Quick test that logging works -- does not test logging output
 -#
 -
 -class _TestLogging(BaseTestCase):
 -
 -    ALLOWED_TYPES = ('processes',)
 -
 -    def test_enable_logging(self):
 -        logger = multiprocessing.get_logger()
 -        logger.setLevel(util.SUBWARNING)
 -        self.assertTrue(logger is not None)
 -        logger.debug('this will not be printed')
 -        logger.info('nor will this')
 -        logger.setLevel(LOG_LEVEL)
 -
 -    def _test_level(self, conn):
 -        logger = multiprocessing.get_logger()
 -        conn.send(logger.getEffectiveLevel())
 -
 -    def test_level(self):
 -        LEVEL1 = 32
 -        LEVEL2 = 37
 -        
 -        logger = multiprocessing.get_logger()
 -        root_logger = logging.getLogger()
 -        root_level = root_logger.level
 -
 -        reader, writer = multiprocessing.Pipe(duplex=False)
 -
 -        logger.setLevel(LEVEL1)
 -        self.Process(target=self._test_level, args=(writer,)).start()
 -        self.assertEqual(LEVEL1, reader.recv())
 -
 -        logger.setLevel(logging.NOTSET)
 -        root_logger.setLevel(LEVEL2)
 -        self.Process(target=self._test_level, args=(writer,)).start()
 -        self.assertEqual(LEVEL2, reader.recv())
 -
 -        root_logger.setLevel(root_level)
 -        logger.setLevel(level=LOG_LEVEL)
 -
 -#
 -# Functions used to create test cases from the base ones in this module
 -#
 -
 -def get_attributes(Source, names):
 -    d = {}
 -    for name in names:
 -        obj = getattr(Source, name)
 -        if type(obj) == type(get_attributes):
 -            obj = staticmethod(obj)
 -        d[name] = obj
 -    return d
 -
 -def create_test_cases(Mixin, type):
 -    result = {}
 -    glob = globals()
 -    Type = type[0].upper() + type[1:]
 -
 -    for name in glob.keys():
 -        if name.startswith('_Test'):
 -            base = glob[name]
 -            if type in base.ALLOWED_TYPES:
 -                newname = 'With' + Type + name[1:]
 -                class Temp(base, unittest.TestCase, Mixin):
 -                    pass
 -                result[newname] = Temp
 -                Temp.__name__ = newname
 -                Temp.__module__ = Mixin.__module__
 -    return result
 -
 -#
 -# Create test cases
 -#
 -
 -class ProcessesMixin(object):
 -    TYPE = 'processes'
 -    Process = multiprocessing.Process
 -    locals().update(get_attributes(multiprocessing, (
 -        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
 -        'Condition', 'Event', 'Value', 'Array', 'RawValue',
 -        'RawArray', 'current_process', 'active_children', 'Pipe',
 -        'connection', 'JoinableQueue'
 -        )))
 -
 -testcases_processes = create_test_cases(ProcessesMixin, type='processes')
 -globals().update(testcases_processes)
 -
 -
 -class ManagerMixin(object):
 -    TYPE = 'manager'
 -    Process = multiprocessing.Process
 -    manager = object.__new__(multiprocessing.managers.SyncManager)
 -    locals().update(get_attributes(manager, (
 -        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 
 -       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
 -        'Namespace', 'JoinableQueue'
 -        )))
 -
 -testcases_manager = create_test_cases(ManagerMixin, type='manager')
 -globals().update(testcases_manager)
 -
 -
 -class ThreadsMixin(object):
 -    TYPE = 'threads'
 -    Process = multiprocessing.dummy.Process
 -    locals().update(get_attributes(multiprocessing.dummy, (
 -        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
 -        'Condition', 'Event', 'Value', 'Array', 'current_process',
 -        'active_children', 'Pipe', 'connection', 'dict', 'list',
 -        'Namespace', 'JoinableQueue'
 -        )))
 -
 -testcases_threads = create_test_cases(ThreadsMixin, type='threads')
 -globals().update(testcases_threads)
 -
 -#
 -#
 -#
 -
 -def test_main(run=None):
 -    if run is None:
 -        from test.test_support import run_unittest as run
 -
 -    util.get_temp_dir()     # creates temp directory for use by all processes
 -    
 -    multiprocessing.get_logger().setLevel(LOG_LEVEL)
 -
 -    ProcessesMixin.pool = multiprocessing.Pool(4)
 -    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
 -    ManagerMixin.manager.__init__()
 -    ManagerMixin.manager.start()
 -    ManagerMixin.pool = ManagerMixin.manager.Pool(4)
 -
 -    testcases = (
 -        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
 -        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
 -        sorted(testcases_manager.values(), key=lambda tc:tc.__name__)
 -        )
 -
 -    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
 -    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
 -    run(suite)
 -
 -    ThreadsMixin.pool.terminate()
 -    ProcessesMixin.pool.terminate()
 -    ManagerMixin.pool.terminate()
 -    ManagerMixin.manager.shutdown()
 -    
 -    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
 -
 -def main():
 -    test_main(unittest.TextTestRunner(verbosity=2).run)
 -
 -if __name__ == '__main__':
 -    main()
 +# +# Unit tests for the multiprocessing package +# + +import unittest +import threading +import Queue +import time +import sys +import os +import gc +import signal +import array +import copy +import socket +import random +import logging + +import multiprocessing.dummy +import multiprocessing.connection +import multiprocessing.managers +import multiprocessing.heap +import multiprocessing.managers +import multiprocessing.pool +import _multiprocessing + +from multiprocessing import util + +# +# +# + +if sys.version_info >= (3, 0): +    def latin(s): +        return s.encode('latin') +else: +    latin = str + +try: +    bytes +except NameError: +    bytes = str +    def bytearray(seq): +        return array.array('c', seq) + +# +# Constants +# + +LOG_LEVEL = util.SUBWARNING +#LOG_LEVEL = logging.WARNING + +DELTA = 0.1 +CHECK_TIMINGS = False     # making true makes tests take a lot longer +                          # and can sometimes cause some non-serious +                          # failures because some calls block a bit +                          # longer than expected +if CHECK_TIMINGS: +    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 +else: +    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 + +HAVE_GETVALUE = not getattr(_multiprocessing, +                            'HAVE_BROKEN_SEM_GETVALUE', False) + +# +# Creates a wrapper for a function which records the time it takes to finish +# + +class TimingWrapper(object): + +    def __init__(self, func): +        self.func = func +        self.elapsed = None + +    def __call__(self, *args, **kwds): +        t = time.time() +        try: +            return self.func(*args, **kwds) +        finally: +            self.elapsed = time.time() - t + +# +# Base class for test cases +# + +class BaseTestCase(object): + +    ALLOWED_TYPES = ('processes', 'manager', 'threads') + +    def assertTimingAlmostEqual(self, a, b): +        if CHECK_TIMINGS: +            self.assertAlmostEqual(a, b, 1) + +    def assertReturnsIfImplemented(self, value, func, *args): +        try: +            res = func(*args) +        except NotImplementedError: +            pass +        else: +            return self.assertEqual(value, res) + +# +# Return the value of a semaphore +# + +def get_value(self): +    try: +        return self.get_value() +    except AttributeError: +        try: +            return self._Semaphore__value +        except AttributeError: +            try: +                return self._value +            except AttributeError: +                raise NotImplementedError + +# +# Testcases +# + +class _TestProcess(BaseTestCase): + +    ALLOWED_TYPES = ('processes', 'threads') + +    def test_current(self): +        if self.TYPE == 'threads': +            return + +        current = self.current_process() +        authkey = current.get_authkey() + +        self.assertTrue(current.is_alive()) +        self.assertTrue(not current.is_daemon()) +        self.assertTrue(isinstance(authkey, bytes)) +        self.assertTrue(len(authkey) > 0) +        self.assertEqual(current.get_ident(), os.getpid()) +        self.assertEqual(current.get_exitcode(), None) + +    def _test(self, q, *args, **kwds): +        current = self.current_process() +        q.put(args) +        q.put(kwds) +        q.put(current.get_name()) +        if self.TYPE != 'threads': +            q.put(bytes(current.get_authkey())) +            q.put(current.pid) + +    def test_process(self): +        q = self.Queue(1) +        e = self.Event() +        args = (q, 1, 2) +        kwargs = {'hello':23, 'bye':2.54} +        name = 'SomeProcess' +        p = self.Process( +            target=self._test, args=args, kwargs=kwargs, name=name +            ) +        p.set_daemon(True) +        current = self.current_process() + +        if self.TYPE != 'threads': +            self.assertEquals(p.get_authkey(), current.get_authkey()) +        self.assertEquals(p.is_alive(), False) +        self.assertEquals(p.is_daemon(), True) +        self.assertTrue(p not in self.active_children()) +        self.assertTrue(type(self.active_children()) is list) +        self.assertEqual(p.get_exitcode(), None) + +        p.start() + +        self.assertEquals(p.get_exitcode(), None) +        self.assertEquals(p.is_alive(), True) +        self.assertTrue(p in self.active_children()) + +        self.assertEquals(q.get(), args[1:]) +        self.assertEquals(q.get(), kwargs) +        self.assertEquals(q.get(), p.get_name()) +        if self.TYPE != 'threads': +            self.assertEquals(q.get(), current.get_authkey()) +            self.assertEquals(q.get(), p.pid) + +        p.join() + +        self.assertEquals(p.get_exitcode(), 0) +        self.assertEquals(p.is_alive(), False) +        self.assertTrue(p not in self.active_children()) + +    def _test_terminate(self): +        time.sleep(1000) + +    def test_terminate(self): +        if self.TYPE == 'threads': +            return + +        p = self.Process(target=self._test_terminate) +        p.set_daemon(True) +        p.start() + +        self.assertEqual(p.is_alive(), True) +        self.assertTrue(p in self.active_children()) +        self.assertEqual(p.get_exitcode(), None) + +        p.terminate() + +        join = TimingWrapper(p.join) +        self.assertEqual(join(), None) +        self.assertTimingAlmostEqual(join.elapsed, 0.0) + +        self.assertEqual(p.is_alive(), False) +        self.assertTrue(p not in self.active_children()) + +        p.join() + +        # XXX sometimes get p.get_exitcode() == 0 on Windows ... +        #self.assertEqual(p.get_exitcode(), -signal.SIGTERM) + +    def test_cpu_count(self): +        try: +            cpus = multiprocessing.cpu_count() +        except NotImplementedError: +            cpus = 1 +        self.assertTrue(type(cpus) is int) +        self.assertTrue(cpus >= 1) + +    def test_active_children(self): +        self.assertEqual(type(self.active_children()), list) + +        p = self.Process(target=time.sleep, args=(DELTA,)) +        self.assertTrue(p not in self.active_children()) + +        p.start() +        self.assertTrue(p in self.active_children()) + +        p.join() +        self.assertTrue(p not in self.active_children()) + +    def _test_recursion(self, wconn, id): +        from multiprocessing import forking +        wconn.send(id) +        if len(id) < 2: +            for i in range(2): +                p = self.Process( +                    target=self._test_recursion, args=(wconn, id+[i]) +                    ) +                p.start() +                p.join() + +    def test_recursion(self): +        rconn, wconn = self.Pipe(duplex=False) +        self._test_recursion(wconn, []) + +        time.sleep(DELTA) +        result = [] +        while rconn.poll(): +            result.append(rconn.recv()) + +        expected = [ +            [], +              [0], +                [0, 0], +                [0, 1], +              [1], +                [1, 0], +                [1, 1] +            ] +        self.assertEqual(result, expected) + +# +# +# + +class _UpperCaser(multiprocessing.Process): + +    def __init__(self): +        multiprocessing.Process.__init__(self) +        self.child_conn, self.parent_conn = multiprocessing.Pipe() + +    def run(self): +        self.parent_conn.close() +        for s in iter(self.child_conn.recv, None): +            self.child_conn.send(s.upper()) +        self.child_conn.close() + +    def submit(self, s): +        assert type(s) is str +        self.parent_conn.send(s) +        return self.parent_conn.recv() + +    def stop(self): +        self.parent_conn.send(None) +        self.parent_conn.close() +        self.child_conn.close() + +class _TestSubclassingProcess(BaseTestCase): + +    ALLOWED_TYPES = ('processes',) + +    def test_subclassing(self): +        uppercaser = _UpperCaser() +        uppercaser.start() +        self.assertEqual(uppercaser.submit('hello'), 'HELLO') +        self.assertEqual(uppercaser.submit('world'), 'WORLD') +        uppercaser.stop() +        uppercaser.join() + +# +# +# + +def queue_empty(q): +    if hasattr(q, 'empty'): +        return q.empty() +    else: +        return q.qsize() == 0 + +def queue_full(q, maxsize): +    if hasattr(q, 'full'): +        return q.full() +    else: +        return q.qsize() == maxsize + + +class _TestQueue(BaseTestCase): + + +    def _test_put(self, queue, child_can_start, parent_can_continue): +        child_can_start.wait() +        for i in range(6): +            queue.get() +        parent_can_continue.set() + +    def test_put(self): +        MAXSIZE = 6 +        queue = self.Queue(maxsize=MAXSIZE) +        child_can_start = self.Event() +        parent_can_continue = self.Event() + +        proc = self.Process( +            target=self._test_put, +            args=(queue, child_can_start, parent_can_continue) +            ) +        proc.set_daemon(True) +        proc.start() + +        self.assertEqual(queue_empty(queue), True) +        self.assertEqual(queue_full(queue, MAXSIZE), False) + +        queue.put(1) +        queue.put(2, True) +        queue.put(3, True, None) +        queue.put(4, False) +        queue.put(5, False, None) +        queue.put_nowait(6) + +        # the values may be in buffer but not yet in pipe so sleep a bit +        time.sleep(DELTA) + +        self.assertEqual(queue_empty(queue), False) +        self.assertEqual(queue_full(queue, MAXSIZE), True) + +        put = TimingWrapper(queue.put) +        put_nowait = TimingWrapper(queue.put_nowait) + +        self.assertRaises(Queue.Full, put, 7, False) +        self.assertTimingAlmostEqual(put.elapsed, 0) + +        self.assertRaises(Queue.Full, put, 7, False, None) +        self.assertTimingAlmostEqual(put.elapsed, 0) + +        self.assertRaises(Queue.Full, put_nowait, 7) +        self.assertTimingAlmostEqual(put_nowait.elapsed, 0) + +        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1) +        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) + +        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2) +        self.assertTimingAlmostEqual(put.elapsed, 0) + +        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3) +        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) + +        child_can_start.set() +        parent_can_continue.wait() + +        self.assertEqual(queue_empty(queue), True) +        self.assertEqual(queue_full(queue, MAXSIZE), False) + +        proc.join() + +    def _test_get(self, queue, child_can_start, parent_can_continue): +        child_can_start.wait() +        queue.put(1) +        queue.put(2) +        queue.put(3) +        queue.put(4) +        queue.put(5) +        parent_can_continue.set() + +    def test_get(self): +        queue = self.Queue() +        child_can_start = self.Event() +        parent_can_continue = self.Event() + +        proc = self.Process( +            target=self._test_get, +            args=(queue, child_can_start, parent_can_continue) +            ) +        proc.set_daemon(True) +        proc.start() + +        self.assertEqual(queue_empty(queue), True) + +        child_can_start.set() +        parent_can_continue.wait() + +        time.sleep(DELTA) +        self.assertEqual(queue_empty(queue), False) + +        self.assertEqual(queue.get(), 1) +        self.assertEqual(queue.get(True, None), 2) +        self.assertEqual(queue.get(True), 3) +        self.assertEqual(queue.get(timeout=1), 4) +        self.assertEqual(queue.get_nowait(), 5) + +        self.assertEqual(queue_empty(queue), True) + +        get = TimingWrapper(queue.get) +        get_nowait = TimingWrapper(queue.get_nowait) + +        self.assertRaises(Queue.Empty, get, False) +        self.assertTimingAlmostEqual(get.elapsed, 0) + +        self.assertRaises(Queue.Empty, get, False, None) +        self.assertTimingAlmostEqual(get.elapsed, 0) + +        self.assertRaises(Queue.Empty, get_nowait) +        self.assertTimingAlmostEqual(get_nowait.elapsed, 0) + +        self.assertRaises(Queue.Empty, get, True, TIMEOUT1) +        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) + +        self.assertRaises(Queue.Empty, get, False, TIMEOUT2) +        self.assertTimingAlmostEqual(get.elapsed, 0) + +        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3) +        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) + +        proc.join() + +    def _test_fork(self, queue): +        for i in range(10, 20): +            queue.put(i) +        # note that at this point the items may only be buffered, so the +        # process cannot shutdown until the feeder thread has finished +        # pushing items onto the pipe. + +    def test_fork(self): +        # Old versions of Queue would fail to create a new feeder +        # thread for a forked process if the original process had its +        # own feeder thread.  This test checks that this no longer +        # happens. + +        queue = self.Queue() + +        # put items on queue so that main process starts a feeder thread +        for i in range(10): +            queue.put(i) + +        # wait to make sure thread starts before we fork a new process +        time.sleep(DELTA) + +        # fork process +        p = self.Process(target=self._test_fork, args=(queue,)) +        p.start() + +        # check that all expected items are in the queue +        for i in range(20): +            self.assertEqual(queue.get(), i) +        self.assertRaises(Queue.Empty, queue.get, False) + +        p.join() + +    def test_qsize(self): +        q = self.Queue() +        try: +            self.assertEqual(q.qsize(), 0) +        except NotImplementedError: +            return +        q.put(1) +        self.assertEqual(q.qsize(), 1) +        q.put(5) +        self.assertEqual(q.qsize(), 2) +        q.get() +        self.assertEqual(q.qsize(), 1) +        q.get() +        self.assertEqual(q.qsize(), 0) + +    def _test_task_done(self, q): +        for obj in iter(q.get, None): +            time.sleep(DELTA) +            q.task_done() + +    def test_task_done(self): +        queue = self.JoinableQueue() + +        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'): +            return + +        workers = [self.Process(target=self._test_task_done, args=(queue,)) +                   for i in xrange(4)] + +        for p in workers: +            p.start() + +        for i in xrange(10): +            queue.put(i) + +        queue.join() + +        for p in workers: +            queue.put(None) + +        for p in workers: +            p.join() + +# +# +# + +class _TestLock(BaseTestCase): + +    def test_lock(self): +        lock = self.Lock() +        self.assertEqual(lock.acquire(), True) +        self.assertEqual(lock.acquire(False), False) +        self.assertEqual(lock.release(), None) +        self.assertRaises((ValueError, threading.ThreadError), lock.release) + +    def test_rlock(self): +        lock = self.RLock() +        self.assertEqual(lock.acquire(), True) +        self.assertEqual(lock.acquire(), True) +        self.assertEqual(lock.acquire(), True) +        self.assertEqual(lock.release(), None) +        self.assertEqual(lock.release(), None) +        self.assertEqual(lock.release(), None) +        self.assertRaises((AssertionError, RuntimeError), lock.release) + + +class _TestSemaphore(BaseTestCase): + +    def _test_semaphore(self, sem): +        self.assertReturnsIfImplemented(2, get_value, sem) +        self.assertEqual(sem.acquire(), True) +        self.assertReturnsIfImplemented(1, get_value, sem) +        self.assertEqual(sem.acquire(), True) +        self.assertReturnsIfImplemented(0, get_value, sem) +        self.assertEqual(sem.acquire(False), False) +        self.assertReturnsIfImplemented(0, get_value, sem) +        self.assertEqual(sem.release(), None) +        self.assertReturnsIfImplemented(1, get_value, sem) +        self.assertEqual(sem.release(), None) +        self.assertReturnsIfImplemented(2, get_value, sem) + +    def test_semaphore(self): +        sem = self.Semaphore(2) +        self._test_semaphore(sem) +        self.assertEqual(sem.release(), None) +        self.assertReturnsIfImplemented(3, get_value, sem) +        self.assertEqual(sem.release(), None) +        self.assertReturnsIfImplemented(4, get_value, sem) + +    def test_bounded_semaphore(self): +        sem = self.BoundedSemaphore(2) +        self._test_semaphore(sem) +        # Currently fails on OS/X +        #if HAVE_GETVALUE: +        #    self.assertRaises(ValueError, sem.release) +        #    self.assertReturnsIfImplemented(2, get_value, sem) + +    def test_timeout(self): +        if self.TYPE != 'processes': +            return + +        sem = self.Semaphore(0) +        acquire = TimingWrapper(sem.acquire) + +        self.assertEqual(acquire(False), False) +        self.assertTimingAlmostEqual(acquire.elapsed, 0.0) + +        self.assertEqual(acquire(False, None), False) +        self.assertTimingAlmostEqual(acquire.elapsed, 0.0) + +        self.assertEqual(acquire(False, TIMEOUT1), False) +        self.assertTimingAlmostEqual(acquire.elapsed, 0) + +        self.assertEqual(acquire(True, TIMEOUT2), False) +        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) + +        self.assertEqual(acquire(timeout=TIMEOUT3), False) +        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) + + +class _TestCondition(BaseTestCase): + +    def f(self, cond, sleeping, woken, timeout=None): +        cond.acquire() +        sleeping.release() +        cond.wait(timeout) +        woken.release() +        cond.release() + +    def check_invariant(self, cond): +        # this is only supposed to succeed when there are no sleepers +        if self.TYPE == 'processes': +            try: +                sleepers = (cond._sleeping_count.get_value() - +                            cond._woken_count.get_value()) +                self.assertEqual(sleepers, 0) +                self.assertEqual(cond._wait_semaphore.get_value(), 0) +            except NotImplementedError: +                pass + +    def test_notify(self): +        cond = self.Condition() +        sleeping = self.Semaphore(0) +        woken = self.Semaphore(0) + +        p = self.Process(target=self.f, args=(cond, sleeping, woken)) +        p.set_daemon(True) +        p.start() + +        p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) +        p.set_daemon(True) +        p.start() + +        # wait for both children to start sleeping +        sleeping.acquire() +        sleeping.acquire() + +        # check no process/thread has woken up +        time.sleep(DELTA) +        self.assertReturnsIfImplemented(0, get_value, woken) + +        # wake up one process/thread +        cond.acquire() +        cond.notify() +        cond.release() + +        # check one process/thread has woken up +        time.sleep(DELTA) +        self.assertReturnsIfImplemented(1, get_value, woken) + +        # wake up another +        cond.acquire() +        cond.notify() +        cond.release() + +        # check other has woken up +        time.sleep(DELTA) +        self.assertReturnsIfImplemented(2, get_value, woken) + +        # check state is not mucked up +        self.check_invariant(cond) +        p.join() + +    def test_notify_all(self): +        cond = self.Condition() +        sleeping = self.Semaphore(0) +        woken = self.Semaphore(0) + +        # start some threads/processes which will timeout +        for i in range(3): +            p = self.Process(target=self.f, +                             args=(cond, sleeping, woken, TIMEOUT1)) +            p.set_daemon(True) +            p.start() + +            t = threading.Thread(target=self.f, +                                 args=(cond, sleeping, woken, TIMEOUT1)) +            t.set_daemon(True) +            t.start() + +        # wait for them all to sleep +        for i in xrange(6): +            sleeping.acquire() + +        # check they have all timed out +        for i in xrange(6): +            woken.acquire() +        self.assertReturnsIfImplemented(0, get_value, woken) + +        # check state is not mucked up +        self.check_invariant(cond) + +        # start some more threads/processes +        for i in range(3): +            p = self.Process(target=self.f, args=(cond, sleeping, woken)) +            p.set_daemon(True) +            p.start() + +            t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) +            t.set_daemon(True) +            t.start() + +        # wait for them to all sleep +        for i in xrange(6): +            sleeping.acquire() + +        # check no process/thread has woken up +        time.sleep(DELTA) +        self.assertReturnsIfImplemented(0, get_value, woken) + +        # wake them all up +        cond.acquire() +        cond.notify_all() +        cond.release() + +        # check they have all woken +        time.sleep(DELTA) +        self.assertReturnsIfImplemented(6, get_value, woken) + +        # check state is not mucked up +        self.check_invariant(cond) + +    def test_timeout(self): +        cond = self.Condition() +        wait = TimingWrapper(cond.wait) +        cond.acquire() +        res = wait(TIMEOUT1) +        cond.release() +        self.assertEqual(res, None) +        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) + + +class _TestEvent(BaseTestCase): + +    def _test_event(self, event): +        time.sleep(TIMEOUT2) +        event.set() + +    def test_event(self): +        event = self.Event() +        wait = TimingWrapper(event.wait) + +        # Removed temporaily, due to API shear, this does not +        # work with threading._Event objects. is_set == isSet +        #self.assertEqual(event.is_set(), False) + +        self.assertEqual(wait(0.0), None) +        self.assertTimingAlmostEqual(wait.elapsed, 0.0) +        self.assertEqual(wait(TIMEOUT1), None) +        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) + +        event.set() + +        # See note above on the API differences +        # self.assertEqual(event.is_set(), True) +        self.assertEqual(wait(), None) +        self.assertTimingAlmostEqual(wait.elapsed, 0.0) +        self.assertEqual(wait(TIMEOUT1), None) +        self.assertTimingAlmostEqual(wait.elapsed, 0.0) +        # self.assertEqual(event.is_set(), True) + +        event.clear() + +        #self.assertEqual(event.is_set(), False) + +        self.Process(target=self._test_event, args=(event,)).start() +        self.assertEqual(wait(), None) + +# +# +# + +class _TestValue(BaseTestCase): + +    codes_values = [ +        ('i', 4343, 24234), +        ('d', 3.625, -4.25), +        ('h', -232, 234), +        ('c', latin('x'), latin('y')) +        ] + +    def _test(self, values): +        for sv, cv in zip(values, self.codes_values): +            sv.value = cv[2] + + +    def test_value(self, raw=False): +        if self.TYPE != 'processes': +            return + +        if raw: +            values = [self.RawValue(code, value) +                      for code, value, _ in self.codes_values] +        else: +            values = [self.Value(code, value) +                      for code, value, _ in self.codes_values] + +        for sv, cv in zip(values, self.codes_values): +            self.assertEqual(sv.value, cv[1]) + +        proc = self.Process(target=self._test, args=(values,)) +        proc.start() +        proc.join() + +        for sv, cv in zip(values, self.codes_values): +            self.assertEqual(sv.value, cv[2]) + +    def test_rawvalue(self): +        self.test_value(raw=True) + +    def test_getobj_getlock(self): +        if self.TYPE != 'processes': +            return + +        val1 = self.Value('i', 5) +        lock1 = val1.get_lock() +        obj1 = val1.get_obj() + +        val2 = self.Value('i', 5, lock=None) +        lock2 = val2.get_lock() +        obj2 = val2.get_obj() + +        lock = self.Lock() +        val3 = self.Value('i', 5, lock=lock) +        lock3 = val3.get_lock() +        obj3 = val3.get_obj() +        self.assertEqual(lock, lock3) + +        arr4 = self.RawValue('i', 5) +        self.assertFalse(hasattr(arr4, 'get_lock')) +        self.assertFalse(hasattr(arr4, 'get_obj')) + + +class _TestArray(BaseTestCase): + +    def f(self, seq): +        for i in range(1, len(seq)): +            seq[i] += seq[i-1] + +    def test_array(self, raw=False): +        if self.TYPE != 'processes': +            return + +        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] +        if raw: +            arr = self.RawArray('i', seq) +        else: +            arr = self.Array('i', seq) + +        self.assertEqual(len(arr), len(seq)) +        self.assertEqual(arr[3], seq[3]) +        self.assertEqual(list(arr[2:7]), list(seq[2:7])) + +        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) + +        self.assertEqual(list(arr[:]), seq) + +        self.f(seq) + +        p = self.Process(target=self.f, args=(arr,)) +        p.start() +        p.join() + +        self.assertEqual(list(arr[:]), seq) + +    def test_rawarray(self): +        self.test_array(raw=True) + +    def test_getobj_getlock_obj(self): +        if self.TYPE != 'processes': +            return + +        arr1 = self.Array('i', range(10)) +        lock1 = arr1.get_lock() +        obj1 = arr1.get_obj() + +        arr2 = self.Array('i', range(10), lock=None) +        lock2 = arr2.get_lock() +        obj2 = arr2.get_obj() + +        lock = self.Lock() +        arr3 = self.Array('i', range(10), lock=lock) +        lock3 = arr3.get_lock() +        obj3 = arr3.get_obj() +        self.assertEqual(lock, lock3) + +        arr4 = self.RawArray('i', range(10)) +        self.assertFalse(hasattr(arr4, 'get_lock')) +        self.assertFalse(hasattr(arr4, 'get_obj')) + +# +# +# + +class _TestContainers(BaseTestCase): + +    ALLOWED_TYPES = ('manager',) + +    def test_list(self): +        a = self.list(range(10)) +        self.assertEqual(a[:], range(10)) + +        b = self.list() +        self.assertEqual(b[:], []) + +        b.extend(range(5)) +        self.assertEqual(b[:], range(5)) + +        self.assertEqual(b[2], 2) +        self.assertEqual(b[2:10], [2,3,4]) + +        b *= 2 +        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) + +        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) + +        self.assertEqual(a[:], range(10)) + +        d = [a, b] +        e = self.list(d) +        self.assertEqual( +            e[:], +            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] +            ) + +        f = self.list([a]) +        a.append('hello') +        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']]) + +    def test_dict(self): +        d = self.dict() +        indices = range(65, 70) +        for i in indices: +            d[i] = chr(i) +        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) +        self.assertEqual(sorted(d.keys()), indices) +        self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) +        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) + +    def test_namespace(self): +        n = self.Namespace() +        n.name = 'Bob' +        n.job = 'Builder' +        n._hidden = 'hidden' +        self.assertEqual((n.name, n.job), ('Bob', 'Builder')) +        del n.job +        self.assertEqual(str(n), "Namespace(name='Bob')") +        self.assertTrue(hasattr(n, 'name')) +        self.assertTrue(not hasattr(n, 'job')) + +# +# +# + +def sqr(x, wait=0.0): +    time.sleep(wait) +    return x*x + +class _TestPool(BaseTestCase): + +    def test_apply(self): +        papply = self.pool.apply +        self.assertEqual(papply(sqr, (5,)), sqr(5)) +        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) + +    def test_map(self): +        pmap = self.pool.map +        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10))) +        self.assertEqual(pmap(sqr, range(100), chunksize=20), +                         map(sqr, range(100))) + +    def test_async(self): +        res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) +        get = TimingWrapper(res.get) +        self.assertEqual(get(), 49) +        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) + +    def test_async_timeout(self): +        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2)) +        get = TimingWrapper(res.get) +        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) +        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) + +    def test_imap(self): +        it = self.pool.imap(sqr, range(10)) +        self.assertEqual(list(it), map(sqr, range(10))) + +        it = self.pool.imap(sqr, range(10)) +        for i in range(10): +            self.assertEqual(it.next(), i*i) +        self.assertRaises(StopIteration, it.next) + +        it = self.pool.imap(sqr, range(1000), chunksize=100) +        for i in range(1000): +            self.assertEqual(it.next(), i*i) +        self.assertRaises(StopIteration, it.next) + +    def test_imap_unordered(self): +        it = self.pool.imap_unordered(sqr, range(1000)) +        self.assertEqual(sorted(it), map(sqr, range(1000))) + +        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53) +        self.assertEqual(sorted(it), map(sqr, range(1000))) + +    def test_make_pool(self): +        p = multiprocessing.Pool(3) +        self.assertEqual(3, len(p._pool)) +        p.close() +        p.join() + +    def test_terminate(self): +        if self.TYPE == 'manager': +            # On Unix a forked process increfs each shared object to +            # which its parent process held a reference.  If the +            # forked process gets terminated then there is likely to +            # be a reference leak.  So to prevent +            # _TestZZZNumberOfObjects from failing we skip this test +            # when using a manager. +            return + +        result = self.pool.map_async( +            time.sleep, [0.1 for i in range(10000)], chunksize=1 +            ) +        self.pool.terminate() +        join = TimingWrapper(self.pool.join) +        join() +        self.assertTrue(join.elapsed < 0.2) + +# +# Test that manager has expected number of shared objects left +# + +class _TestZZZNumberOfObjects(BaseTestCase): +    # Because test cases are sorted alphabetically, this one will get +    # run after all the other tests for the manager.  It tests that +    # there have been no "reference leaks" for the manager's shared +    # objects.  Note the comment in _TestPool.test_terminate(). +    ALLOWED_TYPES = ('manager',) + +    def test_number_of_objects(self): +        EXPECTED_NUMBER = 1                # the pool object is still alive +        multiprocessing.active_children()  # discard dead process objs +        gc.collect()                       # do garbage collection +        refs = self.manager._number_of_objects() +        if refs != EXPECTED_NUMBER: +            print self.manager._debugInfo() + +        self.assertEqual(refs, EXPECTED_NUMBER) + +# +# Test of creating a customized manager class +# + +from multiprocessing.managers import BaseManager, BaseProxy, RemoteError + +class FooBar(object): +    def f(self): +        return 'f()' +    def g(self): +        raise ValueError +    def _h(self): +        return '_h()' + +def baz(): +    for i in xrange(10): +        yield i*i + +class IteratorProxy(BaseProxy): +    _exposed_ = ('next', '__next__') +    def __iter__(self): +        return self +    def next(self): +        return self._callmethod('next') +    def __next__(self): +        return self._callmethod('__next__') + +class MyManager(BaseManager): +    pass + +MyManager.register('Foo', callable=FooBar) +MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) +MyManager.register('baz', callable=baz, proxytype=IteratorProxy) + + +class _TestMyManager(BaseTestCase): + +    ALLOWED_TYPES = ('manager',) + +    def test_mymanager(self): +        manager = MyManager() +        manager.start() + +        foo = manager.Foo() +        bar = manager.Bar() +        baz = manager.baz() + +        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] +        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] + +        self.assertEqual(foo_methods, ['f', 'g']) +        self.assertEqual(bar_methods, ['f', '_h']) + +        self.assertEqual(foo.f(), 'f()') +        self.assertRaises(ValueError, foo.g) +        self.assertEqual(foo._callmethod('f'), 'f()') +        self.assertRaises(RemoteError, foo._callmethod, '_h') + +        self.assertEqual(bar.f(), 'f()') +        self.assertEqual(bar._h(), '_h()') +        self.assertEqual(bar._callmethod('f'), 'f()') +        self.assertEqual(bar._callmethod('_h'), '_h()') + +        self.assertEqual(list(baz), [i*i for i in range(10)]) + +        manager.shutdown() + +# +# Test of connecting to a remote server and using xmlrpclib for serialization +# + +_queue = Queue.Queue() +def get_queue(): +    return _queue + +class QueueManager(BaseManager): +    '''manager class used by server process''' +QueueManager.register('get_queue', callable=get_queue) + +class QueueManager2(BaseManager): +    '''manager class which specifies the same interface as QueueManager''' +QueueManager2.register('get_queue') + + +SERIALIZER = 'xmlrpclib' + +class _TestRemoteManager(BaseTestCase): + +    ALLOWED_TYPES = ('manager',) + +    def _putter(self, address, authkey): +        manager = QueueManager2( +            address=address, authkey=authkey, serializer=SERIALIZER +            ) +        manager.connect() +        queue = manager.get_queue() +        queue.put(('hello world', None, True, 2.25)) + +    def test_remote(self): +        authkey = os.urandom(32) + +        manager = QueueManager( +            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER +            ) +        manager.start() + +        p = self.Process(target=self._putter, args=(manager.address, authkey)) +        p.start() + +        manager2 = QueueManager2( +            address=manager.address, authkey=authkey, serializer=SERIALIZER +            ) +        manager2.connect() +        queue = manager2.get_queue() + +        # Note that xmlrpclib will deserialize object as a list not a tuple +        self.assertEqual(queue.get(), ['hello world', None, True, 2.25]) + +        # Because we are using xmlrpclib for serialization instead of +        # pickle this will cause a serialization error. +        self.assertRaises(Exception, queue.put, time.sleep) + +        # Make queue finalizer run before the server is stopped +        del queue +        manager.shutdown() + +# +# +# + +SENTINEL = latin('') + +class _TestConnection(BaseTestCase): + +    ALLOWED_TYPES = ('processes', 'threads') + +    def _echo(self, conn): +        for msg in iter(conn.recv_bytes, SENTINEL): +            conn.send_bytes(msg) +        conn.close() + +    def test_connection(self): +        conn, child_conn = self.Pipe() + +        p = self.Process(target=self._echo, args=(child_conn,)) +        p.set_daemon(True) +        p.start() + +        seq = [1, 2.25, None] +        msg = latin('hello world') +        longmsg = msg * 10 +        arr = array.array('i', range(4)) + +        if self.TYPE == 'processes': +            self.assertEqual(type(conn.fileno()), int) + +        self.assertEqual(conn.send(seq), None) +        self.assertEqual(conn.recv(), seq) + +        self.assertEqual(conn.send_bytes(msg), None) +        self.assertEqual(conn.recv_bytes(), msg) + +        if self.TYPE == 'processes': +            buffer = array.array('i', [0]*10) +            expected = list(arr) + [0] * (10 - len(arr)) +            self.assertEqual(conn.send_bytes(arr), None) +            self.assertEqual(conn.recv_bytes_into(buffer), +                             len(arr) * buffer.itemsize) +            self.assertEqual(list(buffer), expected) + +            buffer = array.array('i', [0]*10) +            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) +            self.assertEqual(conn.send_bytes(arr), None) +            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), +                             len(arr) * buffer.itemsize) +            self.assertEqual(list(buffer), expected) + +            buffer = bytearray(latin(' ' * 40)) +            self.assertEqual(conn.send_bytes(longmsg), None) +            try: +                res = conn.recv_bytes_into(buffer) +            except multiprocessing.BufferTooShort, e: +                self.assertEqual(e.args, (longmsg,)) +            else: +                self.fail('expected BufferTooShort, got %s' % res) + +        poll = TimingWrapper(conn.poll) + +        self.assertEqual(poll(), False) +        self.assertTimingAlmostEqual(poll.elapsed, 0) + +        self.assertEqual(poll(TIMEOUT1), False) +        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) + +        conn.send(None) + +        self.assertEqual(poll(TIMEOUT1), True) +        self.assertTimingAlmostEqual(poll.elapsed, 0) + +        self.assertEqual(conn.recv(), None) + +        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb +        conn.send_bytes(really_big_msg) +        self.assertEqual(conn.recv_bytes(), really_big_msg) + +        conn.send_bytes(SENTINEL)                          # tell child to quit +        child_conn.close() + +        if self.TYPE == 'processes': +            self.assertEqual(conn.readable, True) +            self.assertEqual(conn.writable, True) +            self.assertRaises(EOFError, conn.recv) +            self.assertRaises(EOFError, conn.recv_bytes) + +        p.join() + +    def test_duplex_false(self): +        reader, writer = self.Pipe(duplex=False) +        self.assertEqual(writer.send(1), None) +        self.assertEqual(reader.recv(), 1) +        if self.TYPE == 'processes': +            self.assertEqual(reader.readable, True) +            self.assertEqual(reader.writable, False) +            self.assertEqual(writer.readable, False) +            self.assertEqual(writer.writable, True) +            self.assertRaises(IOError, reader.send, 2) +            self.assertRaises(IOError, writer.recv) +            self.assertRaises(IOError, writer.poll) + +    def test_spawn_close(self): +        # We test that a pipe connection can be closed by parent +        # process immediately after child is spawned.  On Windows this +        # would have sometimes failed on old versions because +        # child_conn would be closed before the child got a chance to +        # duplicate it. +        conn, child_conn = self.Pipe() + +        p = self.Process(target=self._echo, args=(child_conn,)) +        p.start() +        child_conn.close()    # this might complete before child initializes + +        msg = latin('hello') +        conn.send_bytes(msg) +        self.assertEqual(conn.recv_bytes(), msg) + +        conn.send_bytes(SENTINEL) +        conn.close() +        p.join() + +    def test_sendbytes(self): +        if self.TYPE != 'processes': +            return + +        msg = latin('abcdefghijklmnopqrstuvwxyz') +        a, b = self.Pipe() + +        a.send_bytes(msg) +        self.assertEqual(b.recv_bytes(), msg) + +        a.send_bytes(msg, 5) +        self.assertEqual(b.recv_bytes(), msg[5:]) + +        a.send_bytes(msg, 7, 8) +        self.assertEqual(b.recv_bytes(), msg[7:7+8]) + +        a.send_bytes(msg, 26) +        self.assertEqual(b.recv_bytes(), latin('')) + +        a.send_bytes(msg, 26, 0) +        self.assertEqual(b.recv_bytes(), latin('')) + +        self.assertRaises(ValueError, a.send_bytes, msg, 27) + +        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) + +        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) + +        self.assertRaises(ValueError, a.send_bytes, msg, -1) + +        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) + + +class _TestListenerClient(BaseTestCase): + +    ALLOWED_TYPES = ('processes', 'threads') + +    def _test(self, address): +        conn = self.connection.Client(address) +        conn.send('hello') +        conn.close() + +    def test_listener_client(self): +        for family in self.connection.families: +            l = self.connection.Listener(family=family) +            p = self.Process(target=self._test, args=(l.address,)) +            p.set_daemon(True) +            p.start() +            conn = l.accept() +            self.assertEqual(conn.recv(), 'hello') +            p.join() +            l.close() + +# +# Test of sending connection and socket objects between processes +# + +class _TestPicklingConnections(BaseTestCase): + +    ALLOWED_TYPES = ('processes',) + +    def _listener(self, conn, families): +        for fam in families: +            l = self.connection.Listener(family=fam) +            conn.send(l.address) +            new_conn = l.accept() +            conn.send(new_conn) + +        if self.TYPE == 'processes': +            l = socket.socket() +            l.bind(('localhost', 0)) +            conn.send(l.getsockname()) +            l.listen(1) +            new_conn, addr = l.accept() +            conn.send(new_conn) + +        conn.recv() + +    def _remote(self, conn): +        for (address, msg) in iter(conn.recv, None): +            client = self.connection.Client(address) +            client.send(msg.upper()) +            client.close() + +        if self.TYPE == 'processes': +            address, msg = conn.recv() +            client = socket.socket() +            client.connect(address) +            client.sendall(msg.upper()) +            client.close() + +        conn.close() + +    def test_pickling(self): +        try: +            multiprocessing.allow_connection_pickling() +        except ImportError: +            return + +        families = self.connection.families + +        lconn, lconn0 = self.Pipe() +        lp = self.Process(target=self._listener, args=(lconn0, families)) +        lp.start() +        lconn0.close() + +        rconn, rconn0 = self.Pipe() +        rp = self.Process(target=self._remote, args=(rconn0,)) +        rp.start() +        rconn0.close() + +        for fam in families: +            msg = ('This connection uses family %s' % fam).encode('ascii') +            address = lconn.recv() +            rconn.send((address, msg)) +            new_conn = lconn.recv() +            self.assertEqual(new_conn.recv(), msg.upper()) + +        rconn.send(None) + +        if self.TYPE == 'processes': +            msg = latin('This connection uses a normal socket') +            address = lconn.recv() +            rconn.send((address, msg)) +            if hasattr(socket, 'fromfd'): +                new_conn = lconn.recv() +                self.assertEqual(new_conn.recv(100), msg.upper()) +            else: +                # XXX On Windows with Py2.6 need to backport fromfd() +                discard = lconn.recv_bytes() + +        lconn.send(None) + +        rconn.close() +        lconn.close() + +        lp.join() +        rp.join() + +# +# +# + +class _TestHeap(BaseTestCase): + +    ALLOWED_TYPES = ('processes',) + +    def test_heap(self): +        iterations = 5000 +        maxblocks = 50 +        blocks = [] + +        # create and destroy lots of blocks of different sizes +        for i in xrange(iterations): +            size = int(random.lognormvariate(0, 1) * 1000) +            b = multiprocessing.heap.BufferWrapper(size) +            blocks.append(b) +            if len(blocks) > maxblocks: +                i = random.randrange(maxblocks) +                del blocks[i] + +        # get the heap object +        heap = multiprocessing.heap.BufferWrapper._heap + +        # verify the state of the heap +        all = [] +        occupied = 0 +        for L in heap._len_to_seq.values(): +            for arena, start, stop in L: +                all.append((heap._arenas.index(arena), start, stop, +                            stop-start, 'free')) +        for arena, start, stop in heap._allocated_blocks: +            all.append((heap._arenas.index(arena), start, stop, +                        stop-start, 'occupied')) +            occupied += (stop-start) + +        all.sort() + +        for i in range(len(all)-1): +            (arena, start, stop) = all[i][:3] +            (narena, nstart, nstop) = all[i+1][:3] +            self.assertTrue((arena != narena and nstart == 0) or +                            (stop == nstart)) + +# +# +# + +try: +    from ctypes import Structure, Value, copy, c_int, c_double +except ImportError: +    Structure = object +    c_int = c_double = None + +class _Foo(Structure): +    _fields_ = [ +        ('x', c_int), +        ('y', c_double) +        ] + +class _TestSharedCTypes(BaseTestCase): + +    ALLOWED_TYPES = ('processes',) + +    def _double(self, x, y, foo, arr, string): +        x.value *= 2 +        y.value *= 2 +        foo.x *= 2 +        foo.y *= 2 +        string.value *= 2 +        for i in range(len(arr)): +            arr[i] *= 2 + +    def test_sharedctypes(self, lock=False): +        if c_int is None: +            return + +        x = Value('i', 7, lock=lock) +        y = Value(ctypes.c_double, 1.0/3.0, lock=lock) +        foo = Value(_Foo, 3, 2, lock=lock) +        arr = Array('d', range(10), lock=lock) +        string = Array('c', 20, lock=lock) +        string.value = 'hello' + +        p = self.Process(target=self._double, args=(x, y, foo, arr, string)) +        p.start() +        p.join() + +        self.assertEqual(x.value, 14) +        self.assertAlmostEqual(y.value, 2.0/3.0) +        self.assertEqual(foo.x, 6) +        self.assertAlmostEqual(foo.y, 4.0) +        for i in range(10): +            self.assertAlmostEqual(arr[i], i*2) +        self.assertEqual(string.value, latin('hellohello')) + +    def test_synchronize(self): +        self.test_sharedctypes(lock=True) + +    def test_copy(self): +        if c_int is None: +            return + +        foo = _Foo(2, 5.0) +        bar = copy(foo) +        foo.x = 0 +        foo.y = 0 +        self.assertEqual(bar.x, 2) +        self.assertAlmostEqual(bar.y, 5.0) + +# +# +# + +class _TestFinalize(BaseTestCase): + +    ALLOWED_TYPES = ('processes',) + +    def _test_finalize(self, conn): +        class Foo(object): +            pass + +        a = Foo() +        util.Finalize(a, conn.send, args=('a',)) +        del a           # triggers callback for a + +        b = Foo() +        close_b = util.Finalize(b, conn.send, args=('b',)) +        close_b()       # triggers callback for b +        close_b()       # does nothing because callback has already been called +        del b           # does nothing because callback has already been called + +        c = Foo() +        util.Finalize(c, conn.send, args=('c',)) + +        d10 = Foo() +        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) + +        d01 = Foo() +        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) +        d02 = Foo() +        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) +        d03 = Foo() +        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) + +        util.Finalize(None, conn.send, args=('e',), exitpriority=-10) + +        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) + +        # call mutliprocessing's cleanup function then exit process without +        # garbage collecting locals +        util._exit_function() +        conn.close() +        os._exit(0) + +    def test_finalize(self): +        conn, child_conn = self.Pipe() + +        p = self.Process(target=self._test_finalize, args=(child_conn,)) +        p.start() +        p.join() + +        result = [obj for obj in iter(conn.recv, 'STOP')] +        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) + +# +# Test that from ... import * works for each module +# + +class _TestImportStar(BaseTestCase): + +    ALLOWED_TYPES = ('processes',) + +    def test_import(self): +        modules = ( +            'multiprocessing', 'multiprocessing.connection', +            'multiprocessing.heap', 'multiprocessing.managers', +            'multiprocessing.pool', 'multiprocessing.process', +            'multiprocessing.reduction', 'multiprocessing.sharedctypes', +            'multiprocessing.synchronize', 'multiprocessing.util' +            ) + +        for name in modules: +            __import__(name) +            mod = sys.modules[name] + +            for attr in getattr(mod, '__all__', ()): +                self.assertTrue( +                    hasattr(mod, attr), +                    '%r does not have attribute %r' % (mod, attr) +                    ) + +# +# Quick test that logging works -- does not test logging output +# + +class _TestLogging(BaseTestCase): + +    ALLOWED_TYPES = ('processes',) + +    def test_enable_logging(self): +        logger = multiprocessing.get_logger() +        logger.setLevel(util.SUBWARNING) +        self.assertTrue(logger is not None) +        logger.debug('this will not be printed') +        logger.info('nor will this') +        logger.setLevel(LOG_LEVEL) + +    def _test_level(self, conn): +        logger = multiprocessing.get_logger() +        conn.send(logger.getEffectiveLevel()) + +    def test_level(self): +        LEVEL1 = 32 +        LEVEL2 = 37 + +        logger = multiprocessing.get_logger() +        root_logger = logging.getLogger() +        root_level = root_logger.level + +        reader, writer = multiprocessing.Pipe(duplex=False) + +        logger.setLevel(LEVEL1) +        self.Process(target=self._test_level, args=(writer,)).start() +        self.assertEqual(LEVEL1, reader.recv()) + +        logger.setLevel(logging.NOTSET) +        root_logger.setLevel(LEVEL2) +        self.Process(target=self._test_level, args=(writer,)).start() +        self.assertEqual(LEVEL2, reader.recv()) + +        root_logger.setLevel(root_level) +        logger.setLevel(level=LOG_LEVEL) + +# +# Functions used to create test cases from the base ones in this module +# + +def get_attributes(Source, names): +    d = {} +    for name in names: +        obj = getattr(Source, name) +        if type(obj) == type(get_attributes): +            obj = staticmethod(obj) +        d[name] = obj +    return d + +def create_test_cases(Mixin, type): +    result = {} +    glob = globals() +    Type = type[0].upper() + type[1:] + +    for name in glob.keys(): +        if name.startswith('_Test'): +            base = glob[name] +            if type in base.ALLOWED_TYPES: +                newname = 'With' + Type + name[1:] +                class Temp(base, unittest.TestCase, Mixin): +                    pass +                result[newname] = Temp +                Temp.__name__ = newname +                Temp.__module__ = Mixin.__module__ +    return result + +# +# Create test cases +# + +class ProcessesMixin(object): +    TYPE = 'processes' +    Process = multiprocessing.Process +    locals().update(get_attributes(multiprocessing, ( +        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', +        'Condition', 'Event', 'Value', 'Array', 'RawValue', +        'RawArray', 'current_process', 'active_children', 'Pipe', +        'connection', 'JoinableQueue' +        ))) + +testcases_processes = create_test_cases(ProcessesMixin, type='processes') +globals().update(testcases_processes) + + +class ManagerMixin(object): +    TYPE = 'manager' +    Process = multiprocessing.Process +    manager = object.__new__(multiprocessing.managers.SyncManager) +    locals().update(get_attributes(manager, ( +        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', +       'Condition', 'Event', 'Value', 'Array', 'list', 'dict', +        'Namespace', 'JoinableQueue' +        ))) + +testcases_manager = create_test_cases(ManagerMixin, type='manager') +globals().update(testcases_manager) + + +class ThreadsMixin(object): +    TYPE = 'threads' +    Process = multiprocessing.dummy.Process +    locals().update(get_attributes(multiprocessing.dummy, ( +        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', +        'Condition', 'Event', 'Value', 'Array', 'current_process', +        'active_children', 'Pipe', 'connection', 'dict', 'list', +        'Namespace', 'JoinableQueue' +        ))) + +testcases_threads = create_test_cases(ThreadsMixin, type='threads') +globals().update(testcases_threads) + +# +# +# + +def test_main(run=None): +    if run is None: +        from test.test_support import run_unittest as run + +    util.get_temp_dir()     # creates temp directory for use by all processes + +    multiprocessing.get_logger().setLevel(LOG_LEVEL) + +    ProcessesMixin.pool = multiprocessing.Pool(4) +    ThreadsMixin.pool = multiprocessing.dummy.Pool(4) +    ManagerMixin.manager.__init__() +    ManagerMixin.manager.start() +    ManagerMixin.pool = ManagerMixin.manager.Pool(4) + +    testcases = ( +        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) + +        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) + +        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +        ) + +    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase +    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases) +    run(suite) + +    ThreadsMixin.pool.terminate() +    ProcessesMixin.pool.terminate() +    ManagerMixin.pool.terminate() +    ManagerMixin.manager.shutdown() + +    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool + +def main(): +    test_main(unittest.TextTestRunner(verbosity=2).run) + +if __name__ == '__main__': +    main()  | 
