summaryrefslogtreecommitdiffstats
path: root/Lib/test/_test_multiprocessing.py
diff options
context:
space:
mode:
authorThomas Moreau <thomas.moreau.2010@gmail.com>2019-05-20 19:37:05 (GMT)
committerAntoine Pitrou <antoine@python.org>2019-05-20 19:37:05 (GMT)
commitc09a9f56c08d80567454cae6f78f738a89e1ae94 (patch)
tree7f00233cfa994ba74ca952d371ae85651690602a /Lib/test/_test_multiprocessing.py
parent5ae1c84bcd13b766989fc3f1e1c851e7bd4c1faa (diff)
downloadcpython-c09a9f56c08d80567454cae6f78f738a89e1ae94.zip
cpython-c09a9f56c08d80567454cae6f78f738a89e1ae94.tar.gz
cpython-c09a9f56c08d80567454cae6f78f738a89e1ae94.tar.bz2
bpo-36888: Add multiprocessing.parent_process() (GH-13247)
Diffstat (limited to 'Lib/test/_test_multiprocessing.py')
-rw-r--r--Lib/test/_test_multiprocessing.py59
1 files changed, 59 insertions, 0 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 78ec53b..071b54a 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -269,6 +269,64 @@ class _TestProcess(BaseTestCase):
q.put(bytes(current.authkey))
q.put(current.pid)
+ def test_parent_process_attributes(self):
+ if self.TYPE == "threads":
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+ self.assertIsNone(self.parent_process())
+
+ rconn, wconn = self.Pipe(duplex=False)
+ p = self.Process(target=self._test_send_parent_process, args=(wconn,))
+ p.start()
+ p.join()
+ parent_pid, parent_name = rconn.recv()
+ self.assertEqual(parent_pid, self.current_process().pid)
+ self.assertEqual(parent_pid, os.getpid())
+ self.assertEqual(parent_name, self.current_process().name)
+
+ @classmethod
+ def _test_send_parent_process(cls, wconn):
+ from multiprocessing.process import parent_process
+ wconn.send([parent_process().pid, parent_process().name])
+
+ def test_parent_process(self):
+ if self.TYPE == "threads":
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+ # Launch a child process. Make it launch a grandchild process. Kill the
+ # child process and make sure that the grandchild notices the death of
+ # its parent (a.k.a the child process).
+ rconn, wconn = self.Pipe(duplex=False)
+ p = self.Process(
+ target=self._test_create_grandchild_process, args=(wconn, ))
+ p.start()
+
+ if not rconn.poll(timeout=5):
+ raise AssertionError("Could not communicate with child process")
+ parent_process_status = rconn.recv()
+ self.assertEqual(parent_process_status, "alive")
+
+ p.terminate()
+ p.join()
+
+ if not rconn.poll(timeout=5):
+ raise AssertionError("Could not communicate with child process")
+ parent_process_status = rconn.recv()
+ self.assertEqual(parent_process_status, "not alive")
+
+ @classmethod
+ def _test_create_grandchild_process(cls, wconn):
+ p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
+ p.start()
+ time.sleep(100)
+
+ @classmethod
+ def _test_report_parent_status(cls, wconn):
+ from multiprocessing.process import parent_process
+ wconn.send("alive" if parent_process().is_alive() else "not alive")
+ parent_process().join(timeout=5)
+ wconn.send("alive" if parent_process().is_alive() else "not alive")
+
def test_process(self):
q = self.Queue(1)
e = self.Event()
@@ -5398,6 +5456,7 @@ class ProcessesMixin(BaseMixin):
Process = multiprocessing.Process
connection = multiprocessing.connection
current_process = staticmethod(multiprocessing.current_process)
+ parent_process = staticmethod(multiprocessing.parent_process)
active_children = staticmethod(multiprocessing.active_children)
Pool = staticmethod(multiprocessing.Pool)
Pipe = staticmethod(multiprocessing.Pipe)