summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_events.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio/test_events.py')
-rw-r--r--Lib/test/test_asyncio/test_events.py75
1 files changed, 75 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index ddb0d44..e742eb7 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -21,6 +21,8 @@ import unittest
from unittest import mock
import weakref
+if sys.platform != 'win32':
+ import tty
import asyncio
from asyncio import proactor_events
@@ -1626,6 +1628,79 @@ class EventLoopTestsMixin:
self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)
+ @unittest.skipUnless(sys.platform != 'win32',
+ "Don't support pipes for Windows")
+ # select, poll and kqueue don't support character devices (PTY) on Mac OS X
+ # older than 10.6 (Snow Leopard)
+ @support.requires_mac_ver(10, 6)
+ def test_bidirectional_pty(self):
+ master, read_slave = os.openpty()
+ write_slave = os.dup(read_slave)
+ tty.setraw(read_slave)
+
+ slave_read_obj = io.open(read_slave, 'rb', 0)
+ read_proto = MyReadPipeProto(loop=self.loop)
+ read_connect = self.loop.connect_read_pipe(lambda: read_proto,
+ slave_read_obj)
+ read_transport, p = self.loop.run_until_complete(read_connect)
+ self.assertIs(p, read_proto)
+ self.assertIs(read_transport, read_proto.transport)
+ self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
+ self.assertEqual(0, read_proto.nbytes)
+
+
+ slave_write_obj = io.open(write_slave, 'wb', 0)
+ write_proto = MyWritePipeProto(loop=self.loop)
+ write_connect = self.loop.connect_write_pipe(lambda: write_proto,
+ slave_write_obj)
+ write_transport, p = self.loop.run_until_complete(write_connect)
+ self.assertIs(p, write_proto)
+ self.assertIs(write_transport, write_proto.transport)
+ self.assertEqual('CONNECTED', write_proto.state)
+
+ data = bytearray()
+ def reader(data):
+ chunk = os.read(master, 1024)
+ data += chunk
+ return len(data)
+
+ write_transport.write(b'1')
+ test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10)
+ self.assertEqual(b'1', data)
+ self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
+ self.assertEqual('CONNECTED', write_proto.state)
+
+ os.write(master, b'a')
+ test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
+ timeout=10)
+ self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
+ self.assertEqual(1, read_proto.nbytes)
+ self.assertEqual('CONNECTED', write_proto.state)
+
+ write_transport.write(b'2345')
+ test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10)
+ self.assertEqual(b'12345', data)
+ self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
+ self.assertEqual('CONNECTED', write_proto.state)
+
+ os.write(master, b'bcde')
+ test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
+ timeout=10)
+ self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
+ self.assertEqual(5, read_proto.nbytes)
+ self.assertEqual('CONNECTED', write_proto.state)
+
+ os.close(master)
+
+ read_transport.close()
+ self.loop.run_until_complete(read_proto.done)
+ self.assertEqual(
+ ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
+
+ write_transport.close()
+ self.loop.run_until_complete(write_proto.done)
+ self.assertEqual('CLOSED', write_proto.state)
+
def test_prompt_cancellation(self):
r, w = test_utils.socketpair()
r.setblocking(False)