1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
# Test the signal module
from test.test_support import verbose, TestSkipped, TestFailed, vereq
import signal
import os, sys, time
if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':
raise TestSkipped, "Can't test signal on %s" % sys.platform
if verbose:
x = '-x'
else:
x = '+x'
pid = os.getpid()
if verbose:
print "test runner's pid is", pid
# Shell script that will send us asynchronous signals
script = """
(
set %(x)s
sleep 2
kill -HUP %(pid)d
sleep 2
kill -USR1 %(pid)d
sleep 2
kill -USR2 %(pid)d
) &
""" % vars()
a_called = b_called = False
def handlerA(*args):
global a_called
a_called = True
if verbose:
print "handlerA", args
class HandlerBCalled(Exception):
pass
def handlerB(*args):
global b_called
b_called = True
if verbose:
print "handlerB", args
raise HandlerBCalled, args
MAX_DURATION = 20
signal.alarm(MAX_DURATION) # Entire test should last at most 20 sec.
hup = signal.signal(signal.SIGHUP, handlerA)
usr1 = signal.signal(signal.SIGUSR1, handlerB)
usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
alrm = signal.signal(signal.SIGALRM, signal.default_int_handler)
vereq(signal.getsignal(signal.SIGHUP), handlerA)
vereq(signal.getsignal(signal.SIGUSR1), handlerB)
vereq(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
try:
signal.signal(4242, handlerB)
raise TestFailed, 'expected ValueError for invalid signal # to signal()'
except ValueError:
pass
try:
signal.getsignal(4242)
raise TestFailed, 'expected ValueError for invalid signal # to getsignal()'
except ValueError:
pass
try:
signal.signal(signal.SIGUSR1, None)
raise TestFailed, 'expected TypeError for non-callable'
except TypeError:
pass
# Set up a child to send an alarm signal to us (the parent) after waiting
# long enough to receive the alarm. It seems we miss the alarm for some
# reason. This will hopefully stop the hangs on Tru64/Alpha.
def force_test_exit():
# Sigh, both imports seem necessary to avoid errors.
import os
fork_pid = os.fork()
if fork_pid == 0:
# In child
import os, time
try:
# Wait 5 seconds longer than the expected alarm to give enough
# time for the normal sequence of events to occur. This is
# just a stop-gap to prevent the test from hanging.
time.sleep(MAX_DURATION + 5)
print >> sys.__stdout__, ' child should not have to kill parent'
for i in range(3):
os.kill(pid, signal.SIGALRM)
print >> sys.__stdout__, " child sent SIGALRM to", pid
finally:
os._exit(0)
# In parent (or error)
return fork_pid
try:
os.system(script)
# Try to ensure this test exits even if there is some problem with alarm.
# Tru64/Alpha sometimes hangs and is ultimately killed by the buildbot.
fork_pid = force_test_exit()
print "starting pause() loop..."
try:
while 1:
if verbose:
print "call pause()..."
try:
signal.pause()
if verbose:
print "pause() returned"
except HandlerBCalled:
if verbose:
print "HandlerBCalled exception caught"
else:
pass
except KeyboardInterrupt:
if verbose:
print "KeyboardInterrupt (assume the alarm() went off)"
# Forcibly kill the child we created to ping us if there was a test error.
try:
# Make sure we don't kill ourself if there was a fork error.
if fork_pid > 0:
os.kill(fork_pid, signal.SIGKILL)
except:
# If the child killed us, it has probably exited. Killing a
# non-existant process will raise an error which we don't care about.
pass
if not a_called:
print 'HandlerA not called'
if not b_called:
print 'HandlerB not called'
finally:
signal.signal(signal.SIGHUP, hup)
signal.signal(signal.SIGUSR1, usr1)
signal.signal(signal.SIGUSR2, usr2)
signal.signal(signal.SIGALRM, alrm)
|