diff options
author | Guido van Rossum <guido@dropbox.com> | 2013-10-30 21:44:05 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@dropbox.com> | 2013-10-30 21:44:05 (GMT) |
commit | 90fb914b4b90f74a9ab4c12d2a3aa2fa2090f3c7 (patch) | |
tree | c4234a9c7b98bbf85fc54b7858989f7400a50606 /Lib/test/test_asyncio/test_windows_events.py | |
parent | ec7922cb3eef8c770e906478fad75951a0d69116 (diff) | |
download | cpython-90fb914b4b90f74a9ab4c12d2a3aa2fa2090f3c7.zip cpython-90fb914b4b90f74a9ab4c12d2a3aa2fa2090f3c7.tar.gz cpython-90fb914b4b90f74a9ab4c12d2a3aa2fa2090f3c7.tar.bz2 |
asyncio: Make the IOCP proactor support "waitable" handles (Richard Oudkerk).
Diffstat (limited to 'Lib/test/test_asyncio/test_windows_events.py')
-rw-r--r-- | Lib/test/test_asyncio/test_windows_events.py | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index 969360c..553ea34 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -5,13 +5,17 @@ import unittest if sys.platform != 'win32': raise unittest.SkipTest('Windows only') +import _winapi + import asyncio from asyncio import windows_events +from asyncio import futures from asyncio import protocols from asyncio import streams from asyncio import transports from asyncio import test_utils +from asyncio import _overlapped class UpperProto(protocols.Protocol): @@ -94,6 +98,42 @@ class ProactorTests(unittest.TestCase): return 'done' + def test_wait_for_handle(self): + event = _overlapped.CreateEvent(None, True, False, None) + self.addCleanup(_winapi.CloseHandle, event) + + # Wait for unset event with 0.2s timeout; + # result should be False at timeout + f = self.loop._proactor.wait_for_handle(event, 0.2) + start = self.loop.time() + self.loop.run_until_complete(f) + elapsed = self.loop.time() - start + self.assertFalse(f.result()) + self.assertTrue(0.18 < elapsed < 0.22, elapsed) + + _overlapped.SetEvent(event) + + # Wait for for set event; + # result should be True immediately + f = self.loop._proactor.wait_for_handle(event, 10) + start = self.loop.time() + self.loop.run_until_complete(f) + elapsed = self.loop.time() - start + self.assertTrue(f.result()) + self.assertTrue(0 <= elapsed < 0.02, elapsed) + + _overlapped.ResetEvent(event) + + # Wait for unset event with a cancelled future; + # CancelledError should be raised immediately + f = self.loop._proactor.wait_for_handle(event, 10) + f.cancel() + start = self.loop.time() + with self.assertRaises(futures.CancelledError): + self.loop.run_until_complete(f) + elapsed = self.loop.time() - start + self.assertTrue(0 <= elapsed < 0.02, elapsed) + if __name__ == '__main__': unittest.main() |