summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_winapi.py
blob: e64208330ad2f9e91b1efd0f95a7494aeff7736d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Test the Windows-only _winapi module

import os
import pathlib
import re
import unittest
from test.support import import_helper, os_helper

# Import the extension module under test through the test-support helper
# instead of a plain ``import``.  Per the test.support.import_helper docs, a
# failed import is turned into a skip, except on platforms whose sys.platform
# starts with one of the ``required_on`` prefixes, where it stays an error.
_winapi = import_helper.import_module('_winapi', required_on=['win'])

# Win32 limit on the number of handles a single WaitForMultipleObjects call
# accepts.
MAXIMUM_WAIT_OBJECTS = 64
# Largest handle list used by the test_max_events_* tests below:
# (64 - 1) ** 2 == 3969.
# NOTE(review): presumably mirrors the upper bound enforced by
# _winapi.BatchedWaitForMultipleObjects (batches of MAXIMUM_WAIT_OBJECTS - 1
# handles) -- confirm against the C implementation.
MAXIMUM_BATCHED_WAIT_OBJECTS = (MAXIMUM_WAIT_OBJECTS - 1) ** 2

class WinAPIBatchedWaitForMultipleObjectsTests(unittest.TestCase):
    """Tests for _winapi.BatchedWaitForMultipleObjects using event handles.

    Each scenario is run with a small, a medium and the maximum number of
    events, in both "wait for all" and "wait for any" modes.
    """

    def _create_events(self, n):
        """Return a list of *n* new unsignalled auto-reset event handles.

        Every handle is registered for closing at test cleanup, so the
        (potentially thousands of) events do not leak into later tests.
        """
        evts = []
        for _ in range(n):
            evt = _winapi.CreateEventW(0, False, False, None)
            # Register right away so that a failure while creating a later
            # event still closes the ones already created.
            self.addCleanup(_winapi.CloseHandle, evt)
            evts.append(evt)
        return evts

    def _events_waitall_test(self, n):
        """Check wait-all behaviour over *n* events.

        Verifies that the wait times out while nothing is signalled, that it
        completes once every event is set, and that an invalid handle
        anywhere in the list raises OSError.
        """
        evts = self._create_events(n)

        with self.assertRaises(TimeoutError):
            _winapi.BatchedWaitForMultipleObjects(evts, True, 100)

        # Ensure no errors raised when all are triggered
        for e in evts:
            _winapi.SetEvent(e)
        try:
            _winapi.BatchedWaitForMultipleObjects(evts, True, 100)
        except TimeoutError:
            self.fail("expected wait to complete immediately")

        # Choose 8 events to set, distributed throughout the list, to make sure
        # we don't always have them in the first chunk
        chosen = [i * (len(evts) // 8) for i in range(8)]

        # Replace events with invalid handles to make sure we fail
        for i in chosen:
            old_evt = evts[i]
            evts[i] = -1
            try:
                with self.assertRaises(OSError):
                    _winapi.BatchedWaitForMultipleObjects(evts, True, 100)
            finally:
                evts[i] = old_evt


    def _events_waitany_test(self, n):
        """Check wait-any behaviour over *n* events.

        Verifies the timeout with nothing signalled, that the index of each
        individually signalled event is reported, that several signalled
        events are all eventually reported, and that an invalid handle
        anywhere in the list raises OSError.
        """
        evts = self._create_events(n)

        with self.assertRaises(TimeoutError):
            _winapi.BatchedWaitForMultipleObjects(evts, False, 100)

        # Choose 8 events to set, distributed throughout the list, to make sure
        # we don't always have them in the first chunk
        chosen = [i * (len(evts) // 8) for i in range(8)]

        # Trigger one by one. They are auto-reset events, so will only trigger once
        for i in chosen:
            with self.subTest(f"trigger event {i} of {len(evts)}"):
                _winapi.SetEvent(evts[i])
                triggered = _winapi.BatchedWaitForMultipleObjects(evts, False, 10000)
                self.assertSetEqual(set(triggered), {i})

        # Trigger all at once. This may require multiple calls
        for i in chosen:
            _winapi.SetEvent(evts[i])
        triggered = set()
        while len(triggered) < len(chosen):
            triggered.update(_winapi.BatchedWaitForMultipleObjects(evts, False, 10000))
        self.assertSetEqual(triggered, set(chosen))

        # Replace events with invalid handles to make sure we fail
        for i in chosen:
            with self.subTest(f"corrupt event {i} of {len(evts)}"):
                old_evt = evts[i]
                evts[i] = -1
                try:
                    with self.assertRaises(OSError):
                        _winapi.BatchedWaitForMultipleObjects(evts, False, 100)
                finally:
                    # subTest records a failure and lets the loop continue, so
                    # always put the real handle back; otherwise later
                    # sub-tests would run with more than one invalid entry.
                    evts[i] = old_evt


    def test_few_events_waitall(self):
        self._events_waitall_test(16)

    def test_many_events_waitall(self):
        self._events_waitall_test(256)

    def test_max_events_waitall(self):
        self._events_waitall_test(MAXIMUM_BATCHED_WAIT_OBJECTS)


    def test_few_events_waitany(self):
        self._events_waitany_test(16)

    def test_many_events_waitany(self):
        self._events_waitany_test(256)

    def test_max_events_waitany(self):
        self._events_waitany_test(MAXIMUM_BATCHED_WAIT_OBJECTS)


class WinAPITests(unittest.TestCase):
    """Tests for _winapi path-name helpers and named-pipe functions."""

    def test_getlongpathname(self):
        # Expands the 8.3 name "PROGRA~1" at the root of the drive holding
        # %ProgramFiles% and checks the result is one of the "Progra*"
        # directories found there.
        program_files = os.getenv("ProgramFiles")
        if not program_files:
            # Without this guard pathlib.Path(None) raises TypeError and the
            # test errors out instead of being skipped.
            raise unittest.SkipTest("require '%ProgramFiles%' to test")

        testfn = pathlib.Path(program_files).parents[-1] / "PROGRA~1"
        if not os.path.isdir(testfn):
            raise unittest.SkipTest("require x:\\PROGRA~1 to test")

        # pathlib.Path will be rejected - only str is accepted
        with self.assertRaises(TypeError):
            _winapi.GetLongPathName(testfn)

        actual = _winapi.GetLongPathName(os.fsdecode(testfn))

        # Can't assume that PROGRA~1 expands to any particular variation, so
        # ensure it matches any one of them.
        candidates = set(testfn.parent.glob("Progra*"))
        self.assertIn(pathlib.Path(actual), candidates)

    def test_getshortpathname(self):
        # Shortens %ProgramFiles% and checks the result looks like an 8.3
        # "x:\PROGRA~<digit>" path.
        program_files = os.getenv("ProgramFiles")
        if not program_files:
            # Without this guard pathlib.Path(None) raises TypeError and the
            # test errors out instead of being skipped.
            raise unittest.SkipTest("require '%ProgramFiles%' to test")

        testfn = pathlib.Path(program_files)
        if not os.path.isdir(testfn):
            raise unittest.SkipTest("require '%ProgramFiles%' to test")

        # pathlib.Path will be rejected - only str is accepted
        with self.assertRaises(TypeError):
            _winapi.GetShortPathName(testfn)

        actual = _winapi.GetShortPathName(os.fsdecode(testfn))

        # Should contain "PROGRA~" but we can't predict the number
        self.assertIsNotNone(re.match(r".\:\\PROGRA~\d", actual.upper()), actual)

    def test_namedpipe(self):
        # Walks a local named pipe through its lifecycle: not yet created,
        # created with a free instance, connected (no free instance), then
        # data transfer in both directions.
        pipe_name = rf"\\.\pipe\LOCAL\{os_helper.TESTFN}"

        # Pipe does not exist, so this raises
        with self.assertRaises(FileNotFoundError):
            _winapi.WaitNamedPipe(pipe_name, 0)

        pipe = _winapi.CreateNamedPipe(
            pipe_name,
            _winapi.PIPE_ACCESS_DUPLEX,
            8, # 8=PIPE_REJECT_REMOTE_CLIENTS
            2, # two instances available
            # Remaining arguments, following the Win32 CreateNamedPipe
            # parameter order: out/in buffer sizes, default timeout,
            # security attributes.
            32, 32, 0, 0)
        # Close the server end of the pipe however the test finishes.
        self.addCleanup(_winapi.CloseHandle, pipe)

        # Pipe instance is available, so this passes
        _winapi.WaitNamedPipe(pipe_name, 0)

        # Opening the pipe by name connects a client to the only instance
        # created above.
        with open(pipe_name, 'w+b') as pipe2:
            # No instances available, so this times out
            # (WinError 121 does not get mapped to TimeoutError)
            with self.assertRaises(OSError):
                _winapi.WaitNamedPipe(pipe_name, 0)

            # Server -> client
            _winapi.WriteFile(pipe, b'testdata')
            self.assertEqual(b'testdata', pipe2.read(8))

            # Client -> server: nothing is pending until the client writes
            # and flushes; only the first two fields of the peek result are
            # compared.
            self.assertEqual((b'', 0), _winapi.PeekNamedPipe(pipe, 8)[:2])
            pipe2.write(b'testdata')
            pipe2.flush()
            self.assertEqual((b'testdata', 8), _winapi.PeekNamedPipe(pipe, 8)[:2])