1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
"""Benchmark some basic import use-cases.
The assumption is made that this benchmark is run in a fresh interpreter and
thus has no external changes made to import-related attributes in sys.
"""
from test.test_importlib import util
import decimal
from importlib.util import cache_from_source
import importlib
import importlib.machinery
import json
import os
import py_compile
import sys
import tabnanny
import timeit
def bench(name, cleanup=lambda: None, *, seconds=1, repeat=3):
    """Yield one imports-per-second figure per round for importing *name*.

    ``repeat`` rounds are run.  In each round ``__import__(name)`` is timed
    one call at a time until the accumulated time reaches ``seconds``.
    ``cleanup`` is called after every timed import, even if the import
    raised, so the next import starts from the same state.
    """
    import_timer = timeit.Timer(f"__import__({name!r})")
    for _ in range(repeat):
        elapsed = 0
        completed = 0
        while elapsed < seconds:
            try:
                elapsed += import_timer.timeit(1)
            finally:
                cleanup()
            completed += 1
        # The last import overshot the time budget, so it is not counted.
        if elapsed > seconds:
            completed -= 1
        yield completed // seconds
def from_cache(seconds, repeat):
"""sys.modules"""
name = '<benchmark import>'
module = imp.new_module(name)
module.__file__ = '<test>'
module.__package__ = ''
with util.uncache(name):
sys.modules[name] = module
yield from bench(name, repeat=repeat, seconds=seconds)
def builtin_mod(seconds, repeat):
    """Built-in module"""
    # The docstring is the benchmark's label and must stay exactly as is.
    #
    # Benchmarks importing the built-in ``errno`` module from scratch.
    name = 'errno'

    def forget_module():
        # Drop the cached module so the next timed import does real work.
        sys.modules.pop(name)

    # Make sure the very first timed import is not served from the cache.
    sys.modules.pop(name, None)
    # No finder is installed here: the built-in importer is relied upon
    # being available implicitly.
    yield from bench(name, forget_module, repeat=repeat, seconds=seconds)
def source_wo_bytecode(seconds, repeat):
    """Source w/o bytecode: small"""
    # The docstring is the benchmark's label and must stay exactly as is.
    #
    # Benchmarks importing a small generated source module while bytecode
    # writing is switched off.
    name = '__importlib_test_benchmark__'

    def forget_module():
        sys.modules.pop(name)

    sys.dont_write_bytecode = True
    try:
        # Per the original note, create_modules clears out sys.modules and
        # puts an entry at the front of sys.path (helper not shown here).
        with util.create_modules(name) as mapping:
            assert not os.path.exists(cache_from_source(mapping[name]))
            sys.meta_path.append(importlib.machinery.PathFinder)
            loader_details = (importlib.machinery.SourceFileLoader,
                              importlib.machinery.SOURCE_SUFFIXES)
            source_hook = importlib.machinery.FileFinder.path_hook(
                loader_details)
            sys.path_hooks.append(source_hook)
            yield from bench(name, forget_module, repeat=repeat,
                             seconds=seconds)
    finally:
        # main() asserts that this flag is off after every benchmark.
        sys.dont_write_bytecode = False
def _wo_bytecode(module):
    """Build a "source without bytecode" benchmark for *module*.

    The returned generator function takes ``seconds`` and ``repeat`` and has
    the module's name substituted into its docstring, which serves as the
    benchmark's label.
    """
    module_name = module.__name__

    def benchmark_wo_bytecode(seconds, repeat):
        """Source w/o bytecode: {}"""
        def forget_module():
            sys.modules.pop(module_name)

        # Remove any cached bytecode so the import has to compile the source.
        stale_bytecode = cache_from_source(module.__file__)
        if os.path.exists(stale_bytecode):
            os.unlink(stale_bytecode)
        sys.dont_write_bytecode = True
        try:
            yield from bench(module_name, forget_module,
                             repeat=repeat, seconds=seconds)
        finally:
            sys.dont_write_bytecode = False

    label_template = benchmark_wo_bytecode.__doc__
    benchmark_wo_bytecode.__doc__ = label_template.format(module_name)
    return benchmark_wo_bytecode


