diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_asyncio/test_events.py | 129 |
1 files changed, 65 insertions, 64 deletions
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index fd7022f..bafa875 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -56,6 +56,7 @@ SIGNING_CA = data_file('pycacert.pem') class MyBaseProto(asyncio.Protocol): + connected = None done = None def __init__(self, loop=None): @@ -63,12 +64,15 @@ class MyBaseProto(asyncio.Protocol): self.state = 'INITIAL' self.nbytes = 0 if loop is not None: + self.connected = asyncio.Future(loop=loop) self.done = asyncio.Future(loop=loop) def connection_made(self, transport): self.transport = transport assert self.state == 'INITIAL', self.state self.state = 'CONNECTED' + if self.connected: + self.connected.set_result(None) def data_received(self, data): assert self.state == 'CONNECTED', self.state @@ -330,7 +334,8 @@ class EventLoopTestsMixin: def test_reader_callback(self): r, w = test_utils.socketpair() - bytes_read = [] + r.setblocking(False) + bytes_read = bytearray() def reader(): try: @@ -340,37 +345,40 @@ class EventLoopTestsMixin: # at least on Linux -- see man select. return if data: - bytes_read.append(data) + bytes_read.extend(data) else: self.assertTrue(self.loop.remove_reader(r.fileno())) r.close() self.loop.add_reader(r.fileno(), reader) self.loop.call_soon(w.send, b'abc') - test_utils.run_briefly(self.loop) + test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3) self.loop.call_soon(w.send, b'def') - test_utils.run_briefly(self.loop) + test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6) self.loop.call_soon(w.close) self.loop.call_soon(self.loop.stop) self.loop.run_forever() - self.assertEqual(b''.join(bytes_read), b'abcdef') + self.assertEqual(bytes_read, b'abcdef') def test_writer_callback(self): r, w = test_utils.socketpair() w.setblocking(False) - self.loop.add_writer(w.fileno(), w.send, b'x'*(256*1024)) - test_utils.run_briefly(self.loop) - def remove_writer(): - self.assertTrue(self.loop.remove_writer(w.fileno())) + def writer(data): + w.send(data) + self.loop.stop() - self.loop.call_soon(remove_writer) - self.loop.call_soon(self.loop.stop) + data = b'x' * 1024 + self.loop.add_writer(w.fileno(), writer, data) self.loop.run_forever() + + self.assertTrue(self.loop.remove_writer(w.fileno())) + self.assertFalse(self.loop.remove_writer(w.fileno())) + w.close() - data = r.recv(256*1024) + read = r.recv(len(data) * 2) r.close() - self.assertGreaterEqual(len(data), 200) + self.assertEqual(read, data) def _basetest_sock_client_ops(self, httpd, sock): sock.setblocking(False) @@ -464,10 +472,10 @@ class EventLoopTestsMixin: self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL)) # Now set a handler and handle it. self.loop.add_signal_handler(signal.SIGINT, my_handler) - test_utils.run_briefly(self.loop) + os.kill(os.getpid(), signal.SIGINT) - test_utils.run_briefly(self.loop) - self.assertEqual(caught, 1) + test_utils.run_until(self.loop, lambda: caught) + # Removing it should restore the default handler. self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT)) self.assertEqual(signal.getsignal(signal.SIGINT), @@ -623,7 +631,7 @@ class EventLoopTestsMixin: self.assertIn(str(httpd.address), cm.exception.strerror) def test_create_server(self): - proto = MyProto() + proto = MyProto(self.loop) f = self.loop.create_server(lambda: proto, '0.0.0.0', 0) server = self.loop.run_until_complete(f) self.assertEqual(len(server.sockets), 1) @@ -633,14 +641,11 @@ class EventLoopTestsMixin: client = socket.socket() client.connect(('127.0.0.1', port)) client.sendall(b'xxx') - test_utils.run_briefly(self.loop) - test_utils.run_until(self.loop, lambda: proto is not None, 10) - self.assertIsInstance(proto, MyProto) - self.assertEqual('INITIAL', proto.state) - test_utils.run_briefly(self.loop) + + self.loop.run_until_complete(proto.connected) self.assertEqual('CONNECTED', proto.state) - test_utils.run_until(self.loop, lambda: proto.nbytes > 0, - timeout=10) + + test_utils.run_until(self.loop, lambda: proto.nbytes > 0) self.assertEqual(3, proto.nbytes) # extra info is available @@ -650,7 +655,7 @@ class EventLoopTestsMixin: # close connection proto.transport.close() - test_utils.run_briefly(self.loop) # windows iocp + self.loop.run_until_complete(proto.done) self.assertEqual('CLOSED', proto.state) @@ -672,27 +677,22 @@ class EventLoopTestsMixin: @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') def test_create_unix_server(self): - proto = MyProto() + proto = MyProto(loop=self.loop) server, path = self._make_unix_server(lambda: proto) self.assertEqual(len(server.sockets), 1) client = socket.socket(socket.AF_UNIX) client.connect(path) client.sendall(b'xxx') - test_utils.run_briefly(self.loop) - test_utils.run_until(self.loop, lambda: proto is not None, 10) - self.assertIsInstance(proto, MyProto) - self.assertEqual('INITIAL', proto.state) - test_utils.run_briefly(self.loop) + self.loop.run_until_complete(proto.connected) self.assertEqual('CONNECTED', proto.state) - test_utils.run_until(self.loop, lambda: proto.nbytes > 0, - timeout=10) + test_utils.run_until(self.loop, lambda: proto.nbytes > 0) self.assertEqual(3, proto.nbytes) # close connection proto.transport.close() - test_utils.run_briefly(self.loop) # windows iocp + self.loop.run_until_complete(proto.done) self.assertEqual('CLOSED', proto.state) @@ -735,12 +735,10 @@ class EventLoopTestsMixin: client, pr = self.loop.run_until_complete(f_c) client.write(b'xxx') - test_utils.run_briefly(self.loop) - self.assertIsInstance(proto, MyProto) - test_utils.run_briefly(self.loop) + self.loop.run_until_complete(proto.connected) self.assertEqual('CONNECTED', proto.state) - test_utils.run_until(self.loop, lambda: proto.nbytes > 0, - timeout=10) + + test_utils.run_until(self.loop, lambda: proto.nbytes > 0) self.assertEqual(3, proto.nbytes) # extra info is available @@ -774,12 +772,9 @@ class EventLoopTestsMixin: client, pr = self.loop.run_until_complete(f_c) client.write(b'xxx') - test_utils.run_briefly(self.loop) - self.assertIsInstance(proto, MyProto) - test_utils.run_briefly(self.loop) + self.loop.run_until_complete(proto.connected) self.assertEqual('CONNECTED', proto.state) - test_utils.run_until(self.loop, lambda: proto.nbytes > 0, - timeout=10) + test_utils.run_until(self.loop, lambda: proto.nbytes > 0) self.assertEqual(3, proto.nbytes) # close connection @@ -1044,15 +1039,9 @@ class EventLoopTestsMixin: self.assertEqual('INITIALIZED', client.state) transport.sendto(b'xxx') - for _ in range(1000): - if server.nbytes: - break - test_utils.run_briefly(self.loop) + test_utils.run_until(self.loop, lambda: server.nbytes) self.assertEqual(3, server.nbytes) - for _ in range(1000): - if client.nbytes: - break - test_utils.run_briefly(self.loop) + test_utils.run_until(self.loop, lambda: client.nbytes) # received self.assertEqual(8, client.nbytes) @@ -1097,11 +1086,11 @@ class EventLoopTestsMixin: self.loop.run_until_complete(connect()) os.write(wpipe, b'1') - test_utils.run_briefly(self.loop) + test_utils.run_until(self.loop, lambda: proto.nbytes >= 1) self.assertEqual(1, proto.nbytes) os.write(wpipe, b'2345') - test_utils.run_briefly(self.loop) + test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) self.assertEqual(5, proto.nbytes) @@ -1166,14 +1155,19 @@ class EventLoopTestsMixin: self.assertEqual('CONNECTED', proto.state) transport.write(b'1') - test_utils.run_briefly(self.loop) - data = os.read(rpipe, 1024) + + data = bytearray() + def reader(data): + chunk = os.read(rpipe, 1024) + data += chunk + return len(data) + + test_utils.run_until(self.loop, lambda: reader(data) >= 1) self.assertEqual(b'1', data) transport.write(b'2345') - test_utils.run_briefly(self.loop) - data = os.read(rpipe, 1024) - self.assertEqual(b'2345', data) + test_utils.run_until(self.loop, lambda: reader(data) >= 5) + self.assertEqual(b'12345', data) self.assertEqual('CONNECTED', proto.state) os.close(rpipe) @@ -1225,14 +1219,21 @@ class EventLoopTestsMixin: self.assertEqual('CONNECTED', proto.state) transport.write(b'1') - test_utils.run_briefly(self.loop) - data = os.read(master, 1024) + + data = bytearray() + def reader(data): + chunk = os.read(master, 1024) + data += chunk + return len(data) + + test_utils.run_until(self.loop, lambda: reader(data) >= 1, + timeout=10) self.assertEqual(b'1', data) transport.write(b'2345') - test_utils.run_briefly(self.loop) - data = os.read(master, 1024) - self.assertEqual(b'2345', data) + test_utils.run_until(self.loop, lambda: reader(data) >= 5, + timeout=10) + self.assertEqual(b'12345', data) self.assertEqual('CONNECTED', proto.state) os.close(master) |