1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
import _dummy_thread as _thread
import time
import queue
import random
import unittest
from test import support
from unittest import mock
# Delay, in seconds, handed to time.sleep() by the blocking-acquire and
# multi-thread-creation tests.  A falsy value also switches off the random
# per-thread delay used in test_multi_thread_creation.
DELAY = 0
class LockTests(unittest.TestCase):
    """Test lock objects."""

    def setUp(self):
        # Every test starts from a freshly allocated lock.
        self.lock = _thread.allocate_lock()

    def test_initlock(self):
        # Make sure locks start unlocked.
        self.assertFalse(self.lock.locked(),
                         "Lock object is not initialized unlocked.")

    def test_release(self):
        # An acquire()/release() pair must leave the lock unlocked.
        self.lock.acquire()
        self.lock.release()
        self.assertFalse(self.lock.locked(),
                         "Lock object did not release properly.")

    def test_LockType_context_manager(self):
        # Inspect the very lock managed by the with statement; checking
        # self.lock here would pass no matter what __exit__ does.
        lock = _thread.LockType()
        with lock:
            self.assertTrue(lock.locked(),
                            "Lock was not acquired by the with statement")
        self.assertFalse(lock.locked(),
                         "Acquired Lock was not released")

    def test_improper_release(self):
        # Make sure release of an unlocked lock raises RuntimeError.
        self.assertRaises(RuntimeError, self.lock.release)

    def test_cond_acquire_success(self):
        # Make sure the conditional acquiring of the lock works.
        self.assertTrue(self.lock.acquire(0),
                        "Conditional acquiring of the lock failed.")

    def test_cond_acquire_fail(self):
        # Test acquiring locked lock returns False.
        self.lock.acquire(0)
        self.assertFalse(self.lock.acquire(0),
                         "Conditional acquiring of a locked lock incorrectly "
                         "succeeded.")

    def test_uncond_acquire_success(self):
        # Make sure unconditional acquiring of a lock works.
        self.lock.acquire()
        self.assertTrue(self.lock.locked(),
                        "Uncondional locking failed.")

    def test_uncond_acquire_return_val(self):
        # Make sure that an unconditional locking returns True.
        self.assertIs(self.lock.acquire(1), True,
                      "Unconditional locking did not return True.")
        self.assertIs(self.lock.acquire(), True)

    def test_uncond_acquire_blocking(self):
        # Make sure that unconditional acquiring of a locked lock blocks.
        def delay_unlock(to_unlock, delay):
            """Hold on to lock for a set amount of time before unlocking."""
            time.sleep(delay)
            to_unlock.release()

        self.lock.acquire()
        # Use the monotonic clock: the wall clock may be adjusted while the
        # test runs, and truncating it to whole seconds loses any
        # sub-second DELAY.
        start_time = time.monotonic()
        _thread.start_new_thread(delay_unlock, (self.lock, DELAY))
        if support.verbose:
            print()
            print("*** Waiting for thread to release the lock "
                  "(approx. %s sec.) ***" % DELAY)
        self.lock.acquire()
        end_time = time.monotonic()
        if support.verbose:
            print("done")
        self.assertGreaterEqual(end_time - start_time, DELAY,
                                "Blocking by unconditional acquiring failed.")

    @mock.patch('time.sleep')
    def test_acquire_timeout(self, mock_sleep):
        """Test invoking acquire() with a positive timeout when the lock is
        already acquired. Ensure that time.sleep() is invoked with the given
        timeout and that False is returned."""
        self.lock.acquire()
        retval = self.lock.acquire(waitflag=0, timeout=1)
        self.assertTrue(mock_sleep.called)
        mock_sleep.assert_called_once_with(1)
        # Identity check, in line with test_uncond_acquire_return_val:
        # assertEqual would also accept 0.
        self.assertIs(retval, False)

    def test_lock_representation(self):
        self.lock.acquire()
        held_repr = repr(self.lock)
        self.assertIn("locked", held_repr)
        # "locked" is a substring of "unlocked", so the check above alone
        # cannot tell the two states apart.
        self.assertNotIn("unlocked", held_repr)
        self.lock.release()
        self.assertIn("unlocked", repr(self.lock))