# Concrete benchmarks for two standard-library modules.
tabnanny_wo_bytecode = _wo_bytecode(tabnanny)
decimal_wo_bytecode = _wo_bytecode(decimal)
def source_writing_bytecode(seconds, repeat):
    """Source writing bytecode: small"""
    # The docstring is the benchmark's label and must stay exactly as is.
    #
    # Benchmarks importing a small generated source module when every import
    # also has to write the bytecode file.
    assert not sys.dont_write_bytecode
    name = '__importlib_test_benchmark__'
    with util.create_modules(name) as mapping:
        sys.meta_path.append(importlib.machinery.PathFinder)
        loader_details = (importlib.machinery.SourceFileLoader,
                          importlib.machinery.SOURCE_SUFFIXES)
        source_hook = importlib.machinery.FileFinder.path_hook(loader_details)
        sys.path_hooks.append(source_hook)

        def bytecode_path():
            return cache_from_source(mapping[name])

        def cleanup():
            # Undo both effects of the import: the cache entry and the
            # bytecode file that was just written.
            sys.modules.pop(name)
            os.unlink(bytecode_path())

        for result in bench(name, cleanup, repeat=repeat, seconds=seconds):
            # Every round must end with the bytecode file removed again.
            assert not os.path.exists(bytecode_path())
            yield result
def _writing_bytecode(module):
    """Build a "source writing bytecode" benchmark for *module*.

    The returned generator function takes ``seconds`` and ``repeat`` and has
    the module's name substituted into its docstring, which serves as the
    benchmark's label.
    """
    module_name = module.__name__

    def writing_bytecode_benchmark(seconds, repeat):
        """Source writing bytecode: {}"""
        assert not sys.dont_write_bytecode

        def discard_module_and_bytecode():
            # Force the next import to load the source and rewrite bytecode.
            sys.modules.pop(module_name)
            written_bytecode = cache_from_source(module.__file__)
            os.unlink(written_bytecode)

        yield from bench(module_name, discard_module_and_bytecode,
                         repeat=repeat, seconds=seconds)

    label_template = writing_bytecode_benchmark.__doc__
    writing_bytecode_benchmark.__doc__ = label_template.format(module_name)
    return writing_bytecode_benchmark


# Concrete benchmarks for two standard-library modules.
tabnanny_writing_bytecode = _writing_bytecode(tabnanny)
decimal_writing_bytecode = _writing_bytecode(decimal)
def source_using_bytecode(seconds, repeat):
    """Source w/ bytecode: small"""
    # The docstring is the benchmark's label and must stay exactly as is.
    #
    # Benchmarks importing a small generated source module whose bytecode
    # file has been compiled ahead of the timed imports.
    name = '__importlib_test_benchmark__'

    def forget_module():
        sys.modules.pop(name)

    with util.create_modules(name) as mapping:
        sys.meta_path.append(importlib.machinery.PathFinder)
        loader_details = (importlib.machinery.SourceFileLoader,
                          importlib.machinery.SOURCE_SUFFIXES)
        source_hook = importlib.machinery.FileFinder.path_hook(loader_details)
        sys.path_hooks.append(source_hook)
        source_path = mapping[name]
        py_compile.compile(source_path)
        assert os.path.exists(cache_from_source(source_path))
        yield from bench(name, forget_module, repeat=repeat, seconds=seconds)
def _using_bytecode(module):
    """Build a "source with bytecode" benchmark for *module*.

    The returned generator function takes ``seconds`` and ``repeat`` and has
    the module's name substituted into its docstring, which serves as the
    benchmark's label.
    """
    module_name = module.__name__

    def using_bytecode_benchmark(seconds, repeat):
        """Source w/ bytecode: {}"""
        def forget_module():
            sys.modules.pop(module_name)

        # Compile up front so the timed imports find bytecode on disk.
        py_compile.compile(module.__file__)
        yield from bench(module_name, forget_module, repeat=repeat,
                         seconds=seconds)

    label_template = using_bytecode_benchmark.__doc__
    using_bytecode_benchmark.__doc__ = label_template.format(module_name)
    return using_bytecode_benchmark


