1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
|
"""Python part of the warnings subsystem."""
import sys
# Names exported by ``from warnings import *``; everything else in this
# module (including the underscore-prefixed helpers) is non-public.
__all__ = ["warn", "warn_explicit", "showwarning",
           "formatwarning", "filterwarnings", "simplefilter",
           "resetwarnings", "catch_warnings"]
def showwarning(message, category, filename, lineno, file=None, line=None):
    """Default output hook: emit one warning to *file*.

    The arguments are bundled into a WarningMessage and handed to the
    built-in writer.  User code may rebind ``warnings.showwarning`` to
    customise where warnings go.
    """
    _showwarnmsg_impl(
        WarningMessage(message, category, filename, lineno, file, line)
    )
def formatwarning(message, category, filename, lineno, line=None):
    """Default formatter: return the standard text for one warning.

    The arguments are bundled into a WarningMessage (with no target file)
    and rendered by the built-in formatter.
    """
    return _formatwarnmsg_impl(
        WarningMessage(message, category, filename, lineno, None, line)
    )
def _showwarnmsg_impl(msg):
    """Write the formatted text of *msg* to its file, or to sys.stderr.

    The warning is dropped silently when no stream is available or when
    writing to the stream fails with OSError.
    """
    stream = msg.file
    if stream is None:
        stream = sys.stderr
    if stream is None:
        # sys.stderr is None when run with pythonw.exe:
        # there is nowhere to report the warning, so it is lost.
        return

    rendered = _formatwarnmsg(msg)
    try:
        stream.write(rendered)
    except OSError:
        # The stream (most likely stderr) is unusable; lose the warning
        # rather than raising from the warning machinery.
        pass
def _formatwarnmsg_impl(msg):
    """Return the standard text representation of the warning *msg*.

    The result always starts with a ``filename:lineno: Category: message``
    header line.  It is followed by the offending source line when one is
    known (``msg.line``, or looked up through linecache when that is None).

    When ``msg.source`` is set, the tracemalloc allocation traceback of
    that object is appended if available; otherwise, if tracemalloc is
    importable but not tracing, a hint to enable it is appended.

    Formatting a warning must never raise because of a failed lookup, so
    every import and lookup here is guarded by a broad ``except Exception``.
    """
    category = msg.category.__name__
    s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"

    # Bind the name unconditionally.  Previously it was only bound when
    # msg.line was None, so the traceback section below hit an
    # UnboundLocalError (swallowed by its broad except) and silently
    # dropped every source line whenever the caller supplied msg.line.
    linecache = None
    if msg.line is None:
        try:
            import linecache
            line = linecache.getline(msg.filename, msg.lineno)
        except Exception:
            # When a warning is logged during Python shutdown, linecache
            # and the import machinery don't work anymore
            line = None
            linecache = None
    else:
        line = msg.line
    if line:
        line = line.strip()
        s += " %s\n" % line

    if msg.source is not None:
        try:
            import tracemalloc
        # Logging a warning should not raise a new exception:
        # catch Exception, not only ImportError and RecursionError.
        except Exception:
            # don't suggest to enable tracemalloc if it's not available
            tracing = True
            tb = None
        else:
            tracing = tracemalloc.is_tracing()
            try:
                tb = tracemalloc.get_object_traceback(msg.source)
            except Exception:
                # When a warning is logged during Python shutdown, tracemalloc
                # and the import machinery don't work anymore
                tb = None

        if tb is not None:
            if linecache is None and msg.line is not None:
                # linecache was not needed for the warning's own line, so
                # it has not been loaded yet.  (If the earlier attempt
                # failed we deliberately do not retry.)
                try:
                    import linecache
                except Exception:
                    linecache = None

            s += 'Object allocated at (most recent call last):\n'
            for frame in tb:
                s += (' File "%s", lineno %s\n'
                      % (frame.filename, frame.lineno))

                try:
                    if linecache is not None:
                        line = linecache.getline(frame.filename, frame.lineno)
                    else:
                        line = None
                except Exception:
                    line = None
                if line:
                    line = line.strip()
                    s += ' %s\n' % line
        elif not tracing:
            s += (f'{category}: Enable tracemalloc to get the object '
                  f'allocation traceback\n')
    return s
