1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# Test case for the select.devpoll() function
# Initial tests are copied as is from "test_poll.py"
import os, select, random, unittest, sys
from test.support import TESTFN, run_unittest, cpython_only
try:
select.devpoll
except AttributeError:
raise unittest.SkipTest("select.devpoll not defined -- skipping test_devpoll")
def find_ready_matching(ready, flag):
match = []
for fd, mode in ready:
if mode & flag:
match.append(fd)
return match
class DevPollTests(unittest.TestCase):
def test_devpoll1(self):
# Basic functional test of poll object
# Create a bunch of pipe and test that poll works with them.
p = select.devpoll()
NUM_PIPES = 12
MSG = b" This is a test."
MSG_LEN = len(MSG)
readers = []
writers = []
r2w = {}
w2r = {}
for i in range(NUM_PIPES):
rd, wr = os.pipe()
p.register(rd)
p.modify(rd, select.POLLIN)
p.register(wr, select.POLLOUT)
readers.append(rd)
writers.append(wr)
r2w[rd] = wr
w2r[wr] = rd
bufs = []
while writers:
ready = p.poll()
ready_writers = find_ready_matching(ready, select.POLLOUT)
if not ready_writers:
self.fail("no pipes ready for writing")
wr = random.choice(ready_writers)
os.write(wr, MSG)
ready = p.poll()
ready_readers = find_ready_matching(ready, select.POLLIN)
if not ready_readers:
self.fail("no pipes ready for reading")
self.assertEqual([w2r[wr]], ready_readers)
rd = ready_readers[0]
buf = os.read(rd, MSG_LEN)
self.assertEqual(len(buf), MSG_LEN)
bufs.append(buf)
os.close(r2w[rd]) ; os.close(rd)
p.unregister(r2w[rd])
p.unregister(rd)
writers.remove(r2w[rd])
self.assertEqual(bufs, [MSG] * NUM_PIPES)
def test_timeout_overflow(self):
pollster = select.devpoll()
w, r = os.pipe()
pollster.register(w)
pollster.poll(-1)
self.assertRaises(OverflowError, pollster.poll, -2)
self.assertRaises(OverflowError, pollster.poll, -1 << 31)
self.assertRaises(OverflowError, pollster.poll, -1 << 64)
pollster.poll(0)
pollster.poll(1)
pollster.poll(1 << 30)
self.assertRaises(OverflowError, pollster.poll, 1 << 31)
self.assertRaises(OverflowError, pollster.poll, 1 << 63)
self.assertRaises(OverflowError, pollster.poll, 1 << 64)
def test_events_mask_overflow(self):
pollster = select.devpoll()
w, r = os.pipe()
pollster.register(w)
# Issue #17919
self.assertRaises(OverflowError, pollster.register, 0, -1)
self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
self.assertRaises(OverflowError, pollster.modify, 1, -1)
self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
@cpython_only
def test_events_mask_overflow_c_limits(self):
from _testcapi import USHRT_MAX
pollster = select.devpoll()
w, r = os.pipe()
pollster.register(w)
# Issue #17919
self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
def test_main():
run_unittest(DevPollTests)
if __name__ == '__main__':
test_main()
|