# Concrete benchmarks for two standard-library modules.
tabnanny_using_bytecode = _using_bytecode(tabnanny)
decimal_using_bytecode = _using_bytecode(decimal)
def main(import_, options):
    """Run the import benchmarks and report imports/second for each.

    ``import_`` is installed as the built-in ``__import__`` before anything
    is measured.  ``options`` provides three attributes:

    * ``source_file`` -- open file with earlier results (JSON) to compare
      against, or a false value for no comparison;
    * ``benchmark`` -- label of the single benchmark to run, or a false
      value to run all of them;
    * ``dest_file`` -- open file to write the new results to as JSON, or a
      false value to skip writing.

    Exits with status 1 if ``options.benchmark`` names no known benchmark.
    """
    # ``__builtins__`` is an implementation detail and is only a module when
    # this file runs as ``__main__``; the ``builtins`` module always works.
    import builtins

    if options.source_file:
        with options.source_file:
            prev_results = json.load(options.source_file)
    else:
        prev_results = {}
    builtins.__import__ = import_
    benchmarks = (from_cache, builtin_mod,
                  source_writing_bytecode,
                  source_wo_bytecode, source_using_bytecode,
                  tabnanny_writing_bytecode,
                  tabnanny_wo_bytecode, tabnanny_using_bytecode,
                  decimal_writing_bytecode,
                  decimal_wo_bytecode, decimal_using_bytecode,
                  )
    if options.benchmark:
        # Benchmarks are selected by their docstring, which is their label.
        for b in benchmarks:
            if b.__doc__ == options.benchmark:
                benchmarks = [b]
                break
        else:
            print('Unknown benchmark: {!r}'.format(options.benchmark),
                  file=sys.stderr)
            sys.exit(1)
    seconds = 1
    seconds_plural = 's' if seconds > 1 else ''
    repeat = 3
    header = ('Measuring imports/second over {} second{}, best out of {}\n'
              'Entire benchmark run should take about {} seconds\n'
              'Using {!r} as __import__\n')
    print(header.format(seconds, seconds_plural, repeat,
                        len(benchmarks) * seconds * repeat, __import__))
    new_results = {}
    for benchmark in benchmarks:
        print(benchmark.__doc__, "[", end=' ')
        sys.stdout.flush()
        results = []
        for result in benchmark(seconds=seconds, repeat=repeat):
            results.append(result)
            print(result, end=' ')
            sys.stdout.flush()
        # Every benchmark must leave bytecode writing enabled for the next.
        assert not sys.dont_write_bytecode
        print("]", "best is", format(max(results), ',d'))
        new_results[benchmark.__doc__] = results
    if prev_results:
        print('\n\nComparing new vs. old\n')
        for benchmark in benchmarks:
            benchmark_name = benchmark.__doc__
            new_result = max(new_results[benchmark_name])
            old_runs = prev_results.get(benchmark_name)
            if not old_runs:
                # The earlier file may come from a run of other benchmarks;
                # failing here would also lose the results gathered above.
                print(benchmark_name, ':',
                      '{:,d} (no previous result)'.format(new_result))
                continue
            old_result = max(old_runs)
            if old_result:
                result = '{:,d} vs. {:,d} ({:%})'.format(
                    new_result, old_result, new_result / old_result)
            else:
                # A ratio against zero imports/second is undefined.
                result = '{:,d} vs. {:,d} (n/a)'.format(new_result,
                                                        old_result)
            print(benchmark_name, ':', result)
    if options.dest_file:
        with options.dest_file:
            json.dump(new_results, options.dest_file, indent=2)
if __name__ == '__main__':
    # Command-line entry point: parse the options, choose which __import__
    # implementation to measure, and hand over to main().
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-b', '--builtin',
        dest='builtin',
        action='store_true',
        default=False,
        help="use the built-in __import__",
    )
    parser.add_argument(
        '-r', '--read',
        dest='source_file',
        type=argparse.FileType('r'),
        help='file to read benchmark data from to compare against',
    )
    parser.add_argument(
        '-w', '--write',
        dest='dest_file',
        type=argparse.FileType('w'),
        help='file to write benchmark data to',
    )
    parser.add_argument(
        '--benchmark',
        dest='benchmark',
        help='specific benchmark to run',
    )
    options = parser.parse_args()
    # Without -b, importlib.__import__ is benchmarked instead of the built-in.
    import_ = __import__ if options.builtin else importlib.__import__
    main(import_, options)
|