# Remember the stock implementation so a user replacement can be detected
_showwarning_orig = showwarning


def _showwarnmsg(msg):
    """Display *msg*, honouring a user-replaced ``showwarning`` hook.

    If the module-level ``showwarning`` has been rebound to something
    other than the stock function, that replacement is called with the
    classic positional arguments; it must be callable, otherwise
    TypeError is raised.  In every other case (including the name being
    missing) the built-in writer is used.
    """
    try:
        hook = showwarning
    except NameError:
        hook = None
        overridden = False
    else:
        overridden = hook is not _showwarning_orig

    if not overridden:
        _showwarnmsg_impl(msg)
        return

    # warnings.showwarning() was replaced
    if not callable(hook):
        raise TypeError("warnings.showwarning() must be set to a "
                        "function or method")
    hook(msg.message, msg.category, msg.filename, msg.lineno,
         msg.file, msg.line)
# Remember the stock implementation so a user replacement can be detected
_formatwarning_orig = formatwarning


def _formatwarnmsg(msg):
    """Format *msg*, honouring a user-replaced ``formatwarning`` hook.

    A replacement bound to the module-level name ``formatwarning`` is
    called with the classic arguments; otherwise (or when the name is
    missing) the built-in formatter is used.
    """
    try:
        formatter = formatwarning
    except NameError:
        formatter = None
        overridden = False
    else:
        overridden = formatter is not _formatwarning_orig

    if overridden:
        # warnings.formatwarning() was replaced
        return formatter(msg.message, msg.category,
                         msg.filename, msg.lineno, line=msg.line)
    return _formatwarnmsg_impl(msg)
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=False):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters

    Raises AssertionError when an argument is invalid.  The checks are
    explicit ``raise`` statements rather than ``assert`` so that they are
    still enforced when Python runs with -O; the exception type and
    messages are unchanged, so existing handlers keep working.
    """
    if action not in ("error", "ignore", "always", "default", "module",
                      "once"):
        raise AssertionError("invalid action: %r" % (action,))
    if not isinstance(message, str):
        raise AssertionError("message must be a string")
    if not isinstance(category, type):
        raise AssertionError("category must be a class")
    if not issubclass(category, Warning):
        raise AssertionError("category must be a Warning subclass")
    if not isinstance(module, str):
        raise AssertionError("module must be a string")
    if not (isinstance(lineno, int) and lineno >= 0):
        raise AssertionError("lineno must be an int >= 0")

    if message or module:
        # Only pay for importing re when a pattern is actually given.
        import re

    # An empty pattern is stored as None, which matches anything.
    # Message patterns are case-insensitive; module patterns are not.
    message = re.compile(message, re.I) if message else None
    module = re.compile(module) if module else None

    _add_filter(action, message, category, module, lineno, append=append)
def simplefilter(action, category=Warning, lineno=0, append=False):
    """Insert a simple entry into the list of warnings filters (at the front).

    A simple filter matches all modules and messages.
    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'category' -- a class that the warning must be a subclass of
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters

    Raises AssertionError when 'action' or 'lineno' is invalid.  The
    checks are explicit ``raise`` statements rather than ``assert`` so
    that they are still enforced when Python runs with -O; the exception
    type and messages are unchanged.
    """
    if action not in ("error", "ignore", "always", "default", "module",
                      "once"):
        raise AssertionError("invalid action: %r" % (action,))
    if not (isinstance(lineno, int) and lineno >= 0):
        raise AssertionError("lineno must be an int >= 0")
    # None for the message and module patterns means "match anything".
    _add_filter(action, None, category, None, lineno, append=append)
def _add_filter(*item, append):
    """Register the filter tuple *item* in the global ``filters`` list.

    With append false the filter is moved/inserted to the front of the
    list (any existing equal entry is removed first).  With append true
    it is added at the end unless an equal entry is already present.
    The filters-changed notification is issued in both cases.
    """
    if append:
        # A duplicate already has a position in the list: leave it there.
        if item not in filters:
            filters.append(item)
    else:
        # Drop a possible duplicate so the new entry lands at the front.
        try:
            filters.remove(item)
        except ValueError:
            pass
        filters.insert(0, item)
    _filters_mutated()
def resetwarnings():
    """Remove every warning filter, leaving none active.

    The list object itself is emptied in place (not rebound), and the
    filters-changed notification is issued afterwards.
    """
    del filters[:]
    _filters_mutated()
class _OptionError(Exception):
    """Raised by the -W option parsing helpers for a malformed option."""
# Helper to process -W options passed via sys.warnoptions
def _processoptions(args):
    """Install a filter for each -W style option string in *args*.

    An option that cannot be parsed is reported on sys.stderr and
    skipped; processing continues with the next one.
    """
    for option in args:
        try:
            _setoption(option)
        except _OptionError as err:
            print("Invalid -W option ignored:", err, file=sys.stderr)
# Helper for _processoptions()
def _setoption(arg):
    """Parse one ``action:message:category:module:lineno`` option string
    and install the corresponding filter.

    Missing trailing fields default to the empty string.  The message and
    module fields are treated as literal text (regex-escaped), and a
    non-empty module must match up to the end of the module name.
    Raises _OptionError when the option is malformed.
    """
    import re

    fields = arg.split(':')
    if len(fields) > 5:
        raise _OptionError("too many fields (max 5): %r" % (arg,))
    # Pad to exactly five fields.
    fields.extend([''] * (5 - len(fields)))
    action, message, category, module, lineno = (f.strip() for f in fields)

    action = _getaction(action)
    message = re.escape(message)
    category = _getcategory(category)
    module = re.escape(module)
    if module:
        module += '$'

    if not lineno:
        lineno = 0
    else:
        try:
            lineno = int(lineno)
            if lineno < 0:
                raise ValueError
        except (ValueError, OverflowError):
            raise _OptionError("invalid lineno %r" % (lineno,)) from None

    filterwarnings(action, message, category, module, lineno)
# Helper for _setoption()
def _getaction(action):
    """Resolve an action field from a -W option to a full action name.

    An empty field means "default", "all" is an alias for "always", and
    any prefix of an action name selects the first action (in a fixed
    order) that starts with it.  Raises _OptionError if nothing matches.
    """
    if not action:
        return "default"
    if action == "all":
        return "always"  # Alias

    known = ('default', 'always', 'ignore', 'module', 'once', 'error')
    resolved = next((name for name in known if name.startswith(action)), None)
    if resolved is None:
        raise _OptionError("invalid action: %r" % (action,))
    return resolved
# Helper for _setoption()
def _getcategory(category):
    """Resolve the category field of a -W option to a Warning subclass.

    An empty field means Warning.  A bare name is looked up among the
    builtins; a dotted name ``pkg.mod.Class`` imports ``pkg.mod`` and
    takes ``Class`` from it.

    Raises _OptionError when the module cannot be imported, the name does
    not exist, or the resolved object is not a Warning subclass.
    """
    if not category:
        return Warning

    if '.' not in category:
        # Look the name up in builtins instead of eval()-ing option text:
        # eval could raise SyntaxError (e.g. "1a") or return arbitrary
        # non-class objects for names that happen to be in scope.
        import builtins as m
        klass = category
    else:
        module, _, klass = category.rpartition('.')
        try:
            m = __import__(module, None, None, [klass])
        except (ImportError, ValueError):
            # ValueError covers an empty module name, as in ".Foo".
            raise _OptionError("invalid module name: %r" % (module,)) from None

    try:
        cat = getattr(m, klass)
    except AttributeError:
        raise _OptionError("unknown warning category: %r" % (category,)) from None

    # issubclass() raises TypeError for non-classes (e.g. "len"), so check
    # that we really got a class first and report it as a bad option.
    if not (isinstance(cat, type) and issubclass(cat, Warning)):
        raise _OptionError("invalid warning category: %r" % (category,))
    return cat
def _is_internal_frame(frame):
    """Return True if *frame* belongs to CPython's import bootstrap code.

    A frame is considered internal when its code object's filename
    contains both 'importlib' and '_bootstrap'.
    """
    filename = frame.f_code.co_filename
    return all(marker in filename for marker in ('importlib', '_bootstrap'))
def _next_external_frame(frame):
    """Return the nearest caller of *frame* that is not an internal frame.

    Walks up the ``f_back`` chain starting at the caller of *frame* and
    returns None when the chain is exhausted.
    """
    candidate = frame.f_back
    while True:
        if candidate is None or not _is_internal_frame(candidate):
            return candidate
        candidate = candidate.f_back
# Code typically replaced by _warnings
def warn(message, category=None, stacklevel=1, source=None):
    """Issue a warning; it may be shown, ignored, or raised as an exception.

    *message* is either a string or a Warning instance (whose class then
    overrides *category*).  *category* defaults to UserWarning and must be
    a Warning subclass, otherwise TypeError is raised.  *stacklevel*
    selects the frame the warning is attributed to, and *source* is
    passed through to warn_explicit().
    """
    # A Warning instance carries its own category.
    if isinstance(message, Warning):
        category = message.__class__
    if category is None:
        category = UserWarning
    is_warning_class = (isinstance(category, type)
                        and issubclass(category, Warning))
    if not is_warning_class:
        raise TypeError("category must be a Warning subclass, "
                        "not '{:s}'".format(type(category).__name__))

    # Locate the frame the warning should be attributed to.  The frame
    # lookups stay inline in this function: sys._getframe() depths are
    # relative to the function that calls it.
    try:
        if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
            # Either the level is too small to matter or the warning comes
            # from internal code: do not try to hide any frames.
            frame = sys._getframe(stacklevel)
        else:
            frame = sys._getframe(1)
            # One hop less than stacklevel: we already start at level 1.
            for _ in range(stacklevel - 1):
                frame = _next_external_frame(frame)
                if frame is None:
                    raise ValueError
    except ValueError:
        # The stack is not that deep: attribute the warning to "sys".
        namespace = sys.__dict__
        filename = "sys"
        lineno = 1
    else:
        namespace = frame.f_globals
        filename = frame.f_code.co_filename
        lineno = frame.f_lineno

    module = namespace['__name__'] if '__name__' in namespace else "<string>"
    registry = namespace.setdefault("__warningregistry__", {})
    warn_explicit(message, category, filename, lineno, module, registry,
                  namespace, source)
def warn_explicit(message, category, filename, lineno,
                  module=None, registry=None, module_globals=None,
                  source=None):
    """Low-level interface for issuing a warning with explicit context.

    *message* is a string or a Warning instance (whose class then
    overrides *category*).  *filename* and *lineno* locate the warning.
    *module* defaults to *filename* with a trailing ".py" removed, or
    "<unknown>" when *filename* is empty.  *registry* is the per-module
    dict used to suppress repeats.  *module_globals* is handed to
    linecache.  *source* is attached to the WarningMessage that is shown.

    The first matching entry of ``filters`` (or ``defaultaction``) decides
    whether the warning is ignored, raised, or shown.  Raises RuntimeError
    for an unrecognized action.
    """
    lineno = int(lineno)
    if module is None:
        module = filename or "<unknown>"
        if module[-3:].lower() == ".py":
            module = module[:-3]  # XXX What about leading pathname?
    if registry is None:
        registry = {}
    # A registry recorded under an older filter configuration is stale.
    if registry.get('version', 0) != _filters_version:
        registry.clear()
        registry['version'] = _filters_version
    if isinstance(message, Warning):
        text = str(message)
        category = message.__class__
    else:
        text = message
        message = category(message)
    key = (text, category, lineno)
    # Quick test for common case
    if registry.get(key):
        return
    # Search the filters
    for item in filters:
        action, msg, cat, mod, ln = item
        if ((msg is None or msg.match(text)) and
                issubclass(category, cat) and
                (mod is None or mod.match(module)) and
                (ln == 0 or lineno == ln)):
            break
    else:
        action = defaultaction
    # Early exit actions
    if action == "ignore":
        return

    # Prime the linecache for formatting, in case the
    # "file" is actually in a zipfile or something.
    import linecache
    linecache.getlines(filename, module_globals)

    if action == "error":
        raise message
    # Other actions
    if action == "once":
        registry[key] = 1
        oncekey = (text, category)
        if onceregistry.get(oncekey):
            return
        onceregistry[oncekey] = 1
    elif action == "always":
        pass
    elif action == "module":
        registry[key] = 1
        altkey = (text, category, 0)
        if registry.get(altkey):
            return
        registry[altkey] = 1
    elif action == "default":
        registry[key] = 1
    else:
        # Unrecognized actions are errors
        raise RuntimeError(
            "Unrecognized action (%r) in warnings.filters:\n %s" %
            (action, item))
    # Print message and context.  *source* must be passed by keyword: the
    # fifth positional parameter of WarningMessage is *file*, so passing
    # it positionally made the writer treat the source object as the
    # output stream and lost the source for the allocation traceback.
    msg = WarningMessage(message, category, filename, lineno, source=source)
    _showwarnmsg(msg)
class WarningMessage:
    """Record of a single warning: the showwarning() arguments plus the
    optional *source* object."""

    # Names of the attributes that describe a warning.
    _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
                        "line", "source")

    def __init__(self, message, category, filename, lineno, file=None,
                 line=None, source=None):
        """Store every argument as the attribute of the same name."""
        self.message = message
        self.category = category
        self.filename = filename
        self.lineno = lineno
        self.file = file
        self.line = line
        self.source = source
        # Cache the category's name for __str__; None for a falsy category.
        if category:
            self._category_name = category.__name__
        else:
            self._category_name = None

    def __str__(self):
        """Return a brace-delimited summary of the main details."""
        template = ("{message : %r, category : %r, filename : %r, lineno : %s, "
                    "line : %r}")
        details = (self.message, self._category_name,
                   self.filename, self.lineno, self.line)
        return template % details
class catch_warnings:
    """Context manager that saves the warnings filter list on entry and
    restores it on exit.

    With record=True, warnings shown inside the context are appended to a
    list, and that list is what the ``with`` statement binds; otherwise
    None is bound.  The recorded objects have attributes mirroring the
    arguments of showwarning().

    The 'module' argument selects the module object to operate on instead
    of the one registered as sys.modules['warnings'].  It is only useful
    when testing the warnings module itself.
    """

    def __init__(self, *, record=False, module=None):
        """Choose whether to record warnings and which module to patch.

        Both arguments are keyword-only; *module* defaults to
        sys.modules['warnings'].
        """
        self._record = record
        if module is None:
            module = sys.modules['warnings']
        self._module = module
        self._entered = False

    def __repr__(self):
        parts = []
        if self._record:
            parts.append("record=True")
        if self._module is not sys.modules['warnings']:
            parts.append("module=%r" % self._module)
        return "%s(%s)" % (type(self).__name__, ", ".join(parts))

    def __enter__(self):
        if self._entered:
            raise RuntimeError("Cannot enter %r twice" % self)
        self._entered = True

        mod = self._module
        # Swap in a copy of the filter list so changes made inside the
        # context never touch the saved original.
        self._filters = mod.filters
        mod.filters = self._filters[:]
        mod._filters_mutated()

        # Save both display hooks so __exit__ can put them back.
        self._showwarning = mod.showwarning
        self._showwarnmsg_impl = mod._showwarnmsg_impl

        if not self._record:
            return None

        captured = []
        mod._showwarnmsg_impl = captured.append
        # Reset showwarning() to the default implementation to make sure
        # that _showwarnmsg() calls _showwarnmsg_impl()
        mod.showwarning = mod._showwarning_orig
        return captured

    def __exit__(self, *exc_info):
        if not self._entered:
            raise RuntimeError("Cannot exit %r without entering first" % self)
        mod = self._module
        mod.filters = self._filters
        mod._filters_mutated()
        mod.showwarning = self._showwarning
        mod._showwarnmsg_impl = self._showwarnmsg_impl
# Private utility function called by _PyErr_WarnUnawaitedCoroutine
def _warn_unawaited_coroutine(coro):
    """Emit a RuntimeWarning saying that *coro* was never awaited.

    When the coroutine recorded its origin (``cr_origin``), the creation
    stack is appended to the message, most recent call last.
    """
    msg_lines = [
        f"coroutine '{coro.__qualname__}' was never awaited\n"
    ]
    if coro.cr_origin is not None:
        import linecache
        import traceback

        msg_lines.append("Coroutine created at (most recent call last)\n")
        # cr_origin is iterated in reverse to get most-recent-call-last
        # order; each entry gains its source line for format_list().
        entries = [
            (filename, lineno, funcname, linecache.getline(filename, lineno))
            for filename, lineno, funcname in reversed(coro.cr_origin)
        ]
        msg_lines += traceback.format_list(entries)
    msg = "".join(msg_lines).rstrip("\n")
    # Passing source= here means that if the user happens to have tracemalloc
    # enabled and tracking where the coroutine was created, the warning will
    # contain that traceback. With *both* coroutine origin tracking *and*
    # tracemalloc enabled this yields two partially-redundant tracebacks;
    # that case is not detected for now.
    warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
# filters contains a sequence of filter 5-tuples
# The components of the 5-tuple are:
# - an action: error, ignore, always, default, module, or once
# - a compiled regex that must match the warning message
# - a class representing the warning category
# - a compiled regex that must match the module that is being warned
# - a line number for the line being warned about, or 0 to mean any line
# If either of the compiled regexes is None, it matches anything.
# Prefer the state and functions provided by the _warnings module.  When it
# cannot be imported, define the equivalent pure-Python state here; the
# warn() and warn_explicit() defined earlier in this file are then used.
try:
    from _warnings import (filters, _defaultaction, _onceregistry,
                           warn, warn_explicit, _filters_mutated)
    defaultaction = _defaultaction
    onceregistry = _onceregistry
    # _warnings was importable: the default filters are not added below.
    _warnings_defaults = True
except ImportError:
    filters = []
    defaultaction = "default"
    onceregistry = {}

    # Bumped on every change to the filter list; warn_explicit() compares
    # it with the 'version' stored in a registry to discard stale entries.
    _filters_version = 1

    def _filters_mutated():
        # Record that the filter list changed.
        global _filters_version
        _filters_version += 1

    # No _warnings module: the default filters must be installed below.
    _warnings_defaults = False
# Module initialization
# Install the filters requested through sys.warnoptions first.
_processoptions(sys.warnoptions)
if not _warnings_defaults:
    # Reached only when the _warnings import above failed, so the default
    # filters have to be installed from Python.
    # Several warning categories are ignored by default in regular builds
    if not hasattr(sys, 'gettotalrefcount'):
        # append=1 (truthy) adds each filter at the end of the list, i.e.
        # after anything installed from sys.warnoptions above.
        filterwarnings("default", category=DeprecationWarning,
                       module="__main__", append=1)
        simplefilter("ignore", category=DeprecationWarning, append=1)
        simplefilter("ignore", category=PendingDeprecationWarning, append=1)
        simplefilter("ignore", category=ImportWarning, append=1)
        simplefilter("ignore", category=ResourceWarning, append=1)

# The flag is only needed during initialization; remove it from the module.
del _warnings_defaults
|