1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
|
"""Utilities for with-statement contexts. See PEP 343."""
import sys
from collections import deque
from functools import wraps
__all__ = ["contextmanager", "closing", "ContextDecorator", "ExitStack"]
class ContextDecorator(object):
"A base class or mixin that enables context managers to work as decorators."
def _recreate_cm(self):
"""Return a recreated instance of self.
Allows an otherwise one-shot context manager like
_GeneratorContextManager to support use as
a decorator via implicit recreation.
This is a private interface just for _GeneratorContextManager.
See issue #11647 for details.
"""
return self
def __call__(self, func):
@wraps(func)
def inner(*args, **kwds):
with self._recreate_cm():
return func(*args, **kwds)
return inner
class _GeneratorContextManager(ContextDecorator):
"""Helper for @contextmanager decorator."""
def __init__(self, func, *args, **kwds):
self.gen = func(*args, **kwds)
self.func, self.args, self.kwds = func, args, kwds
def _recreate_cm(self):
# _GCM instances are one-shot context managers, so the
# CM must be recreated each time a decorated function is
# called
return self.__class__(self.func, *self.args, **self.kwds)
def __enter__(self):
try:
return next(self.gen)
except StopIteration:
raise RuntimeError("generator didn't yield")
def __exit__(self, type, value, traceback):
if type is None:
try:
next(self.gen)
except StopIteration:
return
else:
raise RuntimeError("generator didn't stop")
else:
if value is None:
# Need to force instantiation so we can reliably
# tell if we get the same exception back
value = type()
try:
self.gen.throw(type, value, traceback)
raise RuntimeError("generator didn't stop after throw()")
except StopIteration as exc:
# Suppress the exception *unless* it's the same exception that
# was passed to throw(). This prevents a StopIteration
# raised inside the "with" statement from being suppressed
return exc is not value
except:
# only re-raise if it's *not* the exception that was
# passed to throw(), because __exit__() must not raise
# an exception unless __exit__() itself failed. But throw()
# has to raise the exception to signal propagation, so this
# fixes the impedance mismatch between the throw() protocol
# and the __exit__() protocol.
#
if sys.exc_info()[1] is not value:
raise
def contextmanager(func):
"""@contextmanager decorator.
Typical usage:
@contextmanager
def some_generator(<arguments>):
<setup>
try:
yield <value>
finally:
<cleanup>
This makes this:
with some_generator(<arguments>) as <variable>:
<body>
equivalent to this:
<setup>
try:
<variable> = <value>
<body>
finally:
<cleanup>
"""
@wraps(func)
def helper(*args, **kwds):
return _GeneratorContextManager(func, *args, **kwds)
return helper
class closing(object):
"""Context to automatically close something at the end of a block.
Code like this:
with closing(<module>.open(<arguments>)) as f:
<block>
is equivalent to this:
f = <module>.open(<arguments>)
try:
<block>
finally:
f.close()
"""
def __init__(self, thing):
self.thing = thing
def __enter__(self):
return self.thing
def __exit__(self, *exc_info):
self.thing.close()
# Inspired by discussions on http://bugs.python.org/issue13585
class ExitStack(object):
"""Context manager for dynamic management of a stack of exit callbacks
For example:
with ExitStack() as stack:
files = [stack.enter_context(open(fname)) for fname in filenames]
# All opened files will automatically be closed at the end of
# the with statement, even if attempts to open files later
# in the list throw an exception
"""
def __init__(self):
self._exit_callbacks = deque()
def pop_all(self):
"""Preserve the context stack by transferring it to a new instance"""
new_stack = type(self)()
new_stack._exit_callbacks = self._exit_callbacks
self._exit_callbacks = deque()
return new_stack
def _push_cm_exit(self, cm, cm_exit):
"""Helper to correctly register callbacks to __exit__ methods"""
def _exit_wrapper(*exc_details):
return cm_exit(cm, *exc_details)
_exit_wrapper.__self__ = cm
self.push(_exit_wrapper)
def push(self, exit):
"""Registers a callback with the standard __exit__ method signature
Can suppress exceptions the same way __exit__ methods can.
Also accepts any object with an __exit__ method (registering a call
to the method instead of the object itself)
"""
# We use an unbound method rather than a bound method to follow
# the standard lookup behaviour for special methods
_cb_type = type(exit)
try:
exit_method = _cb_type.__exit__
except AttributeError:
# Not a context manager, so assume its a callable
self._exit_callbacks.append(exit)
else:
self._push_cm_exit(exit, exit_method)
return exit # Allow use as a decorator
def callback(self, callback, *args, **kwds):
"""Registers an arbitrary callback and arguments.
Cannot suppress exceptions.
"""
def _exit_wrapper(exc_type, exc, tb):
callback(*args, **kwds)
# We changed the signature, so using @wraps is not appropriate, but
# setting __wrapped__ may still help with introspection
_exit_wrapper.__wrapped__ = callback
self.push(_exit_wrapper)
return callback # Allow use as a decorator
def enter_context(self, cm):
"""Enters the supplied context manager
If successful, also pushes its __exit__ method as a callback and
returns the result of the __enter__ method.
"""
# We look up the special methods on the type to match the with statement
_cm_type = type(cm)
_exit = _cm_type.__exit__
result = _cm_type.__enter__(cm)
self._push_cm_exit(cm, _exit)
return result
def close(self):
"""Immediately unwind the context stack"""
self.__exit__(None, None, None)
def __enter__(self):
return self
def __exit__(self, *exc_details):
# We manipulate the exception state so it behaves as though
# we were actually nesting multiple with statements
frame_exc = sys.exc_info()[1]
def _fix_exception_context(new_exc, old_exc):
while 1:
exc_context = new_exc.__context__
if exc_context in (None, frame_exc):
break
new_exc = exc_context
new_exc.__context__ = old_exc
# Callbacks are invoked in LIFO order to match the behaviour of
# nested context managers
suppressed_exc = False
while self._exit_callbacks:
cb = self._exit_callbacks.pop()
try:
if cb(*exc_details):
suppressed_exc = True
exc_details = (None, None, None)
except:
new_exc_details = sys.exc_info()
# simulate the stack of exceptions by setting the context
_fix_exception_context(new_exc_details[1], exc_details[1])
if not self._exit_callbacks:
raise
exc_details = new_exc_details
return suppressed_exc
|