summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_winapi.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_winapi.py')
-rw-r--r--Lib/test/test_winapi.py34
1 files changed, 33 insertions, 1 deletions
diff --git a/Lib/test/test_winapi.py b/Lib/test/test_winapi.py
index 2ac6f36..73b8228 100644
--- a/Lib/test/test_winapi.py
+++ b/Lib/test/test_winapi.py
@@ -7,7 +7,7 @@ import re
import threading
import time
import unittest
-from test.support import import_helper
+from test.support import import_helper, os_helper
_winapi = import_helper.import_module('_winapi', required_on=['win'])
@@ -127,3 +127,35 @@ class WinAPITests(unittest.TestCase):
# Should contain "PROGRA~" but we can't predict the number
self.assertIsNotNone(re.match(r".\:\\PROGRA~\d", actual.upper()), actual)
+
+ def test_namedpipe(self):
+ pipe_name = rf"\\.\pipe\LOCAL\{os_helper.TESTFN}"
+
+ # Pipe does not exist, so this raises
+ with self.assertRaises(FileNotFoundError):
+ _winapi.WaitNamedPipe(pipe_name, 0)
+
+ pipe = _winapi.CreateNamedPipe(
+ pipe_name,
+ _winapi.PIPE_ACCESS_DUPLEX,
+ 8, # 8=PIPE_REJECT_REMOTE_CLIENTS
+ 2, # two instances available
+ 32, 32, 0, 0)
+ self.addCleanup(_winapi.CloseHandle, pipe)
+
+ # Pipe instance is available, so this passes
+ _winapi.WaitNamedPipe(pipe_name, 0)
+
+ with open(pipe_name, 'w+b') as pipe2:
+ # No instances available, so this times out
+ # (WinError 121 does not get mapped to TimeoutError)
+ with self.assertRaises(OSError):
+ _winapi.WaitNamedPipe(pipe_name, 0)
+
+ _winapi.WriteFile(pipe, b'testdata')
+ self.assertEqual(b'testdata', pipe2.read(8))
+
+ self.assertEqual((b'', 0), _winapi.PeekNamedPipe(pipe, 8)[:2])
+ pipe2.write(b'testdata')
+ pipe2.flush()
+ self.assertEqual((b'testdata', 8), _winapi.PeekNamedPipe(pipe, 8)[:2])