class MiscTests(unittest.TestCase):
    """Miscellaneous tests."""

    def test_exit(self):
        # _thread.exit() is expected to raise SystemExit.
        with self.assertRaises(SystemExit):
            _thread.exit()

    def test_ident(self):
        # The identifier must be an int and must not be 0.
        self.assertIsInstance(
            _thread.get_ident(),
            int,
            "_thread.get_ident() returned a non-integer",
        )
        self.assertNotEqual(
            _thread.get_ident(),
            0,
            "_thread.get_ident() returned 0",
        )

    def test_LockType(self):
        # allocate_lock() must hand back a LockType instance.
        failure_msg = ("_thread.LockType is not an instance of what "
                       "is returned by _thread.allocate_lock()")
        new_lock = _thread.allocate_lock()
        self.assertIsInstance(new_lock, _thread.LockType, failure_msg)

    def test_set_sentinel(self):
        # _set_sentinel() must hand back a LockType instance as well.
        failure_msg = ("_thread._set_sentinel() did not return a "
                       "LockType instance.")
        sentinel = _thread._set_sentinel()
        self.assertIsInstance(sentinel, _thread.LockType, failure_msg)

    def test_interrupt_main(self):
        # Calling start_new_thread with a function that executes
        # interrupt_main should raise KeyboardInterrupt upon completion.
        def call_interrupt():
            _thread.interrupt_main()

        with self.assertRaises(KeyboardInterrupt):
            _thread.start_new_thread(call_interrupt, ())

    def test_interrupt_in_main(self):
        # Called directly, interrupt_main() raises KeyboardInterrupt.
        with self.assertRaises(KeyboardInterrupt):
            _thread.interrupt_main()

    def test_stack_size_None(self):
        # stack_size(None) is expected to return 0.
        self.assertEqual(_thread.stack_size(None), 0)

    def test_stack_size_not_None(self):
        # Any non-None size must be rejected with _thread.error.
        with self.assertRaises(_thread.error) as caught:
            _thread.stack_size("")
        message = caught.exception.args[0]
        self.assertEqual(message,
                         "setting thread stack size not supported")
class ThreadTests(unittest.TestCase):
    """Test thread creation."""

    def test_arg_passing(self):
        # Make sure that parameter passing works.
        # The parameter names of arg_tester double as the keyword names
        # used in the kwargs dictionaries below.
        def arg_tester(queue, arg1=False, arg2=False):
            """Use to test _thread.start_new_thread() passes args properly."""
            queue.put((arg1, arg2))

        testing_queue = queue.Queue(1)

        # Positional arguments only.
        _thread.start_new_thread(arg_tester, (testing_queue, True, True))
        first, second = testing_queue.get()
        self.assertTrue(first and second,
                        "Argument passing for thread creation "
                        "using tuple failed")

        # Keyword arguments only.
        keyword_only = dict(queue=testing_queue, arg1=True, arg2=True)
        _thread.start_new_thread(arg_tester, (), keyword_only)
        first, second = testing_queue.get()
        self.assertTrue(first and second,
                        "Argument passing for thread creation "
                        "using kwargs failed")

        # Positional and keyword arguments mixed.
        _thread.start_new_thread(arg_tester,
                                 (testing_queue, True),
                                 dict(arg2=True))
        first, second = testing_queue.get()
        self.assertTrue(first and second,
                        "Argument passing for thread creation using both tuple"
                        " and kwargs failed")

    def test_multi_thread_creation(self):
        # Start several threads and check that each one left its mark.
        def queue_mark(queue, delay):
            time.sleep(delay)
            queue.put(_thread.get_ident())

        thread_count = 5
        testing_queue = queue.Queue(thread_count)
        if support.verbose:
            print()
            print("*** Testing multiple thread creation "
                  "(will take approx. %s to %s sec.) ***" % (
                      DELAY, thread_count))
        for _ in range(thread_count):
            # A random per-thread delay is only drawn when DELAY is truthy.
            local_delay = round(random.random(), 1) if DELAY else 0
            _thread.start_new_thread(queue_mark,
                                     (testing_queue, local_delay))
        time.sleep(DELAY)
        if support.verbose:
            print('done')
        marks_recorded = testing_queue.qsize()
        self.assertEqual(marks_recorded, thread_count,
                         "Not all %s threads executed properly "
                         "after %s sec." % (thread_count, DELAY))

    def test_args_not_tuple(self):
        """
        Test invoking start_new_thread() with a non-tuple value for "args".
        A TypeError carrying a descriptive message is expected.
        """
        with self.assertRaises(TypeError) as caught:
            _thread.start_new_thread(mock.Mock(), [])
        message = caught.exception.args[0]
        self.assertEqual(message, "2nd arg must be a tuple")

    def test_kwargs_not_dict(self):
        """
        Test invoking start_new_thread() with a non-dict value for "kwargs".
        A TypeError carrying a descriptive message is expected.
        """
        with self.assertRaises(TypeError) as caught:
            _thread.start_new_thread(mock.Mock(), (), kwargs=[])
        message = caught.exception.args[0]
        self.assertEqual(message, "3rd arg must be a dict")

    def test_SystemExit(self):
        """
        Test invoking start_new_thread() with a function that raises
        SystemExit.
        The exception must not propagate to the caller.
        """
        exiting_func = mock.Mock(side_effect=SystemExit())
        try:
            _thread.start_new_thread(exiting_func, ())
        except SystemExit:
            self.fail("start_new_thread raised SystemExit.")

    @mock.patch('traceback.print_exc')
    def test_RaiseException(self, mock_print_exc):
        """
        Test invoking start_new_thread() with a function that raises exception.
        The exception must not propagate and traceback.print_exc() must
        have been called.
        """
        raising_func = mock.Mock(side_effect=Exception)
        _thread.start_new_thread(raising_func, ())
        self.assertTrue(mock_print_exc.called)
|