summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_events.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio/test_events.py')
-rw-r--r--Lib/test/test_asyncio/test_events.py60
1 files changed, 40 insertions, 20 deletions
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index fe791fa..c46c9dd 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -88,23 +88,27 @@ class MyBaseProto(asyncio.Protocol):
self.connected = loop.create_future()
self.done = loop.create_future()
+ def _assert_state(self, *expected):
+ if self.state not in expected:
+ raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
+
def connection_made(self, transport):
self.transport = transport
- assert self.state == 'INITIAL', self.state
+ self._assert_state('INITIAL')
self.state = 'CONNECTED'
if self.connected:
self.connected.set_result(None)
def data_received(self, data):
- assert self.state == 'CONNECTED', self.state
+ self._assert_state('CONNECTED')
self.nbytes += len(data)
def eof_received(self):
- assert self.state == 'CONNECTED', self.state
+ self._assert_state('CONNECTED')
self.state = 'EOF'
def connection_lost(self, exc):
- assert self.state in ('CONNECTED', 'EOF'), self.state
+ self._assert_state('CONNECTED', 'EOF')
self.state = 'CLOSED'
if self.done:
self.done.set_result(None)
@@ -125,20 +129,24 @@ class MyDatagramProto(asyncio.DatagramProtocol):
if loop is not None:
self.done = loop.create_future()
+ def _assert_state(self, expected):
+ if self.state != expected:
+ raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
+
def connection_made(self, transport):
self.transport = transport
- assert self.state == 'INITIAL', self.state
+ self._assert_state('INITIAL')
self.state = 'INITIALIZED'
def datagram_received(self, data, addr):
- assert self.state == 'INITIALIZED', self.state
+ self._assert_state('INITIALIZED')
self.nbytes += len(data)
def error_received(self, exc):
- assert self.state == 'INITIALIZED', self.state
+ self._assert_state('INITIALIZED')
def connection_lost(self, exc):
- assert self.state == 'INITIALIZED', self.state
+ self._assert_state('INITIALIZED')
self.state = 'CLOSED'
if self.done:
self.done.set_result(None)
@@ -154,23 +162,27 @@ class MyReadPipeProto(asyncio.Protocol):
if loop is not None:
self.done = loop.create_future()
+ def _assert_state(self, expected):
+ if self.state != expected:
+ raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
+
def connection_made(self, transport):
self.transport = transport
- assert self.state == ['INITIAL'], self.state
+ self._assert_state(['INITIAL'])
self.state.append('CONNECTED')
def data_received(self, data):
- assert self.state == ['INITIAL', 'CONNECTED'], self.state
+ self._assert_state(['INITIAL', 'CONNECTED'])
self.nbytes += len(data)
def eof_received(self):
- assert self.state == ['INITIAL', 'CONNECTED'], self.state
+ self._assert_state(['INITIAL', 'CONNECTED'])
self.state.append('EOF')
def connection_lost(self, exc):
if 'EOF' not in self.state:
self.state.append('EOF') # It is okay if EOF is missed.
- assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state
+ self._assert_state(['INITIAL', 'CONNECTED', 'EOF'])
self.state.append('CLOSED')
if self.done:
self.done.set_result(None)
@@ -185,13 +197,17 @@ class MyWritePipeProto(asyncio.BaseProtocol):
if loop is not None:
self.done = loop.create_future()
+ def _assert_state(self, expected):
+ if self.state != expected:
+ raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
+
def connection_made(self, transport):
self.transport = transport
- assert self.state == 'INITIAL', self.state
+ self._assert_state('INITIAL')
self.state = 'CONNECTED'
def connection_lost(self, exc):
- assert self.state == 'CONNECTED', self.state
+ self._assert_state('CONNECTED')
self.state = 'CLOSED'
if self.done:
self.done.set_result(None)
@@ -210,31 +226,35 @@ class MySubprocessProtocol(asyncio.SubprocessProtocol):
self.got_data = {1: asyncio.Event(),
2: asyncio.Event()}
+ def _assert_state(self, expected):
+ if self.state != expected:
+ raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
+
def connection_made(self, transport):
self.transport = transport
- assert self.state == 'INITIAL', self.state
+ self._assert_state('INITIAL')
self.state = 'CONNECTED'
self.connected.set_result(None)
def connection_lost(self, exc):
- assert self.state == 'CONNECTED', self.state
+ self._assert_state('CONNECTED')
self.state = 'CLOSED'
self.completed.set_result(None)
def pipe_data_received(self, fd, data):
- assert self.state == 'CONNECTED', self.state
+ self._assert_state('CONNECTED')
self.data[fd] += data
self.got_data[fd].set()
def pipe_connection_lost(self, fd, exc):
- assert self.state == 'CONNECTED', self.state
+ self._assert_state('CONNECTED')
if exc:
self.disconnects[fd].set_exception(exc)
else:
self.disconnects[fd].set_result(exc)
def process_exited(self):
- assert self.state == 'CONNECTED', self.state
+ self._assert_state('CONNECTED')
self.returncode = self.transport.get_returncode()
@@ -1284,7 +1304,7 @@ class EventLoopTestsMixin:
else:
break
else:
- assert False, 'Can not create socket.'
+ self.fail('Can not create socket.')
f = self.loop.create_datagram_endpoint(
lambda: MyDatagramProto(loop=self.loop), sock=sock)